Newer
Older
from typing import List, Optional
from databases import Database
from sqlalchemy import or_
from ...metadata import Metadata, MetadataQuery
from .metadata_history_table import metadata_history
from .metadata_table import metadata
class MetadataDatabaseFactory(object):
def __init__(self, database: Database):
self.database = database
async def create_metadata(self, meta: Metadata) -> Metadata:
query = metadata.insert()
meta.status = meta.status or "new"
values = meta.datetime_dict(exclude={"id", "metadata_id"}, exclude_none=True)
meta.id = await self.database.execute(query)
return meta
params: MetadataQuery,
history: bool = False,
table = metadata
if history:
table = metadata_history
query = table.select()
(
id,
category,
starttime,
endtime,
created_after,
created_before,
network,
station,
channel,
location,
data_valid,
metadata_valid,
status,
query = query.where(table.c.id == id)
query = query.where(table.c.category == category)
query = query.where(table.c.network == network)
query = query.where(table.c.station == station)
query = query.where(table.c.channel.like(channel))
query = query.where(table.c.location.like(location))
or_(
table.c.endtime == None,
table.c.endtime > starttime,
)
or_(
table.c.starttime == None,
table.c.starttime < endtime,
)
query = query.where(table.c.created_time > created_after)
query = query.where(table.c.created_time < created_before)
query = query.where(table.c.data_valid == data_valid)
query = query.where(table.c.metadata_valid == metadata_valid)
if status is not None:
query = query.where(table.c.status.in_(status))
rows = await self.database.fetch_all(query)
return [Metadata(**row) for row in rows]
async def get_metadata_by_id(self, id: int):
meta = await self.get_metadata(MetadataQuery(id=id))
if len(meta) != 1:
raise ValueError(f"{len(meta)} records found")
return meta[0]
async def get_metadata_history_by_id(self, id: int) -> Optional[Metadata]:
query = metadata_history.select()
query = query.where(metadata_history.c.id == id)
meta = await self.database.fetch_one(query)
if meta is None:
return meta
return Metadata(**meta)
async def get_metadata_history_by_metadata_id(
self, metadata_id: int
) -> List[Metadata]:
async with self.database.transaction() as transaction:
query = metadata_history.select()
query = query.where(metadata_history.c.metadata_id == metadata_id).order_by(
metadata_history.c.updated_time
)
rows = await self.database.fetch_all(query)
metadata = [Metadata(**row) for row in rows]
current_metadata = await self.get_metadata_by_id(id=metadata_id)
metadata.append(current_metadata)
# return records in order of age(newest first)
metadata.reverse()
return metadata
async def update_metadata(self, meta: Metadata, updated_by: str) -> Metadata:
async with self.database.transaction() as transaction:
# write current record to metadata history table
original_metadata = await self.get_metadata_by_id(id=meta.id)
original_metadata.metadata_id = original_metadata.id
values = original_metadata.datetime_dict(exclude={"id"}, exclude_none=True)
query = metadata_history.insert()
query = query.values(**values)
original_metadata.id = await self.database.execute(query)
# update record in metadata table
meta.updated_by = updated_by
meta.updated_time = UTCDateTime()
query = metadata.update().where(metadata.c.id == meta.id)
values = meta.datetime_dict(exclude={"id", "metadata_id"})
query = query.values(**values)
await self.database.execute(query)
return await self.get_metadata_by_id(id=meta.id)