From 9ba30d74ade73ee957fbf26d483705840ae4eae6 Mon Sep 17 00:00:00 2001
From: Alex Wernle <awernle@usgs.gov>
Date: Fri, 4 Oct 2024 10:12:24 -0600
Subject: [PATCH] New MetadataAlgorithm Class for other metadata algorithms to
 inherit from.

---
 geomagio/algorithm/MetadataAlgorithm.py | 129 ++++++++++++++++++++++++
 1 file changed, 129 insertions(+)
 create mode 100644 geomagio/algorithm/MetadataAlgorithm.py

diff --git a/geomagio/algorithm/MetadataAlgorithm.py b/geomagio/algorithm/MetadataAlgorithm.py
new file mode 100644
index 00000000..7b85e82e
--- /dev/null
+++ b/geomagio/algorithm/MetadataAlgorithm.py
@@ -0,0 +1,129 @@
+from pydantic import BaseModel
+from obspy import UTCDateTime, Stream
+from datetime import timedelta
+from enum import Enum
+
+from ..metadata.flag.Flag import Flag
+from ..metadata import Metadata, MetadataFactory, MetadataCategory
+from ..edge.MiniSeedFactory import MiniSeedFactory
+from ..edge.EdgeFactory import EdgeFactory
+
+
+class DataFactory(str, Enum):
+    MINISEED = "miniseed"
+    EDGE = "edge"
+
+
+class MetadataAlgorithm(BaseModel):
+    factory: DataFactory = DataFactory.MINISEED
+    observatory: str
+    channels: str
+    metadata_token: str
+    metadata_url: str
+    type: str
+    interval: str
+    starttime: UTCDateTime
+    endtime: UTCDateTime
+
+    def get_data_factory(self):
+        """Helper method to return the correct data factory based on the factory type."""
+        factory_class = (
+            MiniSeedFactory if self.factory == DataFactory.MINISEED else EdgeFactory
+        )
+        return factory_class(
+            port=22061,
+            observatory=self.observatory,
+            channels=self.channels,
+            type=self.type,
+            interval=self.interval,
+        )
+
+    def get_stream(self) -> Stream:
+        """Retrieve the data stream based on the factory type."""
+        data_factory = self.get_data_factory()
+
+        try:
+            return data_factory.get_timeseries(
+                starttime=self.starttime,
+                endtime=self.endtime,
+            )
+        except Exception as e:
+            raise ValueError(f"Failed to retrieve data stream from {self.factory}: {e}")
+
+    def create_metadata(
+        self,
+        metadata_class: Flag,
+        metadata_dict: dict,
+        category: MetadataCategory,
+        network: str,
+        channel: str,
+        location: str,
+        created_by: str,
+        starttime: UTCDateTime,
+        endtime: UTCDateTime,
+        status: str,
+    ) -> Metadata:
+        """Create metadata using the provided dictionary."""
+        return Metadata(
+            category=category,
+            created_by=created_by,
+            starttime=starttime,
+            endtime=endtime,
+            metadata=metadata_class(**metadata_dict),
+            network=network,
+            channel=channel,
+            location=location,
+            station=self.observatory,
+            status=status,
+        )
+
+    def check_existing_metadata(self, metadata_obj: Metadata) -> str | None:
+        """Check if similar metadata already exists and return existing metadata ID if it does."""
+        query_metadata = Metadata(
+            category=metadata_obj.category,
+            station=metadata_obj.station,
+            starttime=metadata_obj.starttime,
+            endtime=metadata_obj.endtime,
+        )
+        metadata_factory = self._get_metadata_factory()
+        prior_metadata = metadata_factory.get_metadata(query=query_metadata)
+
+        return prior_metadata[0].id if prior_metadata else None
+
+    def update_metadata(self, metadata_obj: Metadata) -> Metadata:
+        """Update existing metadata."""
+        return self._get_metadata_factory().update_metadata(metadata=metadata_obj)
+
+    def create_new_metadata(self, metadata_obj: Metadata) -> Metadata:
+        """Create new metadata."""
+        return self._get_metadata_factory().create_metadata(metadata=metadata_obj)
+
+    def split_stream_by_day(self, stream: Stream) -> list[Stream]:
+        """Split stream into daily streams to prevent metadata from overlapping."""
+        daily_streams = []
+
+        for trace in stream:
+            current_time = trace.stats.starttime
+            trace_endtime = trace.stats.endtime
+
+            while current_time <= trace_endtime:
+                day_endtime = min(
+                    UTCDateTime(
+                        current_time.year, current_time.month, current_time.day, 23, 59
+                    ),
+                    trace_endtime,
+                )
+
+                daily_streams.append(
+                    stream.slice(
+                        starttime=current_time, endtime=day_endtime, nearest_sample=True
+                    )
+                )
+
+                current_time += timedelta(days=1)
+
+        return daily_streams
+
+    def _get_metadata_factory(self) -> MetadataFactory:
+        """Helper method to instantiate MetadataFactory."""
+        return MetadataFactory(token=self.metadata_token, url=self.metadata_url)
-- 
GitLab