Skip to content
Snippets Groups Projects
Commit 24fa5d71 authored by Erin (Josh) Rigler's avatar Erin (Josh) Rigler
Browse files

Changes to AdjustedAlgorithm class

- no longer forces default inchannels and outchannels, and lets the
  AdjustedMatrix.process() method "do the right thing" if neither is
  explicitly set by the user
- some mostly aesthetic changes that more cleanly separate the vector
  and scalar (F) adjustements
- modified AdjustedAlgorithm.can_produce_data():
  a. if any non-F inchannels cannot produce data, return False
  b. if F inchannel can produce data, return True as long as all
     non-F inchannels can also produce data
  c. if this is not desirable (e.g., you want to treat non-F and F
     channels independently), it is necessary to create two instances
     of AdjustedAlgorithm, one for non-F inchannels, and one for F.
parent 5ea8d330
No related branches found
No related tags found
1 merge request!328Changes to AdjustedAlgorithm and AdjustedMatrix classes (and unit tests)
......@@ -23,8 +23,6 @@ class AdjustedAlgorithm(Algorithm):
inchannels=None,
outchannels=None,
):
inchannels = inchannels or ["H", "E", "Z", "F"]
outchannels = outchannels or ["X", "Y", "Z", "F"]
Algorithm.__init__(
self,
inchannels=inchannels,
......@@ -35,18 +33,20 @@ class AdjustedAlgorithm(Algorithm):
self.statefile = statefile
self.data_type = data_type
self.location = location
# load matrix with statefile
if matrix is None:
# initialize state from statefile or inchannels;
# override both with matrix, and don't call load_state
if (statefile or inchannels) and (matrix is None):
self.load_state()
def load_state(self):
"""Load algorithm state from a file.
File name is self.statefile.
"""
# Adjusted matrix defaults to identity matrix
matrix_size = len([c for c in self.get_input_channels() if c != "F"]) + 1
matrix = np.eye(matrix_size).tolist()
if self.statefile is None:
# if no statefile provided, default to identity Adjusted matrix
# (requires input channels to be specified...is this desirable?)
matrix_size = len([c for c in self.get_input_channels() if c != "F"]) + 1
matrix = np.eye(matrix_size).tolist()
self.matrix = AdjustedMatrix(matrix=matrix)
return
try:
......@@ -59,6 +59,11 @@ class AdjustedAlgorithm(Algorithm):
self.matrix = AdjustedMatrix(**data)
elif "PC" in data:
# read data from legacy format
matrix_size = int(
# assumes matrix is square
np.sqrt(len([k for k in data.keys() if k.startswith("M")]))
)
matrix = np.eye(matrix_size).tolist()
for row in range(matrix_size):
for col in range(matrix_size):
matrix[row][col] = np.float64(data[f"M{row+1}{col+1}"])
......@@ -122,27 +127,48 @@ class AdjustedAlgorithm(Algorithm):
"""
out = None
inchannels = self.get_input_channels()
outchannels = self.get_output_channels()
inchannels = self.get_input_channels() or [
trace.stats.channel for trace in stream
]
outchannels = self.get_output_channels() or inchannels
adjusted = self.matrix.process(
stream,
inchannels=inchannels,
outchannels=outchannels,
)
out = Stream(
# if F in inchannels/outchannels is not in the last position,
# still return it as the last element the output Stream
inchannels_noF = [c for c in inchannels if c != "F"]
outchannels_noF = [c for c in outchannels if c != "F"]
out_noF = Stream(
[
self.create_trace(
outchannels[i],
stream.select(channel=inchannels[i])[0].stats,
adjusted[i],
(
self.create_trace(
outchannels_noF[i],
(
stream.select(channel=inchannels_noF[i])[0].stats
if i < len(inchannels_noF)
else stream.select(channel=inchannels_noF[-1])[0].stats
),
adjusted[i],
)
)
for i in range(len(outchannels))
for i in range(len(outchannels_noF))
]
)
return out
out_F = Stream(
self.create_trace("F", stream.select(channel="F")[0].stats, adjusted[-1])
if stream.select(channel="F")
else []
)
return out_noF + out_F
def can_produce_data(self, starttime, endtime, stream):
"""Can Product data
"""Can Produce data
Parameters
----------
starttime: UTCDateTime
......@@ -155,27 +181,18 @@ class AdjustedAlgorithm(Algorithm):
channels = self.get_input_channels()
# if F is available, can produce at least adjusted F
if "F" in channels and super().can_produce_data(
starttime, endtime, stream.select(channel="F")
):
return True
# if E-E and E-N available
if (
"E-E" in channels
and "E-N" in channels
and super().can_produce_data(starttime, endtime, stream)
):
return True
# check validity of remaining channels
for c in channels:
# non-F channels require valid data for matrix transform
if c != "F" and not (
super().can_produce_data(starttime, endtime, stream.select(channel=c))
):
return False
if "F" not in channels:
# only matrix transform desired if user didn't pass F
return True
elif super().can_produce_data(starttime, endtime, stream.select(channel="F")):
# F adjustment desired and possible; matrix transform possible
return True
# return false if F or remaining channels cannot produce data
return False
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment