Skip to content
Snippets Groups Projects

Covjson mvp

Files
24
+ 134
0
from pydantic import BaseModel
from obspy import UTCDateTime, Stream
from datetime import timedelta
from enum import Enum
from typing import Optional
from ..metadata import Metadata, MetadataFactory, MetadataCategory
from ..edge.MiniSeedFactory import MiniSeedFactory
from ..edge.EdgeFactory import EdgeFactory
from ..pydantic_utcdatetime import CustomUTCDateTimeType
class DataFactory(str, Enum):
    """Selector for which timeseries data factory retrieves observatory data.

    Values are the lowercase wire/config names; comparing against plain
    strings works because the enum also subclasses ``str``.
    """

    MINISEED = "miniseed"
    EDGE = "edge"
class MetadataAlgorithm(BaseModel):
    """Base class for algorithms that fetch timeseries data and produce metadata.

    Fields mirror the query parameters needed to fetch data from an
    edge/miniseed data service plus the connection details for the
    metadata web service.
    """

    # which timeseries factory fetches the data (miniseed or edge)
    factory: Optional[DataFactory] = DataFactory.MINISEED
    observatory: Optional[str] = None
    channels: Optional[str] = None
    metadata_token: Optional[str] = None  # auth token for the metadata service
    metadata_url: Optional[str] = None
    type: Optional[str] = None  # data type (e.g. variation/adjusted) — passed through
    interval: Optional[str] = None  # sampling interval — passed through to the factory
    starttime: Optional[CustomUTCDateTimeType] = None
    endtime: Optional[CustomUTCDateTimeType] = None

    def get_data_factory(self):
        """Return the timeseries factory instance selected by ``self.factory``.

        Returns:
            MiniSeedFactory when ``factory`` is MINISEED, otherwise EdgeFactory,
            configured with this algorithm's observatory/channels/type/interval.
        """
        factory_class = (
            MiniSeedFactory if self.factory == DataFactory.MINISEED else EdgeFactory
        )
        return factory_class(
            # NOTE(review): hard-coded port — confirm this is correct for all
            # deployments before generalizing.
            port=22061,
            observatory=self.observatory,
            channels=self.channels,
            type=self.type,
            interval=self.interval,
        )

    def get_stream(self) -> Stream:
        """Retrieve the data stream between ``starttime`` and ``endtime``.

        Returns:
            obspy Stream from the configured data factory.

        Raises:
            ValueError: if the factory fails; the original exception is chained
                so the root cause stays visible in the traceback.
        """
        data_factory = self.get_data_factory()
        try:
            return data_factory.get_timeseries(
                starttime=self.starttime,
                endtime=self.endtime,
                add_empty_channels=False,
            )
        except Exception as e:
            # chain with ``from e`` so the underlying error is not lost
            raise ValueError(
                f"Failed to retrieve data stream from {self.factory}: {e}"
            ) from e

    def create_metadata(
        self,
        metadata_class: type[BaseModel],
        metadata_dict: dict,
        category: MetadataCategory,
        network: str,
        channel: str,
        location: str,
        created_by: str,
        starttime: UTCDateTime,
        endtime: UTCDateTime,
        status: str,
    ) -> Metadata:
        """Create a Metadata object from a dict validated by ``metadata_class``.

        Args:
            metadata_class: pydantic model class used to validate/serialize
                ``metadata_dict`` (annotation fixed — it is instantiated and
                ``model_dump()``-ed, so it must be a BaseModel subclass, not a
                MetadataCategory).
            metadata_dict: raw metadata payload.
            category, network, channel, location, created_by, starttime,
            endtime, status: stored directly on the Metadata record; the
            station is taken from ``self.observatory``.

        Returns:
            populated Metadata instance (not yet persisted).
        """
        return Metadata(
            category=category,
            created_by=created_by,
            starttime=starttime,
            endtime=endtime,
            # validate the dict through the model, then store the plain dict
            metadata=(metadata_class(**metadata_dict)).model_dump(),
            network=network,
            channel=channel,
            location=location,
            station=self.observatory,
            status=status,
        )

    def check_existing_metadata(self, metadata_obj: Metadata) -> Optional[Metadata]:
        """Return existing metadata matching ``metadata_obj``'s key fields, or None.

        Queries on category/station/time-range/channel only; other fields are
        deliberately not part of the match.
        """
        query_metadata = Metadata(
            category=metadata_obj.category,
            station=metadata_obj.station,
            starttime=metadata_obj.starttime,
            endtime=metadata_obj.endtime,
            channel=metadata_obj.channel,
        )
        metadata_factory = self._get_metadata_factory()
        prior_metadata = metadata_factory.get_metadata(query=query_metadata)
        # normalize falsy results (e.g. empty list) to None
        return prior_metadata or None

    def update_metadata(self, metadata_obj: Metadata) -> Metadata:
        """Update existing metadata via the metadata service."""
        return self._get_metadata_factory().update_metadata(metadata=metadata_obj)

    def create_new_metadata(self, metadata_obj: Metadata) -> Metadata:
        """Persist new metadata via the metadata service."""
        return self._get_metadata_factory().create_metadata(metadata=metadata_obj)

    def split_stream_by_day(self, stream: Stream) -> list[Stream]:
        """Split a stream into per-UTC-day streams to control spike-array size.

        Args:
            stream: input obspy Stream; may span multiple days.

        Returns:
            list of non-empty daily Streams (empty list for an empty stream).
        """
        if not stream:
            # avoid ValueError from min()/max() on an empty stream
            return []
        daily_streams = []
        # overall time extent across all traces
        current_time = min(trace.stats.starttime for trace in stream)
        end_time = max(trace.stats.endtime for trace in stream)
        while current_time <= end_time:
            # end of the current UTC day (23:59:59), capped at the stream end
            day_endtime = min(
                UTCDateTime(
                    current_time.year, current_time.month, current_time.day, 23, 59, 59
                ),
                end_time,
            )
            daily_stream = stream.slice(
                starttime=current_time, endtime=day_endtime, nearest_sample=True
            )
            if daily_stream:
                daily_streams.append(daily_stream)
            # BUG FIX: advance to *midnight* of the next day. The original code
            # added 24h to the (possibly mid-day) current_time, so for a stream
            # starting at e.g. 05:00 every subsequent day silently lost its
            # 00:00-05:00 data.
            current_time = (
                UTCDateTime(current_time.year, current_time.month, current_time.day)
                + 86400
            )
        return daily_streams

    def _get_metadata_factory(self) -> MetadataFactory:
        """Build a MetadataFactory from this algorithm's token and URL."""
        return MetadataFactory(token=self.metadata_token, url=self.metadata_url)
Loading