Newer
Older
from datetime import datetime
from dateutil.relativedelta import relativedelta
Wernle, Alexandra Nicole
committed
from enum import Enum
from obspy.core import UTCDateTime, Stream
from ..algorithm.FilterAlgorithm import FilterAlgorithm
from ..edge import EdgeFactory
from ..pcdcp import PCDCPFactory, PCDCP_FILE_PATTERN
Wernle, Alexandra Nicole
committed
from ..metadata import Metadata, MetadataCategory, MetadataFactory
from ..residual import CalFileFactory, Reading, WebAbsolutesFactory
from ..Util import get_intervals
Wernle, Alexandra Nicole
committed
class ReadingsFactory(str, Enum):
    """Selectable sources of absolute readings for the prepfiles command.

    The ``str`` mixin makes each member compare equal to its plain string
    value, so a member can be matched against either the enum member or
    the raw string.
    """

    # Read reviewed readings through the metadata web service.
    METADATA_FACTORY = "metadatafactory"
    # Read readings through the web absolutes service.
    WEB_ABSOLUTES = "webabsolutes"
# Relative location of a calibration (.cal) file. The {OBSERVATORY} and
# {YEAR} fields are filled in with str.format() before the file is written.
CAL_TEMPLATE = "{OBSERVATORY}/{OBSERVATORY}{YEAR}PCD.cal"
# Relative location of a PCDCP file: a per-observatory directory followed by
# the PCDCP file pattern.
# NOTE(review): %(OBS)s is a %-style placeholder that is presumably expanded
# by the PCDCP factory when it builds file urls - confirm.
PCDCP_TEMPLATE = f"%(OBS)s/{PCDCP_FILE_PATTERN}"
"""Entrypoint for magproc-prepfiles command defined in setup.py.
Runs prepfiles() with typer for argument parsing and usage.
"""
def prepfiles(
    observatory: str = typer.Argument(
        ...,
        help="Observatory code",
    ),
    year: int = typer.Option(
        default=datetime.now().year,
        help="Search year, default",
    ),
    month: int = typer.Option(
        default=datetime.now().month,
        help="Search month, default",
    ),
    factory: ReadingsFactory = ReadingsFactory.WEB_ABSOLUTES,
    # configuration arguments
    calibration_path: str = typer.Argument(
        os.getenv("CALIBRATION_PATH", "file://c:/Calibrat")
    ),
    second_path: str = typer.Argument(os.getenv("SECOND_PATH", "file://c:/RAW")),
    minute_path: str = typer.Argument(os.getenv("MINUTE_PATH", "file://c:/USGSDCP")),
    temperature_path: str = typer.Argument(
        os.getenv("TEMPERATURE_PATH", "file://c:/DEG")
    ),
    edge_host: str = typer.Argument(os.getenv("EDGE_HOST", "edgecwb.usgs.gov")),
    metadata_token: str = typer.Option(
        default=os.getenv("GITLAB_API_TOKEN"),
        help="Token for metadata web service.",
        metavar="TOKEN",
        show_default="environment variable GITLAB_API_TOKEN",
    ),
    metadata_url: str = typer.Option(
        default="https://geomag.usgs.gov/ws/secure/metadata",
        help="URL to metadata web service",
        metavar="URL",
    ),
    web_absolutes_url: str = typer.Option(
        default="https://geomag.usgs.gov/baselines/observation.json.php",
        help="URL to web absolutes service",
        metavar="URL",
    ),
):
    """Write calibration, variation and temperature files for one month."""
    month_start = datetime(year, month, 1)
    month_end = month_start + relativedelta(months=1)

    # Confirm which factory to use. ReadingsFactory mixes in str, so this
    # comparison holds for an enum member as well as for its plain string
    # value. The selected instance gets its own name instead of rebinding
    # the ``factory`` parameter from an enum to a factory object.
    if factory == ReadingsFactory.WEB_ABSOLUTES:
        readings_factory = WebAbsolutesFactory(url=web_absolutes_url)
    else:
        readings_factory = MetadataFactory(token=metadata_token, url=metadata_url)

    # Calibration data: the window is padded by one month on each side of
    # the requested month.
    # NOTE(review): the default *_path values already start with "file://"
    # and "file://" is prepended again below - confirm the factories accept
    # the doubled scheme.
    write_cal_file(
        factory=readings_factory,
        starttime=UTCDateTime(month_start - relativedelta(months=1)),
        endtime=UTCDateTime(month_end + relativedelta(months=1)),
        observatory=observatory,
        template="file://" + os.path.join(calibration_path, CAL_TEMPLATE),
    )

    intervals = get_intervals(UTCDateTime(month_start), UTCDateTime(month_end))
    for interval in intervals:
        # Variation data
        write_variation_data(
            host=edge_host,
            starttime=interval["start"],
            endtime=interval["end"],
            observatory=observatory,
            second_template="file://" + os.path.join(second_path, PCDCP_TEMPLATE),
            minute_template="file://" + os.path.join(minute_path, PCDCP_TEMPLATE),
        )
        # Temperature data
        write_temperature_data(
            host=edge_host,
            starttime=interval["start"],
            endtime=interval["end"],
            observatory=observatory,
            template="file://" + os.path.join(temperature_path, PCDCP_TEMPLATE),
        )
def write_cal_file(
    factory: ReadingsFactory,
    starttime: UTCDateTime,
    endtime: UTCDateTime,
    observatory: str,
    template: str,
):
    """Load readings for an observatory and write them to a cal file.

    Parameters
    ----------
    factory
        Source of readings. It is first asked for readings through
        ``get_readings()``; when that call is unavailable or fails, reviewed
        reading metadata is requested through ``get_metadata()`` instead.
        NOTE(review): the annotation names the ReadingsFactory enum, but the
        body calls factory methods on this object - confirm the intended type.
    starttime
        Start of the search window; its year fills the YEAR template field.
    endtime
        End of the search window.
    observatory
        Observatory code; fills the OBSERVATORY template field.
    template
        Output url template containing {OBSERVATORY} and {YEAR} fields.
    """
    print(
        f"Loading calibration data for {observatory} [{starttime}, {endtime}]",
        file=sys.stderr,
    )
    url = template.format(OBSERVATORY=observatory, YEAR=starttime.year)
    try:
        readings = factory.get_readings(
            observatory=observatory,
            starttime=starttime,
            endtime=endtime,
            include_measurements=True,
        )
    except Exception as err:
        # Keep the original best-effort fallback, but do not trap
        # KeyboardInterrupt/SystemExit and do not hide the reason.
        print(
            f"get_readings() unavailable or failed ({err!r}); "
            "requesting reviewed reading metadata instead",
            file=sys.stderr,
        )
        query = Metadata(
            category=MetadataCategory.READING,
            starttime=starttime,
            endtime=endtime,
            station=observatory,
            status="reviewed",
            data_valid=True,
        )
        metadata = factory.get_metadata(query=query)
        # convert returned metadata.metadata into Reading objects
        readings = [Reading(**meta.metadata) for meta in metadata]
    # write cal file to specified path
    CalFileFactory().write_file(url=url, readings=readings)
def write_pcdcp_file(
    starttime: UTCDateTime,
    endtime: UTCDateTime,
    timeseries: Stream,
    observatory: str,
    interval: str,
    channels: List[str],
    template: str = PCDCP_FILE_PATTERN,
    temperatures: bool = False,
):
    """Write a timeseries to PCDCP files, one url per 86400 seconds.

    Parameters
    ----------
    starttime
        Start of the period to write.
    endtime
        End of the period to write.
    timeseries
        Data to write.
    observatory
        Observatory code. Not used by this function; kept so existing
        keyword callers continue to work.
    interval
        Data interval name passed through to the factory.
    channels
        Channel names to write.
    template
        Url template handed to the PCDCP factory.
    temperatures
        Forwarded to the PCDCP factory. The body previously read this name
        without defining it, which raised NameError on every call.
        NOTE(review): the default of False is assumed to match the
        factory's own default - confirm.
    """
    pcdcp_factory = PCDCPFactory(
        urlInterval=86400, urlTemplate=template, temperatures=temperatures
    )
    pcdcp_factory.put_timeseries(
        timeseries=timeseries,
        starttime=starttime,
        endtime=endtime,
        channels=channels,
        interval=interval,
    )
