Skip to content
Snippets Groups Projects
Commit c7645ac8 authored by Erin (Josh) Rigler's avatar Erin (Josh) Rigler
Browse files

Merge branch 'adjusted_algo_fix_June2024' into 'master'

Changes to AdjustedAlgorithm and AdjustedMatrix classes (and unit tests)

See merge request !328
parents e2c07570 e2cb548a
No related branches found
No related tags found
1 merge request!328Changes to AdjustedAlgorithm and AdjustedMatrix classes (and unit tests)
Pipeline #482262 passed
......@@ -32,22 +32,50 @@ class AdjustedMatrix(BaseModel):
def process(
self,
stream: Stream,
inchannels=["H", "E", "Z", "F"],
outchannels=["X", "Y", "Z", "F"],
inchannels=None,
outchannels=None,
):
"""Apply matrix to raw data. Apply pier correction to F when necessary"""
"""Apply matrix to raw data. Apply pier correction to F when necessary.
If inchannels is specified, look for inchannels in stream and fail if
all are not present. If outchannels is specified, return outchannels.
Both inchannels and outchannels must be consistent with self.matrix.
If inchannels is not specified, default to the first n-1 non-F channels
in stream, where n is the dimension of self.matrix. If outchannels is
not specified, outchannels will match inchannels.
NOTE: if non-F inchannels and outchannels are not compatible with
self.matrix, return NaNs for non-F outchannels
NOTE: if F is not in both inchannels and outchannels, return NaNs for F
"""
inchannels = inchannels or [trace.stats.channel for trace in stream]
outchannels = outchannels or inchannels
# new in/outchannels without "F"
inchannels_noF = [c for c in inchannels if c != "F"]
outchannels_noF = [c for c in outchannels if c != "F"]
raws = np.vstack(
[
stream.select(channel=channel)[0].data
for channel in inchannels
if channel != "F"
]
[stream.select(channel=channel)[0].data for channel in inchannels_noF]
+ [np.ones_like(stream[0].data)]
)
adjusted = self.matrix @ raws
if (
len(inchannels_noF) == len(outchannels_noF)
and len(inchannels_noF) == len(self.matrix) - 1
):
# matrix multiplication
adjusted = self.matrix @ raws
else:
# return NaNs if non-F inchannels or outchannels are not
# compatible with self.matrix
adjusted = np.full((len(self.matrix), raws.shape[1]), np.nan)
if "F" in inchannels and "F" in outchannels:
# return F only if specified in both inchannels and outchannels
f = stream.select(channel="F")[0].data + self.pier_correction
adjusted[-1] = f
else:
adjusted[-1] = np.nan
return adjusted
def get_metrics(self, readings: List[Reading]) -> List[Metric]:
......
......@@ -23,8 +23,6 @@ class AdjustedAlgorithm(Algorithm):
inchannels=None,
outchannels=None,
):
inchannels = inchannels or ["H", "E", "Z", "F"]
outchannels = outchannels or ["X", "Y", "Z", "F"]
Algorithm.__init__(
self,
inchannels=inchannels,
......@@ -35,18 +33,20 @@ class AdjustedAlgorithm(Algorithm):
self.statefile = statefile
self.data_type = data_type
self.location = location
# load matrix with statefile
if matrix is None:
# initialize state from statefile or inchannels;
# override both with matrix, and don't call load_state
if (statefile or inchannels) and (matrix is None):
self.load_state()
def load_state(self):
"""Load algorithm state from a file.
File name is self.statefile.
"""
# Adjusted matrix defaults to identity matrix
matrix_size = len([c for c in self.get_input_channels() if c != "F"]) + 1
matrix = np.eye(matrix_size).tolist()
if self.statefile is None:
# if no statefile provided, default to identity Adjusted matrix
# (requires input channels to be specified...is this desirable?)
matrix_size = len([c for c in self.get_input_channels() if c != "F"]) + 1
matrix = np.eye(matrix_size).tolist()
self.matrix = AdjustedMatrix(matrix=matrix)
return
try:
......@@ -59,6 +59,11 @@ class AdjustedAlgorithm(Algorithm):
self.matrix = AdjustedMatrix(**data)
elif "PC" in data:
# read data from legacy format
matrix_size = int(
# assumes matrix is square
np.sqrt(len([k for k in data.keys() if k.startswith("M")]))
)
matrix = np.eye(matrix_size).tolist()
for row in range(matrix_size):
for col in range(matrix_size):
matrix[row][col] = np.float64(data[f"M{row+1}{col+1}"])
......@@ -122,27 +127,48 @@ class AdjustedAlgorithm(Algorithm):
"""
out = None
inchannels = self.get_input_channels()
outchannels = self.get_output_channels()
inchannels = self.get_input_channels() or [
trace.stats.channel for trace in stream
]
outchannels = self.get_output_channels() or inchannels
adjusted = self.matrix.process(
stream,
inchannels=inchannels,
outchannels=outchannels,
)
out = Stream(
# if F in inchannels/outchannels is not in the last position,
# still return it as the last element the output Stream
inchannels_noF = [c for c in inchannels if c != "F"]
outchannels_noF = [c for c in outchannels if c != "F"]
out_noF = Stream(
[
self.create_trace(
outchannels[i],
stream.select(channel=inchannels[i])[0].stats,
adjusted[i],
(
self.create_trace(
outchannels_noF[i],
(
stream.select(channel=inchannels_noF[i])[0].stats
if i < len(inchannels_noF)
else stream.select(channel=inchannels_noF[-1])[0].stats
),
adjusted[i],
)
)
for i in range(len(outchannels))
for i in range(len(outchannels_noF))
]
)
return out
out_F = Stream(
self.create_trace("F", stream.select(channel="F")[0].stats, adjusted[-1])
if stream.select(channel="F")
else []
)
return out_noF + out_F
def can_produce_data(self, starttime, endtime, stream):
"""Can Product data
"""Can Produce data
Parameters
----------
starttime: UTCDateTime
......@@ -155,27 +181,18 @@ class AdjustedAlgorithm(Algorithm):
channels = self.get_input_channels()
# if F is available, can produce at least adjusted F
if "F" in channels and super().can_produce_data(
starttime, endtime, stream.select(channel="F")
):
return True
# if E-E and E-N available
if (
"E-E" in channels
and "E-N" in channels
and super().can_produce_data(starttime, endtime, stream)
):
return True
# check validity of remaining channels
for c in channels:
# non-F channels require valid data for matrix transform
if c != "F" and not (
super().can_produce_data(starttime, endtime, stream.select(channel=c))
):
return False
if "F" not in channels:
# only matrix transform desired if user didn't pass F
return True
elif super().can_produce_data(starttime, endtime, stream.select(channel="F")):
# F adjustment desired and possible; matrix transform possible
return True
# return false if F or remaining channels cannot produce data
return False
......
This diff is collapsed.
......@@ -34,7 +34,7 @@ typer = "^0.6.1"
# webservice
alembic = "^1.8.1"
Authlib = "^1.1.0"
cryptography = "^42.0.0"
cryptography = "^43.0.1"
databases = {extras = ["mysql", "sqlite"], version = "^0.6.1"}
fastapi = "^0.109.1"
gunicorn = "^23.0.0"
......
......@@ -54,7 +54,9 @@ def test_process_XYZF_AdjustedMatrix():
[0, 0, 0, 1],
],
pier_correction=-22,
)
),
inchannels=["H", "E", "Z", "F"],
outchannels=["X", "Y", "Z", "F"],
)
# load boulder Jan 16 files from /etc/ directory
......@@ -114,7 +116,11 @@ def test_process_XYZF_statefile():
Uses statefile to generate AdjustedMatrix
"""
# load adjusted data transform matrix and pier correction
a = AdjustedAlgorithm(statefile="etc/adjusted/adjbou_state_.json")
a = AdjustedAlgorithm(
statefile="etc/adjusted/adjbou_state_.json",
inchannels=["H", "E", "Z", "F"],
outchannels=["X", "Y", "Z", "F"],
)
# load boulder Jan 16 files from /etc/ directory
with open("etc/adjusted/BOU201601vmin.min") as f:
......@@ -167,7 +173,7 @@ def test_process_no_statefile():
Uses default AdjustedMatrix with identity transform
"""
# initialize adjusted algorithm with no statefile
a = AdjustedAlgorithm()
a = AdjustedAlgorithm(inchannels=["H", "E", "Z", "F"])
# load boulder Jan 16 files from /etc/ directory
with open("etc/adjusted/BOU201601vmin.min") as f:
raw = i2.IAGA2002Factory().parse_string(f.read())
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment