Skip to content
Snippets Groups Projects

get metadata history from query parameters

Merged Cain, Payton David requested to merge ghsc/users/pcain/geomag-algorithms:issue-26 into master
All threads resolved!
Files
2
@@ -5,7 +5,7 @@ from databases import Database
from obspy import UTCDateTime
from sqlalchemy import or_
from ...metadata import Metadata, MetadataCategory
from ...metadata import Metadata, MetadataQuery
from .metadata_history_table import metadata_history
from .metadata_table import metadata
@@ -24,68 +24,84 @@ class MetadataDatabaseFactory(object):
async def get_metadata(
self,
*, # make all params keyword
id: int = None,
network: str = None,
station: str = None,
channel: str = None,
location: str = None,
category: MetadataCategory = None,
starttime: datetime = None,
endtime: datetime = None,
created_after: datetime = None,
created_before: datetime = None,
data_valid: bool = None,
metadata_valid: bool = None,
status: List[str] = None,
):
query = metadata.select()
params: MetadataQuery,
history: bool = False,
) -> List[Metadata]:
table = metadata
if history:
table = metadata_history
query = table.select()
(
id,
category,
network,
station,
channel,
location,
starttime,
endtime,
created_after,
created_before,
data_valid,
metadata_valid,
status,
) = params.dict().values()
if id:
query = query.where(metadata.c.id == id)
query = query.where(table.c.id == id)
if category:
query = query.where(metadata.c.category == category)
query = query.where(table.c.category == category)
if network:
query = query.where(metadata.c.network == network)
query = query.where(table.c.network == network)
if station:
query = query.where(metadata.c.station == station)
query = query.where(table.c.station == station)
if channel:
query = query.where(metadata.c.channel.like(channel))
query = query.where(table.c.channel.like(channel))
if location:
query = query.where(metadata.c.location.like(location))
query = query.where(table.c.location.like(location))
if starttime:
query = query.where(
or_(
metadata.c.endtime == None,
metadata.c.endtime > starttime,
table.c.endtime == None,
table.c.endtime > starttime,
)
)
if endtime:
query = query.where(
or_(
metadata.c.starttime == None,
metadata.c.starttime < endtime,
table.c.starttime == None,
table.c.starttime < endtime,
)
)
if created_after:
query = query.where(metadata.c.created_time > created_after)
query = query.where(table.c.created_time > created_after)
if created_before:
query = query.where(metadata.c.created_time < created_before)
query = query.where(table.c.created_time < created_before)
if data_valid is not None:
query = query.where(metadata.c.data_valid == data_valid)
query = query.where(table.c.data_valid == data_valid)
if metadata_valid is not None:
query = query.where(metadata.c.metadata_valid == metadata_valid)
query = query.where(table.c.metadata_valid == metadata_valid)
if status is not None:
query = query.where(metadata.c.status.in_(status))
query = query.where(table.c.status.in_(status))
rows = await self.database.fetch_all(query)
return [Metadata(**row) for row in rows]
async def get_metadata_by_id(self, id: int):
meta = await self.get_metadata(id=id)
meta = await self.get_metadata(MetadataQuery(id=id))
if len(meta) != 1:
raise ValueError(f"{len(meta)} records found")
return meta[0]
async def get_metadata_history(self, metadata_id: int) -> List[Metadata]:
async def get_metadata_history_by_id(self, id: int) -> Optional[Metadata]:
query = metadata_history.select()
query = query.where(metadata_history.c.id == id)
meta = await self.database.fetch_one(query)
if meta is None:
return meta
return Metadata(**meta)
async def get_metadata_history_by_metadata_id(
self, metadata_id: int
) -> List[Metadata]:
async with self.database.transaction() as transaction:
query = metadata_history.select()
query = query.where(metadata_history.c.metadata_id == metadata_id).order_by(
@@ -99,14 +115,6 @@ class MetadataDatabaseFactory(object):
metadata.reverse()
return metadata
async def get_metadata_history_by_id(self, id: int) -> Optional[Metadata]:
query = metadata_history.select()
query = query.where(metadata_history.c.id == id)
meta = await self.database.fetch_one(query)
if meta is None:
return meta
return Metadata(**meta)
async def update_metadata(self, meta: Metadata, updated_by: str) -> Metadata:
async with self.database.transaction() as transaction:
# write current record to metadata history table
Loading