Commit 3f05d06c authored by Wilbur, Spencer Franklin

Ran lint

parent 9ebcc9f6
1 merge request: !329 Added several changes to allow for users to retrieve one minute and one hour...
@@ -192,7 +192,7 @@ class FilterAlgorithm(Algorithm):
            if (
                self.input_sample_period <= step["input_sample_period"]
                and self.output_sample_period >= step["output_sample_period"]
            ):
                if (
                    step["type"] == "average"
                    and step["output_sample_period"] != self.output_sample_period
@@ -449,4 +449,4 @@ class FilterAlgorithm(Algorithm):
        self.output_sample_period = TimeseriesUtility.get_delta_from_interval(
            arguments.output_interval or arguments.interval
        )
-        self.load_state()
\ No newline at end of file
+        self.load_state()
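The step-selection logic in the first hunk above drives everything else in this commit. A hedged sketch of how it could be exercised, using only names that appear in this diff; the import paths and the contents of each step dict are assumptions, since they are not shown here:

# Import paths are assumptions; only the class, enum, and key names appear in this diff.
from geomagio.algorithm import FilterAlgorithm
from geomagio.api.ws.DataApiQuery import SamplingPeriod

# Illustration: select the cascade of filter steps for a second -> hour request
# and inspect them, using the step keys referenced in the hunk above.
filt = FilterAlgorithm(
    input_sample_period=SamplingPeriod.SECOND,
    output_sample_period=SamplingPeriod.HOUR,
    steps=None,
)
for step in filt.get_filter_steps():
    print(step["type"], step["input_sample_period"], "->", step["output_sample_period"])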
@@ -37,27 +37,27 @@ def get_dbdt(
    )


####################################### The router .get filter isn't visible on the docs page
# Look for register routers in the backend
@router.get(
    "/algorithms/filter/",
    description="Filtered data dependent on requested interval",
    name="Filtered Algorithm",
)
def get_filter(
    query: DataApiQuery = Depends(get_data_query),
) -> Response:
-    filt = FilterAlgorithm(input_sample_period=SamplingPeriod.SECOND, output_sample_period=query.sampling_period, steps = None)
+    filt = FilterAlgorithm(
+        input_sample_period=SamplingPeriod.SECOND,
+        output_sample_period=query.sampling_period,
+        steps=None,
+    )
    steps = filt.get_filter_steps()
    # Update the filter algorithm with new steps
    filt.steps = steps
    print("The steps found in filt: ", steps)
    data_factory = get_data_factory(query=query)
@@ -65,7 +65,7 @@ def get_filter(
    raw = get_timeseries(data_factory, query)
    print("The initial timeseries output:", raw)
    # run dbdt
-    timeseries = process_in_stages(raw,steps)
+    timeseries = process_in_stages(raw, steps)
    print("The timeseries output:", timeseries)
    print("The elements being used are ", query.elements)
@@ -74,26 +74,30 @@ def get_filter
    return format_timeseries(
        timeseries=timeseries, format=query.format, elements=elements
    )


def process_in_stages(input_data, steps):
    # Initialize the current data to the input data
    # Process each step sequentially
    for step in steps:
        # Create an instance of FilterAlgorithm for the current step
        filt = FilterAlgorithm(
-            input_sample_period= step["input_sample_period"],
+            input_sample_period=step["input_sample_period"],
            output_sample_period=step["output_sample_period"],
-            steps=[step]
+            steps=[step],
        )
        # Process the current data through the filter algorithm
        current_data = filt.process(input_data)
    return current_data


##########################################
@router.post(
    "/algorithms/residual",
    description="Calculates new absolutes and baselines from reading\n\n"
......
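process_in_stages above builds one FilterAlgorithm per step but, as written, hands the original input_data to every stage. A hedged sketch of a chained variant, a hypothetical helper that is not part of this commit, which feeds each stage's output into the next so a second -> minute -> hour cascade reuses intermediate results (assumes FilterAlgorithm is imported as in the module above):

def process_in_stages_chained(input_data, steps):
    # Hypothetical variant: start from the raw input and let each stage
    # consume the previous stage's output instead of the original input.
    current_data = input_data
    for step in steps:
        filt = FilterAlgorithm(
            input_sample_period=step["input_sample_period"],
            output_sample_period=step["output_sample_period"],
            steps=[step],
        )
        current_data = filt.process(current_data)
    return current_data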
@@ -36,8 +36,7 @@ def get_data_factory(
        factory = FDSNFactory(network="IU", locationCode="40")
    # if sampling_period in [SamplingPeriod.MINUTE]:
    # print("THIS IS WHERE THE QUERY IS")
    elif sampling_period in [
        SamplingPeriod.TEN_HERTZ,
        SamplingPeriod.HOUR,
@@ -180,19 +179,19 @@ def get_timeseries(data_factory: TimeseriesFactory, query: DataApiQuery) -> Stream:
        data_factory: where to read data
        query: parameters for the data to read
    """
-    #This will always return the one-second data for variometers to be used in any filter process
+    # This will always return the one-second data for variometers to be used in any filter process
    if query.data_type == "variation":
        timeseries = data_factory.get_timeseries(
            starttime=query.starttime,
            endtime=query.endtime,
            observatory=query.id,
            channels=query.elements,
            type=query.data_type,
            interval=TimeseriesUtility.get_interval_from_delta(SamplingPeriod.SECOND),
        )
    else:
        # get data
        timeseries = data_factory.get_timeseries(
            starttime=query.starttime,
            endtime=query.endtime,
......
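Both branches above lean on the delta/interval helpers. A hedged illustration of how those conversions line up with the SamplingPeriod values used in this commit; the import path and the exact return values are assumptions inferred from the interval names that appear elsewhere in this diff, not verified here:

from geomagio import TimeseriesUtility  # import path assumed

# Assumed delta (seconds) <-> interval-name pairs:
print(TimeseriesUtility.get_interval_from_delta(1.0))     # expected: "second"
print(TimeseriesUtility.get_interval_from_delta(60.0))    # expected: "minute"
print(TimeseriesUtility.get_delta_from_interval("hour"))  # expected: 3600.0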
@@ -24,7 +24,6 @@ from .FDSNSNCL import FDSNSNCL
from .SNCL import SNCL


class FDSNFactory(TimeseriesFactory):
    """TimeseriesFactory for Edge related data.
@@ -111,7 +110,7 @@ class FDSNFactory(TimeseriesFactory):
        channels: Optional[List[str]] = None,
        type: Optional[DataType] = None,
        interval: Optional[DataInterval] = None,
        add_empty_channels: bool = True,
    ) -> Stream:
        """Get timeseries data
@@ -281,16 +280,16 @@ class FDSNFactory(TimeseriesFactory):
        elif dip > 0 and dip < 180 and data[0].stats.channel[-2:] == "FZ":
            data[0].data *= -1

        # Remove channel Response:
        # The function "remove_response" appears to be doing what we want;
        # i.e. convert from counts to NT, but this may be a placeholder
        # at least until we see how this function behaves if
        # a station has a frequency response.
        if data.count != 0:
            data.remove_response(output="DEF", zero_mean=False, taper=False)
        self._set_metadata(data, observatory, channel, type, interval)
        return data

    def _post_process(
@@ -359,8 +358,8 @@ class FDSNFactory(TimeseriesFactory):
            trace.stats, observatory, channel, type, interval
        )

-    def _get_orientations(self,
-        trace: Trace, starttime: UTCDateTime, sncl
+    def _get_orientations(
+        self, trace: Trace, starttime: UTCDateTime, sncl
    ) -> Tuple[float, float]:
        # Retrieve station orientation information using FDSN for each trace
@@ -385,4 +384,4 @@ class FDSNFactory(TimeseriesFactory):
        trace.stats.azimuth = azimuth
        # Return both azimuth and dip
-        return azimuth, dip
\ No newline at end of file
+        return azimuth, dip
@@ -67,7 +67,9 @@ def get_FDSN_channel(
    location: Optional[str] = None,
) -> str:
    if location == "40" and network == "IU":
-        return _get_channel_start(interval=interval, data_type = data_type) + _get_channel_end(element=element)
+        return _get_channel_start(
+            interval=interval, data_type=data_type
+        ) + _get_channel_end(element=element)
    return get_channel(element=element, interval=interval, data_type=data_type)
@@ -84,7 +86,8 @@ def _get_channel_end(element: str) -> str:
        return element[1:]
    raise ValueError(f"Unsupported element: {element}")


-def _get_channel_start(interval: str, data_type:str) -> str:
+def _get_channel_start(interval: str, data_type: str) -> str:
    if interval == "tenhertz":
        return "B"
    if interval == "second":
@@ -95,4 +98,4 @@ def _get_channel_start(interval: str, data_type:str) -> str:
        return "R"
    elif interval == "day":
        return "P"
-    raise ValueError(f"Unexpected interval: {interval}")
\ No newline at end of file
+    raise ValueError(f"Unexpected interval: {interval}")