def write_temperature_data(
    host: str,
    starttime: UTCDateTime,
    endtime: UTCDateTime,
    observatory: str,
    template: str = PCDCP_FILE_PATTERN,
) -> Stream:
    """Load minute temperature data, filter it to one hour and write it.

    Parameters
    ----------
    host
        Edge server host to read from.
    starttime
        Start of the output period.
    endtime
        End of the output period.
    observatory
        Observatory code.
    template
        Url template for the written files.

    Returns
    -------
    Stream
        The filtered hourly temperature data that was written.
    """
    channels = ["UK1", "UK2", "UK3", "UK4"]
    algorithm = FilterAlgorithm(input_sample_period=60.0, output_sample_period=3600.0)
    factory = EdgeFactory(host=host)
    # load minute temperature data; the filter reports the input window it
    # needs in order to produce output for [starttime, endtime]
    f_starttime, f_endtime = algorithm.get_input_interval(starttime, endtime)
    print(
        f"Loading minute temperature data for {observatory} [{f_starttime}, {f_endtime}]",
        file=sys.stderr,
    )
    timeseries_temp = factory.get_timeseries(
        starttime=f_starttime,
        endtime=f_endtime,
        observatory=observatory,
        channels=channels,
        type="variation",
        interval="minute",
    )
    # filter to one hour
    print(
        f"Generating hourly temperature data for {observatory} [{starttime}, {endtime}]",
        file=sys.stderr,
    )
    timeseries_temperature = algorithm.process(timeseries_temp)
    # write data
    # NOTE(review): write_pcdcp_file requires ``interval`` and the original
    # call did not pass one (TypeError). "hour" matches the 3600 second
    # filter output - confirm the interval name the PCDCP factory expects.
    write_pcdcp_file(
        starttime=starttime,
        endtime=endtime,
        timeseries=timeseries_temperature,
        observatory=observatory,
        interval="hour",
        channels=channels,
        template=template,
    )
    # The signature promises a Stream; previously nothing was returned.
    return timeseries_temperature
def write_variation_data(
    host: str,
    starttime: UTCDateTime,
    endtime: UTCDateTime,
    observatory: str,
    second_template: str = PCDCP_FILE_PATTERN,
    minute_template: str = PCDCP_FILE_PATTERN,
):
    """Load second variation data, derive minute data and write both.

    Parameters
    ----------
    host
        Edge server host to read from.
    starttime
        Start of the output period.
    endtime
        End of the output period.
    observatory
        Observatory code.
    second_template
        Url template for the one-second files.
    minute_template
        Url template for the one-minute files.
    """
    channels = ["H", "E", "Z", "F"]
    algorithm = FilterAlgorithm(input_sample_period=1.0, output_sample_period=60.0)
    factory = EdgeFactory(host=host)
    # load second data; the filter reports the input window it needs in
    # order to produce output for [starttime, endtime]
    f_starttime, f_endtime = algorithm.get_input_interval(starttime, endtime)
    print(
        f"Loading second variation data for {observatory} [{f_starttime}, {f_endtime}]",
        file=sys.stderr,
    )
    timeseries_second = factory.get_timeseries(
        starttime=f_starttime,
        endtime=f_endtime,
        observatory=observatory,
        channels=channels,
        type="variation",
        interval="second",
    )
    # filter to one minute
    print(
        f"Generating one minute variation data for {observatory} [{starttime}, {endtime}]",
        file=sys.stderr,
    )
    timeseries_minute = algorithm.process(timeseries_second)
    # write files
    # The second data is trimmed only after filtering, so the minute data
    # above was computed from the full padded input window.
    write_pcdcp_file(
        starttime=starttime,
        endtime=endtime,
        timeseries=timeseries_second.trim(starttime, endtime),
        observatory=observatory,
        interval="second",
        channels=channels,
        template=second_template,
    )
    write_pcdcp_file(
        starttime=starttime,
        endtime=endtime,
        timeseries=timeseries_minute,
        observatory=observatory,
        interval="minute",
        channels=channels,
        template=minute_template,
    )