diff --git a/geomagio/algorithm/MetadataAlgorithm.py b/geomagio/algorithm/MetadataAlgorithm.py
new file mode 100644
index 0000000000000000000000000000000000000000..7b85e82ed54338f639237a0b79eb4e6e03ee0980
--- /dev/null
+++ b/geomagio/algorithm/MetadataAlgorithm.py
@@ -0,0 +1,129 @@
+from pydantic import BaseModel
+from obspy import UTCDateTime, Stream
+from datetime import timedelta
+from enum import Enum
+
+from ..metadata.flag.Flag import Flag
+from ..metadata import Metadata, MetadataFactory, MetadataCategory
+from ..edge.MiniSeedFactory import MiniSeedFactory
+from ..edge.EdgeFactory import EdgeFactory
+
+
+class DataFactory(str, Enum):
+    MINISEED = "miniseed"
+    EDGE = "edge"
+
+
+class MetadataAlgorithm(BaseModel):
+    factory: DataFactory = DataFactory.MINISEED
+    observatory: str
+    channels: str
+    metadata_token: str
+    metadata_url: str
+    type: str
+    interval: str
+    starttime: UTCDateTime
+    endtime: UTCDateTime
+
+    def get_data_factory(self):
+        """Helper method to return the correct data factory based on the factory type."""
+        factory_class = (
+            MiniSeedFactory if self.factory == DataFactory.MINISEED else EdgeFactory
+        )
+        return factory_class(
+            port=22061,
+            observatory=self.observatory,
+            channels=self.channels,
+            type=self.type,
+            interval=self.interval,
+        )
+
+    def get_stream(self) -> Stream:
+        """Retrieve the data stream based on the factory type."""
+        data_factory = self.get_data_factory()
+
+        try:
+            return data_factory.get_timeseries(
+                starttime=self.starttime,
+                endtime=self.endtime,
+            )
+        except Exception as e:
+            raise ValueError(f"Failed to retrieve data stream from {self.factory}: {e}")
+
+    def create_metadata(
+        self,
+        metadata_class: type[Flag],
+        metadata_dict: dict,
+        category: MetadataCategory,
+        network: str,
+        channel: str,
+        location: str,
+        created_by: str,
+        starttime: UTCDateTime,
+        endtime: UTCDateTime,
+        status: str,
+    ) -> Metadata:
+        """Create metadata using the provided dictionary."""
+        return Metadata(
+            category=category,
+            created_by=created_by,
+            starttime=starttime,
+            endtime=endtime,
+            metadata=metadata_class(**metadata_dict),
+            network=network,
+            channel=channel,
+            location=location,
+            station=self.observatory,
+            status=status,
+        )
+
+    def check_existing_metadata(self, metadata_obj: Metadata) -> str | None:
+        """Check if similar metadata already exists and return existing metadata ID if it does."""
+        query_metadata = Metadata(
+            category=metadata_obj.category,
+            station=metadata_obj.station,
+            starttime=metadata_obj.starttime,
+            endtime=metadata_obj.endtime,
+        )
+        metadata_factory = self._get_metadata_factory()
+        prior_metadata = metadata_factory.get_metadata(query=query_metadata)
+
+        return prior_metadata[0].id if prior_metadata else None
+
+    def update_metadata(self, metadata_obj: Metadata) -> Metadata:
+        """Update existing metadata."""
+        return self._get_metadata_factory().update_metadata(metadata=metadata_obj)
+
+    def create_new_metadata(self, metadata_obj: Metadata) -> Metadata:
+        """Create new metadata."""
+        return self._get_metadata_factory().create_metadata(metadata=metadata_obj)
+
+    def split_stream_by_day(self, stream: Stream) -> list[Stream]:
+        """Split stream into daily streams to prevent metadata from overlapping."""
+        daily_streams = []
+
+        for trace in stream:
+            current_time = trace.stats.starttime
+            trace_endtime = trace.stats.endtime
+
+            while current_time <= trace_endtime:
+                # end each slice at the last sample before the next UTC day
+                # boundary so daily slices neither overlap nor leave gaps
+                next_day = UTCDateTime(
+                    current_time.year, current_time.month, current_time.day
+                ) + timedelta(days=1)
+                day_endtime = min(next_day - trace.stats.delta, trace_endtime)
+
+                daily_streams.append(
+                    stream.slice(
+                        starttime=current_time, endtime=day_endtime, nearest_sample=True
+                    )
+                )
+
+                current_time = next_day
+
+        return daily_streams
+
+    def _get_metadata_factory(self) -> MetadataFactory:
+        """Helper method to instantiate MetadataFactory."""
+        return MetadataFactory(token=self.metadata_token, url=self.metadata_url)
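
A minimal usage sketch of the new class follows (not part of the diff). The observatory code, channel, token, URL, time range, network, status, and the empty flag_fields dict are illustrative placeholders; it also assumes MetadataCategory.FLAG exists and that setting id on a record before calling update_metadata is how the metadata service expects updates. Adjust these to the actual deployment.

from obspy import UTCDateTime

from geomagio.algorithm.MetadataAlgorithm import DataFactory, MetadataAlgorithm
from geomagio.metadata import MetadataCategory
from geomagio.metadata.flag.Flag import Flag

algorithm = MetadataAlgorithm(
    factory=DataFactory.MINISEED,
    observatory="BOU",  # example observatory code
    channels="UFU",  # example channel
    metadata_token="example-token",  # placeholder credential
    metadata_url="https://example.com/ws/secure/metadata",  # placeholder URL
    type="variation",
    interval="second",
    starttime=UTCDateTime("2024-01-01T00:00:00Z"),
    endtime=UTCDateTime("2024-01-03T00:00:00Z"),
)

# fetch the timeseries, split it into daily chunks, and store one
# flag metadata record per day, updating any record that already exists
stream = algorithm.get_stream()
for daily_stream in algorithm.split_stream_by_day(stream):
    trace = daily_stream[0]
    flag_fields = {}  # populate with whatever fields the Flag model expects
    metadata = algorithm.create_metadata(
        metadata_class=Flag,
        metadata_dict=flag_fields,
        category=MetadataCategory.FLAG,  # assumes a FLAG category exists
        network="NT",
        channel=trace.stats.channel,
        location=trace.stats.location,
        created_by="example-user",
        starttime=trace.stats.starttime,
        endtime=trace.stats.endtime,
        status="new",
    )
    existing_id = algorithm.check_existing_metadata(metadata)
    if existing_id:
        metadata.id = existing_id
        algorithm.update_metadata(metadata)
    else:
        algorithm.create_new_metadata(metadata)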