Newer
Older
from typing import List, Optional
from databases import Database
from sqlalchemy import or_, Table
from ...metadata import Metadata, MetadataCategory
from .metadata_history_table import metadata_history
from .metadata_table import metadata
class MetadataDatabaseFactory(object):
def __init__(self, database: Database):
self.database = database
async def create_metadata(self, meta: Metadata) -> Metadata:
query = metadata.insert()
meta.status = meta.status or "new"
values = meta.datetime_dict(exclude={"id", "metadata_id"}, exclude_none=True)
meta.id = await self.database.execute(query)
return meta
async def get_metadata(
self,
*, # make all params keyword
id: int = None,
network: str = None,
station: str = None,
channel: str = None,
location: str = None,
category: MetadataCategory = None,
starttime: datetime = None,
endtime: datetime = None,
created_after: datetime = None,
created_before: datetime = None,
data_valid: bool = None,
metadata: Table = metadata,
status: List[str] = None,
query = metadata.select()
query = query.where(metadata.c.id == id)
query = query.where(metadata.c.category == category)
query = query.where(metadata.c.network == network)
query = query.where(metadata.c.station == station)
query = query.where(metadata.c.channel.like(channel))
query = query.where(metadata.c.location.like(location))
or_(
metadata.c.endtime == None,
metadata.c.endtime > starttime,
)
or_(
metadata.c.starttime == None,
metadata.c.starttime < endtime,
)
query = query.where(metadata.c.created_time > created_after)
query = query.where(metadata.c.created_time < created_before)
query = query.where(metadata.c.data_valid == data_valid)
query = query.where(metadata.c.metadata_valid == metadata_valid)
if status is not None:
query = query.where(metadata.c.status.in_(status))
rows = await self.database.fetch_all(query)
return [Metadata(**row) for row in rows]
async def get_metadata_by_id(self, id: int):
meta = await self.get_metadata(id=id)
if len(meta) != 1:
raise ValueError(f"{len(meta)} records found")
return meta[0]
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
async def get_metadata_history(
self,
*, # make all params keyword
id: int = None,
network: str = None,
station: str = None,
channel: str = None,
location: str = None,
category: MetadataCategory = None,
starttime: datetime = None,
endtime: datetime = None,
created_after: datetime = None,
created_before: datetime = None,
data_valid: bool = None,
metadata_valid: bool = None,
status: List[str] = None,
) -> List[Metadata]:
return await self.get_metadata(
id=id,
network=network,
station=station,
channel=channel,
location=location,
category=category,
starttime=starttime,
endtime=endtime,
created_after=created_after,
created_before=created_before,
data_valid=data_valid,
metadata=metadata_history,
metadata_valid=metadata_valid,
status=status,
)
async def get_metadata_history_by_id(self, id: int) -> Optional[Metadata]:
query = metadata_history.select()
query = query.where(metadata_history.c.id == id)
meta = await self.database.fetch_one(query)
if meta is None:
return meta
return Metadata(**meta)
async def get_metadata_history_by_metadata_id(
self, metadata_id: int
) -> List[Metadata]:
async with self.database.transaction() as transaction:
query = metadata_history.select()
query = query.where(metadata_history.c.metadata_id == metadata_id).order_by(
metadata_history.c.updated_time
)
rows = await self.database.fetch_all(query)
metadata = [Metadata(**row) for row in rows]
current_metadata = await self.get_metadata_by_id(id=metadata_id)
metadata.append(current_metadata)
# return records in order of age(newest first)
metadata.reverse()
return metadata
async def update_metadata(self, meta: Metadata, updated_by: str) -> Metadata:
async with self.database.transaction() as transaction:
# write current record to metadata history table
original_metadata = await self.get_metadata_by_id(id=meta.id)
original_metadata.metadata_id = original_metadata.id
values = original_metadata.datetime_dict(exclude={"id"}, exclude_none=True)
query = metadata_history.insert()
query = query.values(**values)
original_metadata.id = await self.database.execute(query)
# update record in metadata table
meta.updated_by = updated_by
meta.updated_time = UTCDateTime()
query = metadata.update().where(metadata.c.id == meta.id)
values = meta.datetime_dict(exclude={"id", "metadata_id"})
query = query.values(**values)
await self.database.execute(query)
return await self.get_metadata_by_id(id=meta.id)