diff --git a/geomagio/Controller.py b/geomagio/Controller.py index e91ef4d2520aec5f017676af24e23a0686ac1027..4342bd856db72ec76959f369520faa6d1058ea01 100644 --- a/geomagio/Controller.py +++ b/geomagio/Controller.py @@ -62,7 +62,13 @@ class Controller(object): self._outputInterval = outputInterval def _get_input_timeseries( - self, observatory, channels, starttime, endtime, algorithm=None + self, + observatory, + channels, + starttime, + endtime, + algorithm=None, + interval=None, ): """Get timeseries from the input factory for requested options. @@ -102,7 +108,7 @@ class Controller(object): starttime=input_start, endtime=input_end, channels=channels, - interval=self._inputInterval, + interval=interval or self._inputInterval, ) return timeseries @@ -129,7 +135,14 @@ class Controller(object): t.stats.channel = to_name return timeseries - def _get_output_timeseries(self, observatory, channels, starttime, endtime): + def _get_output_timeseries( + self, + observatory, + channels, + starttime, + endtime, + interval=None, + ): """Get timeseries from the output factory for requested options. Parameters @@ -154,7 +167,7 @@ class Controller(object): starttime=starttime, endtime=endtime, channels=channels, - interval=self._outputInterval, + interval=interval or self._outputInterval, ) return timeseries @@ -224,6 +237,8 @@ class Controller(object): input_channels: Optional[List[str]] = None, input_timeseries: Optional[Stream] = None, output_channels: Optional[List[str]] = None, + input_interval: Optional[str] = None, + output_interval: Optional[str] = None, no_trim: bool = False, realtime: Union[bool, int] = False, rename_input_channel: Optional[List[List[str]]] = None, @@ -239,6 +254,8 @@ class Controller(object): input_channels: list of channels to read input_timeseries: used by run_as_update, which has already read input. output_channels: list of channels to write + input_interval: input data interval + output_interval: output data interval no_trim: whether to trim output to starttime/endtime interval realtime: number of seconds in realtime interval rename_input_channel: list of input channel renames @@ -250,6 +267,8 @@ class Controller(object): algorithm = algorithm or self._algorithm input_channels = input_channels or algorithm.get_input_channels() output_channels = output_channels or algorithm.get_output_channels() + input_interval = input_interval or self._inputInterval + output_interval = output_interval or self._outputInterval next_starttime = algorithm.get_next_starttime() starttime = next_starttime or starttime # input @@ -259,6 +278,7 @@ class Controller(object): starttime=starttime, endtime=endtime, channels=input_channels, + interval=input_interval, ) if timeseries.count() == 0: # no data to process @@ -295,6 +315,7 @@ class Controller(object): starttime=starttime, endtime=endtime, channels=output_channels, + interval=output_interval, ) def run_as_update( @@ -306,6 +327,8 @@ class Controller(object): algorithm: Optional[Algorithm] = None, input_channels: Optional[List[str]] = None, output_channels: Optional[List[str]] = None, + input_interval: Optional[str] = None, + output_interval: Optional[str] = None, no_trim: bool = False, realtime: Union[bool, int] = False, rename_input_channel: Optional[List[List[str]]] = None, @@ -324,6 +347,8 @@ class Controller(object): input_channels: list of channels to read input_timeseries: used by run_as_update, which has already read input. output_channels: list of channels to write + input_interval: input data interval + output_interval: output data interval no_trim: whether to trim output to starttime/endtime interval realtime: number of seconds in realtime interval rename_input_channel: list of input channel renames @@ -348,6 +373,8 @@ class Controller(object): raise AlgorithmException("Stateful algorithms cannot use run_as_update") input_channels = input_channels or algorithm.get_input_channels() output_channels = output_channels or algorithm.get_output_channels() + input_interval = input_interval or self._inputInterval + output_interval = output_interval or self._outputInterval print( "checking gaps", starttime, @@ -362,6 +389,7 @@ class Controller(object): starttime=starttime, endtime=endtime, channels=output_channels, + interval=output_interval, ) if len(output_timeseries) > 0: # find gaps in output, so they can be updated @@ -384,6 +412,7 @@ class Controller(object): starttime=output_gap[0], endtime=output_gap[1], channels=input_channels, + interval=input_interval, ) if not algorithm.can_produce_data( starttime=output_gap[0], endtime=output_gap[1], stream=input_timeseries @@ -403,6 +432,8 @@ class Controller(object): endtime=recurse_endtime, input_channels=input_channels, output_channels=output_channels, + input_interval=input_interval, + output_interval=output_interval, no_trim=no_trim, realtime=realtime, rename_input_channel=rename_input_channel, @@ -429,6 +460,8 @@ class Controller(object): input_channels=input_channels, input_timeseries=input_timeseries, output_channels=output_channels, + input_interval=input_interval, + output_interval=output_interval, no_trim=no_trim, realtime=realtime, rename_input_channel=rename_input_channel, diff --git a/geomagio/processing/obsrio.py b/geomagio/processing/obsrio.py index b4510ffe18dc96ac301b11afa1edd5b7ac763922..bf0acf2b81687c6a3f590e6b4cd8b902f4730aa3 100644 --- a/geomagio/processing/obsrio.py +++ b/geomagio/processing/obsrio.py @@ -139,11 +139,9 @@ def obsrio_day( """Filter 1 second edge H,E,Z,F to 1 day miniseed U,V,W,F.""" starttime, endtime = get_realtime_interval(realtime_interval) controller = Controller( - inputFactory=input_factory - or get_edge_factory(data_type="variation", interval="minute"), + inputFactory=input_factory or get_edge_factory(), inputInterval="minute", - outputFactory=output_factory - or get_miniseed_factory(data_type="variation", interval="day"), + outputFactory=output_factory or get_miniseed_factory(), outputInterval="day", ) renames = {"H": "U", "E": "V", "Z": "W", "F": "F"} @@ -178,11 +176,9 @@ def obsrio_hour( """Filter 1 second edge H,E,Z,F to 1 hour miniseed U,V,W,F.""" starttime, endtime = get_realtime_interval(realtime_interval) controller = Controller( - inputFactory=input_factory - or get_edge_factory(data_type="variation", interval="minute"), + inputFactory=input_factory or get_edge_factory(), inputInterval="minute", - outputFactory=output_factory - or get_miniseed_factory(data_type="variation", interval="hour"), + outputFactory=output_factory or get_miniseed_factory(), outputInterval="hour", ) renames = {"H": "U", "E": "V", "Z": "W", "F": "F"} @@ -221,11 +217,9 @@ def obsrio_minute( """ starttime, endtime = get_realtime_interval(realtime_interval) controller = Controller( - inputFactory=input_factory - or get_edge_factory(data_type="variation", interval="second"), + inputFactory=input_factory or get_edge_factory(), inputInterval="second", - outputFactory=output_factory - or get_edge_factory(data_type="variation", interval="minute"), + outputFactory=output_factory or get_edge_factory(), outputInterval="minute", ) for channel in ["H", "E", "Z", "F"]: @@ -258,10 +252,8 @@ def obsrio_second( starttime, endtime = get_realtime_interval(realtime_interval) controller = Controller( algorithm=Algorithm(inchannels=("F",), outchannels=("F",)), - inputFactory=input_factory or get_miniseed_factory(data_type="variation"), - inputInterval="second", - outputFactory=output_factory or get_edge_factory(data_type="variation"), - outputInterval="second", + inputFactory=input_factory or get_miniseed_factory(), + outputFactory=output_factory or get_edge_factory(), ) controller.run_as_update( observatory=(observatory,), @@ -285,11 +277,9 @@ def obsrio_temperatures( """Filter temperatures 1Hz miniseed (LK1-4) to 1 minute legacy (UK1-4).""" starttime, endtime = get_realtime_interval(realtime_interval) controller = Controller( - inputFactory=input_factory - or get_miniseed_factory(data_type="variation", interval="second"), + inputFactory=input_factory or get_miniseed_factory(), inputInterval="second", - outputFactory=output_factory - or get_edge_factory(data_type="variation", interval="minute"), + outputFactory=output_factory or get_edge_factory(), outputInterval="minute", ) renames = {"LK1": "UK1", "LK2": "UK2", "LK3": "UK3", "LK4": "UK4"} @@ -324,11 +314,9 @@ def obsrio_tenhertz( """Filter 10Hz miniseed U,V,W to 1Hz legacy H,E,Z.""" starttime, endtime = get_realtime_interval(realtime_interval) controller = Controller( - inputFactory=input_factory - or get_miniseed_factory(data_type="variation", interval="tenhertz"), + inputFactory=input_factory or get_miniseed_factory(), inputInterval="tenhertz", - outputFactory=output_factory - or get_edge_factory(data_type="variation", interval="second"), + outputFactory=output_factory or get_edge_factory(), outputInterval="second", ) renames = {"U": "H", "V": "E", "W": "Z"}