Skip to content
Snippets Groups Projects
MetadataDatabaseFactory.py 4.98 KiB
Newer Older
  • Learn to ignore specific revisions
  • from datetime import datetime
    
    from obspy import UTCDateTime
    
    from sqlalchemy import or_
    
    from ...metadata import Metadata, MetadataQuery
    
    from .metadata_history_table import metadata_history
    from .metadata_table import metadata
    
    
    class MetadataDatabaseFactory(object):
    
        def __init__(self, database: Database):
            self.database = database
    
        async def create_metadata(self, meta: Metadata) -> Metadata:
            query = metadata.insert()
    
            meta.status = meta.status or "new"
    
            values = meta.datetime_dict(exclude={"id", "metadata_id"}, exclude_none=True)
    
            query = query.values(**values)
    
            meta.id = await self.database.execute(query)
    
    
        async def get_metadata(
            self,
    
            params: MetadataQuery,
            history: bool = False,
    
        ) -> List[Metadata]:
    
            table = metadata
            if history:
                table = metadata_history
            query = table.select()
            (
                id,
                category,
                starttime,
                endtime,
                created_after,
                created_before,
    
                network,
                station,
                channel,
                location,
    
                data_valid,
                metadata_valid,
                status,
    
            ) = params.datetime_dict().values()
    
                query = query.where(table.c.id == id)
    
            if category:
    
                query = query.where(table.c.category == category)
    
            if network:
    
                query = query.where(table.c.network == network)
    
            if station:
    
                query = query.where(table.c.station == station)
    
            if channel:
    
                query = query.where(table.c.channel.like(channel))
    
            if location:
    
                query = query.where(table.c.location.like(location))
    
            if starttime:
                query = query.where(
    
                        table.c.endtime == None,
                        table.c.endtime > starttime,
    
                )
            if endtime:
                query = query.where(
    
                        table.c.starttime == None,
                        table.c.starttime < endtime,
    
                )
            if created_after:
    
                query = query.where(table.c.created_time > created_after)
    
            if created_before:
    
                query = query.where(table.c.created_time < created_before)
    
            if data_valid is not None:
    
                query = query.where(table.c.data_valid == data_valid)
    
            if metadata_valid is not None:
    
                query = query.where(table.c.metadata_valid == metadata_valid)
    
                query = query.where(table.c.status.in_(status))
    
            rows = await self.database.fetch_all(query)
    
            return [Metadata(**row) for row in rows]
    
    
        async def get_metadata_by_id(self, id: int):
    
            meta = await self.get_metadata(MetadataQuery(id=id))
    
            if len(meta) != 1:
                raise ValueError(f"{len(meta)} records found")
            return meta[0]
    
    
        async def get_metadata_history_by_id(self, id: int) -> Optional[Metadata]:
            query = metadata_history.select()
            query = query.where(metadata_history.c.id == id)
            meta = await self.database.fetch_one(query)
            if meta is None:
                return meta
            return Metadata(**meta)
    
        async def get_metadata_history_by_metadata_id(
            self, metadata_id: int
        ) -> List[Metadata]:
    
            async with self.database.transaction() as transaction:
                query = metadata_history.select()
                query = query.where(metadata_history.c.metadata_id == metadata_id).order_by(
                    metadata_history.c.updated_time
                )
                rows = await self.database.fetch_all(query)
                metadata = [Metadata(**row) for row in rows]
                current_metadata = await self.get_metadata_by_id(id=metadata_id)
                metadata.append(current_metadata)
                # return records in order of age(newest first)
                metadata.reverse()
                return metadata
    
        async def update_metadata(self, meta: Metadata, updated_by: str) -> Metadata:
            async with self.database.transaction() as transaction:
                # write current record to metadata history table
    
                original_metadata = await self.get_metadata_by_id(id=meta.id)
    
                original_metadata.metadata_id = original_metadata.id
                values = original_metadata.datetime_dict(exclude={"id"}, exclude_none=True)
    
                query = metadata_history.insert()
    
                query = query.values(**values)
                original_metadata.id = await self.database.execute(query)
                # update record in metadata table
                meta.updated_by = updated_by
    
                query = metadata.update().where(metadata.c.id == meta.id)
    
                values = meta.datetime_dict(exclude={"id", "metadata_id"})
                query = query.values(**values)
    
                return await self.get_metadata_by_id(id=meta.id)