Skip to content
Snippets Groups Projects
Commit ee113b35 authored by Cain, Payton David's avatar Cain, Payton David
Browse files

Support legacy statefiles, add matrix process method

parent f635ef7a
No related branches found
No related tags found
2 merge requests!146Release CMO metadata to production,!58Affine running process
{ {
"matrix": [ "PC": -22,
[-1, 0, 0], "M11": -1,
[0, -1, 0], "M12": 0,
[0, 0, 1] "M13": 0,
], "M21": 0,
"pier_correction": -22.0 "M22": -1,
"M23": 0,
"M31": 0,
"M32": 0,
"M33": 1
} }
\ No newline at end of file
import numpy as np
from obspy import UTCDateTime from obspy import UTCDateTime
from pydantic import BaseModel from pydantic import BaseModel
from typing import Any, List, Optional from typing import Any, List, Optional
...@@ -24,3 +25,14 @@ class AdjustedMatrix(BaseModel): ...@@ -24,3 +25,14 @@ class AdjustedMatrix(BaseModel):
metrics: Optional[List[Metric]] = None metrics: Optional[List[Metric]] = None
starttime: Optional[UTCDateTime] = None starttime: Optional[UTCDateTime] = None
endtime: Optional[UTCDateTime] = None endtime: Optional[UTCDateTime] = None
def process(self, values: List[List[float]], outchannels=["X", "Y", "Z", "F"]):
""" Apply matrix to raw data. Apply pier correction to F when necessary """
data = np.vstack([values[0:3]] + [np.ones_like(values[0])])
adjusted = self.matrix @ data
if "F" in outchannels:
f = values[-1] + self.pier_correction
adjusted = np.vstack([adjusted[0 : len(outchannels) - 1]] + [f])
else:
adjusted = adjusted[0 : len(outchannels)]
return adjusted
...@@ -43,24 +43,33 @@ class AdjustedAlgorithm(Algorithm): ...@@ -43,24 +43,33 @@ class AdjustedAlgorithm(Algorithm):
"""Load algorithm state from a file. """Load algorithm state from a file.
File name is self.statefile. File name is self.statefile.
""" """
pier_correction = 0
if self.statefile is None: if self.statefile is None:
return return
# Adjusted matrix defaults to identity matrix
matrix_size = len([c for c in self.get_input_channels() if c != "F"]) + 1
matrix = np.eye(matrix_size)
data = None
try: try:
with open(self.statefile, "r") as f: with open(self.statefile, "r") as f:
data = f.read() data = f.read()
data = json.loads(data) data = json.loads(data)
except IOError as err: except IOError as err:
sys.stderr.write("I/O error {0}".format(err)) raise FileNotFoundError("statefile not found")
if data is None or data == "": pier_correction = 0
return # Adjusted matrix defaults to identity matrix
self.matrix = AdjustedMatrix( matrix_size = len([c for c in self.get_input_channels() if c != "F"]) + 1
matrix=np.array(data["matrix"]), pier_correction=data["pier_correction"] matrix = np.eye(matrix_size)
) if self.statefile is not None:
try:
if "pier_correction" in data.keys():
# read data
matrix = np.array(data["matrix"])
pier_correction = data["pier_correction"]
if "PC" in data.keys():
# read data from legacy format
for row in range(matrix_size):
for col in range(matrix_size):
matrix[row, col] = np.float64(data[f"M{row+1}{col+1}"])
pier_correction = np.float64(data["PC"])
except:
raise KeyError("matrix and pier correction not found in statefile")
self.matrix = AdjustedMatrix(matrix=matrix, pier_correction=pier_correction)
def save_state(self): def save_state(self):
"""Save algorithm state to a file. """Save algorithm state to a file.
...@@ -123,14 +132,9 @@ class AdjustedAlgorithm(Algorithm): ...@@ -123,14 +132,9 @@ class AdjustedAlgorithm(Algorithm):
inchannels = self.get_input_channels() inchannels = self.get_input_channels()
outchannels = self.get_output_channels() outchannels = self.get_output_channels()
raws = np.vstack( raws = np.vstack(
[ [stream.select(channel=channel)[0].data for channel in inchannels]
stream.select(channel=channel)[0].data
for channel in inchannels
if channel != "F"
]
+ [np.ones_like(stream[0].data)]
) )
adjusted = np.matmul(self.matrix.matrix, raws) adjusted = self.matrix.process(values=raws, outchannels=outchannels)
out = Stream( out = Stream(
[ [
self.create_trace( self.create_trace(
...@@ -138,12 +142,9 @@ class AdjustedAlgorithm(Algorithm): ...@@ -138,12 +142,9 @@ class AdjustedAlgorithm(Algorithm):
stream.select(channel=inchannels[i])[0].stats, stream.select(channel=inchannels[i])[0].stats,
adjusted[i], adjusted[i],
) )
for i in range(len(adjusted) - 1) for i in range(len(adjusted))
] ]
) )
if "F" in inchannels and "F" in outchannels:
f = stream.select(channel="F")[0]
out += self.create_trace("F", f.stats, f.data + self.matrix.pier_correction)
return out return out
def can_produce_data(self, starttime, endtime, stream): def can_produce_data(self, starttime, endtime, stream):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment