diff --git a/geomagio/biq/BIQFactory.py b/geomagio/biq/BIQFactory.py
new file mode 100644
index 0000000000000000000000000000000000000000..59d0117a4b84110c5d8d4ad6239f994bb4e1095b
--- /dev/null
+++ b/geomagio/biq/BIQFactory.py
@@ -0,0 +1,129 @@
+"""Factory that parses BIQ files into arrays and/or an ObsPy Stream."""
+import numpy as np
+import obspy
+
+from pathlib import Path
+
+
+class BIQParser:
+    """Geomag BIQ file fields."""
+
+    def __init__(self, filename: Path):
+        self.filename = filename
+
+        # define the data types for a day of data
+        self.day_block_dtype = np.dtype(
+            [
+                ("Station ID", "S4"),
+                ("Year & Day", np.int32),
+                ("Co-Latitude", np.int32),
+                ("Longitude", np.int32),
+                ("Elevation", np.int32),
+                ("Orientation", "S4"),
+                ("Origination", "S4"),
+                ("D Conversion", np.int32),
+                ("Quality", "S4"),
+                ("Instrumentation", "S4"),
+                ("Custom Field 1", np.int32),
+                ("Custom Field 2", np.int32),
+                ("Custom Field 3", "S4"),
+                ("Custom Field 4", "S4"),
+                ("Custom Field 5", np.int32),
+                ("Custom Field 6", "S4"),
+                ("X-minute", np.int32, (1440,)),
+                ("Y-minute", np.int32, (1440,)),
+                ("Z-minute", np.int32, (1440,)),
+                ("G-minute", np.int32, (1440,)),
+                ("X-hourly", np.int32, (24,)),
+                ("Y-hourly", np.int32, (24,)),
+                ("Z-hourly", np.int32, (24,)),
+                ("G-hourly", np.int32, (24,)),
+                ("X-daily", np.int32, (1,)),
+                ("Y-daily", np.int32, (1,)),
+                ("Z-daily", np.int32, (1,)),
+                ("G-daily", np.int32, (1,)),
+                ("K-values", np.int32, (8,)),
+                ("Leftovers", np.int32, (4,)),
+            ]
+        )
+
+    def read_data(self):
+        with open(self.filename, "rb") as f:
+            data = np.fromfile(f, dtype=self.day_block_dtype)
+
+        # check that the Station ID is the same for every day block
+        self.check_station_id(data)
+
+        return data
+
+    def check_station_id(self, data):
+        # get the unique station IDs
+        station_ids = set(day_data["Station ID"].decode() for day_data in data)
+
+        # raise an error if there is more than one unique station ID
+        if len(station_ids) > 1:
+            raise ValueError(
+                "One or more station IDs do not match. File may be corrupted or written incorrectly!"
+            )
+
+    def get_stream(self):
+        data = self.read_data()
+
+        # combine the minute data from all the days into an ObsPy stream
+        st = obspy.Stream()
+        tr = obspy.Trace(
+            header={
+                "network": "NT",
+                "location": "Q0",  # THIS IS AN ASSUMPTION!
+                "station": data[0]["Station ID"].decode().strip(),
+                "delta": 60,
+                "starttime": obspy.UTCDateTime(str(data[0]["Year & Day"])),
+            }
+        )
+
+        for day in data:
+            # set the start time of this day block in the template header
+            tr.stats.starttime = obspy.UTCDateTime(str(day["Year & Day"]))
+
+            x_tr = tr.copy()
+            y_tr = tr.copy()
+            z_tr = tr.copy()
+            g_tr = tr.copy()
+            x_tr.stats.channel = "X"
+            y_tr.stats.channel = "Y"
+            z_tr.stats.channel = "Z"
+            g_tr.stats.channel = "G"
+            # convert to floats so that we're not stuck dealing with ints later
+            x_tr.data = day["X-minute"].astype(np.float64)
+            y_tr.data = day["Y-minute"].astype(np.float64)
+            z_tr.data = day["Z-minute"].astype(np.float64)
+            g_tr.data = day["G-minute"].astype(np.float64)
+
+            st += obspy.Stream(traces=[x_tr, y_tr, z_tr, g_tr])
+        st.merge()
+
+        # data was stored as scaled integers: translate 999999 fill values to NaN, then unscale
+        for tr in st:
+            tr.data[tr.data == 999999] = np.nan
+            tr.data = tr.data / 10
+
+        st = self.append_f(st)
+
+        return st
+
+    def append_f(self, st):
+        # calculate F from XYZ and G ("delta-F")
+        f_xyz = np.zeros_like(st.select(channel="G")[0].data)
+        for tr in st.select(channel="[XYZ]"):
+            f_xyz = f_xyz + np.square(tr.data)
+        f_xyz = np.sqrt(f_xyz)
+
+        f_tr = obspy.Trace(
+            f_xyz - st.select(channel="G")[0].data,
+            header=st.select(channel="G")[0].stats,
+        )
+        f_tr.stats.channel = "F"
+
+        st.append(f_tr)
+
+        return st
diff --git a/geomagio/processing/make_iaga.py b/geomagio/processing/make_iaga.py
new file mode 100644
index 0000000000000000000000000000000000000000..757973904953799f2245051de806f732d3ef8e62
--- /dev/null
+++ b/geomagio/processing/make_iaga.py
@@ -0,0 +1,244 @@
+# -*- coding: utf-8 -*-
+"""
+@author: awernle
+"""
+import os
+from obspy import Stream, UTCDateTime
+from typing import Optional, List
+from pathlib import Path
+
+import typer
+
+from ..metadata import Metadata, MetadataFactory, MetadataCategory
+from ..biq.BIQFactory import BIQParser
+from ..iaga2002.IAGA2002Factory import IAGA2002Factory
+
+
+def make_iaga(
+    in_directory: Optional[Path] = typer.Option(
+        None,
+        exists=True,
+        file_okay=False,
+        dir_okay=True,
+        writable=False,
+        readable=False,
+        resolve_path=True,
+        help="path to BIQ files",
+    ),
+    out_directory: Optional[Path] = typer.Option(
+        None,
+        exists=True,
+        file_okay=False,
+        dir_okay=True,
+        writable=False,
+        readable=False,
+        resolve_path=True,
+        help="path to write IAGA files",
+    ),
+    metadata_token: str = typer.Option(
+        default=os.getenv("GITLAB_API_TOKEN"),
+        help="Token for metadata web service.",
+        metavar="TOKEN",
+        show_default="environment variable GITLAB_API_TOKEN",
+    ),
+    metadata_url: str = typer.Option(
+        default="https://geomag.usgs.gov/ws/secure/metadata",
+        help="URL to metadata web service",
+        metavar="URL",
+    ),
+    filter_comments: Optional[List[str]] = typer.Option(
+        default=None,
+        help="Optional filter comments to append to the IAGA header",
+    ),
+    conditions_of_use: Optional[str] = typer.Option(
+        default=None,
+        help="Optional conditions of use to append to the IAGA header",
+    ),
+    force: bool = typer.Option(
+        default=False,
+        help="Skip the confirmation prompt before converting BIQ files",
+    ),
+):
+    """Make IAGA files from BIQ files."""
+    BIQ_files = [file for file in in_directory.glob("*.BIQ")]
+
+    # confirm whether or not to write IAGA files
+    if not force:
+        typer.confirm(
+            f"Are you sure you want to write IAGA files for {len(BIQ_files)} BIQ files?",
+            abort=True,
+        )
+
+    with typer.progressbar(
+        iterable=BIQ_files, label="Writing IAGA files"
+    ) as progressbar:
+        for BIQ_file in progressbar:
+            # convert the BIQ file to an ObsPy stream, add observatory metadata
+            # from the webservice, then write daily IAGA files
+            stream = get_stream(BIQ_file=BIQ_file)
+            # remove unnecessary channels if they exist
+            for tr in stream:
+                if tr.stats.channel[-1] not in ["X", "Y", "Z", "F"]:
+                    stream.remove(tr)
+            station = [tr.stats.station for tr in stream][0]
+            channels = [tr.stats.channel for tr in stream]
+
+            # get metadata from webservice
+            metadata_factory = MetadataFactory(token=metadata_token, url=metadata_url)
+            metadata = load_observatory_metadata(
+                factory=metadata_factory, station=station
+            )
+
+            # add metadata to ObsPy stream
+            stream = create_stream_metadata(
+                stream=stream,
+                metadata=metadata,
+                filter_comments=filter_comments,
+                conditions_of_use=conditions_of_use,
+            )
+
+            # split stream into days and write .min IAGA files to optional directory
+            write_iaga_file(
+                stream=stream,
+                station=station,
+                channels=channels,
+                out_directory=out_directory,
+            )
+
+
+def load_observatory_metadata(factory: MetadataFactory, station: str):
+    """Load observatory metadata.
+
+    Parameters
+    ----------
+    factory:
+        factory used to load metadata.
+    station:
+        observatory code to request.
+
+    Returns
+    -------
+    observatory metadata dictionary
+    """
+    metadata = Metadata(category=MetadataCategory.OBSERVATORY, station=station)
+    metadata = factory.get_metadata(query=metadata)
+    return metadata[0].metadata
+
+
+def get_stream(BIQ_file):
+    """Create ObsPy stream for IAGA file.
+
+    Parameters
+    ----------
+    BIQ_file:
+        BIQ file to convert to a stream.
+
+    Returns
+    -------
+    ObsPy stream.
+    """
+    biq_file = BIQParser(BIQ_file)
+    stream = biq_file.get_stream()
+    return stream
+
+
+def create_stream_metadata(
+    stream: Stream, metadata: dict, filter_comments: Optional[List[str]], conditions_of_use: Optional[str]
+):
+    """Create ObsPy stream metadata from observatory metadata and options.
+
+    Parameters
+    ----------
+    stream:
+        ObsPy stream to add metadata to.
+    metadata:
+        Observatory metadata.
+    filter_comments:
+        Optional comments on filters.
+    conditions_of_use:
+        Optional conditions of use.
+
+    Returns
+    -------
+    ObsPy stream with additional metadata.
+    """
+
+    for tr in stream:
+        # add required IAGA headers
+        tr.stats.station_name = str(metadata["name"])
+        tr.stats.geodetic_latitude = str(metadata["latitude"])
+        tr.stats.geodetic_longitude = str(metadata["longitude"])
+        tr.stats.elevation = str(metadata["elevation"])
+        tr.stats.sensor_orientation = str(metadata["sensor_orientation"])
+        tr.stats.sensor_sampling_rate = 100
+        tr.stats.data_interval_type = str(
+            "filtered 1-minute (00:15-01:45)"
+        )  # hardcoded for now
+        tr.stats.data_type = "quasi-definitive"  # hardcoded for now
+
+        # add optional fields
+        if filter_comments is not None:
+            tr.stats.filter_comments = filter_comments
+        if conditions_of_use is not None:
+            tr.stats.conditions_of_use = conditions_of_use
+        tr.stats.agency_name = metadata["agency_name"]
+        if tr.stats.agency_name == "United States Geological Survey (USGS)":
+            tr.stats.is_gin = "is_gin"
+        # TODO: Maybe add additional optional fields as seen in IAGA2002Writer?
+    stream.merge()
+    return stream
+
+
+def write_iaga_file(stream: Stream, station: str, channels: list, out_directory: Path):
+    """Split stream into days and write daily .min IAGA files.
+
+    Parameters
+    ----------
+    stream:
+        ObsPy stream to split.
+    station:
+        Name of observatory station.
+    channels:
+        List of channels to write.
+    out_directory:
+        Optional directory to save files to.
+
+    Returns
+    -------
+    IAGA files written to out_directory (or the current directory).
+    """
+    # get the start time and the last day covered by the stream (assumes a single month)
+    start_time = stream[0].stats.starttime
+    end_of_month = (stream[0].stats.endtime).day
+
+    # loop over each day in the month and split the stream into day-long streams
+    for day in range(1, end_of_month + 1):
+        day_start = UTCDateTime(start_time.year, start_time.month, day)
+        day_end = UTCDateTime(
+            start_time.year, start_time.month, day, hour=23, minute=59
+        )
+        day_stream = stream.slice(day_start, day_end)
+
+        # build the file name and open it in the optional output directory
+        date = ((str(day_start)).split("T"))[0]
+        date = "".join((date.split("-"))[0:3])
+        name = str(station.lower()) + date + "qmin.min"
+        if out_directory is not None:
+            filepath = os.path.join(out_directory, name)
+            fh = open(filepath, "wb")
+        else:
+            fh = open(name, "wb")
+
+        # TODO: Add Edge and/or Intermagnet as upload options?
+        # write file using factory, can use sys.stdout
+        IAGA2002Factory().write_file(fh=fh, timeseries=day_stream, channels=channels)
+        fh.close()
+
+
+def main() -> None:
+    """Entrypoint for the make-iaga command."""
+    # for one command, can use typer.run
+    typer.run(make_iaga)
+
+
+if __name__ == "__main__":
+    main()
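The helper functions can also be driven directly, without the Typer command, which is handy for converting a single file from a script or a test. A rough sketch, assuming hypothetical paths, a placeholder token, and example filter-comment text:

```python
from pathlib import Path

from geomagio.metadata import MetadataFactory
from geomagio.processing.make_iaga import (
    create_stream_metadata,
    get_stream,
    load_observatory_metadata,
    write_iaga_file,
)

# hypothetical input file, output directory, and token
biq_file = Path("/data/biq/BOU2023JAN.BIQ")
out_directory = Path("/data/iaga")
factory = MetadataFactory(
    token="example-token",
    url="https://geomag.usgs.gov/ws/secure/metadata",
)

stream = get_stream(BIQ_file=biq_file)
# drop the delta-F input channel; the IAGA output keeps X, Y, Z, F
for tr in stream.select(channel="G"):
    stream.remove(tr)

station = stream[0].stats.station
metadata = load_observatory_metadata(factory=factory, station=station)
stream = create_stream_metadata(
    stream=stream,
    metadata=metadata,
    filter_comments=["Example filter comment."],
    conditions_of_use=None,
)
write_iaga_file(
    stream=stream,
    station=station,
    channels=[tr.stats.channel for tr in stream],
    out_directory=out_directory,
)
```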
+ """ + # get the start time of the first trace in the stream + start_time = stream[0].stats.starttime + end_of_month = (stream[0].stats.endtime).day + + # loop over each day in the month and split the stream into day-long streams + for day in range(1, end_of_month + 1): + day_start = UTCDateTime(start_time.year, start_time.month, day) + day_end = UTCDateTime( + start_time.year, start_time.month, day, hour=23, minute=59 + ) + day_stream = stream.slice(day_start, day_end) + + # create file object in optional directory + date = ((str(day_start)).split("T"))[0] + date = "".join((date.split("-"))[0:3]) + name = str(station.lower()) + date + "qmin.min" + if out_directory is not None: + filepath = os.path.join(out_directory, name) + fh = open(filepath, "wb") + else: + fh = open(name, "wb") + + # TODO: Add Edge and/or Intermagnet as upload options? + # write file using factory, can use sys.stdout + IAGA2002Factory().write_file(fh=fh, timeseries=day_stream, channels=channels) + fh.close() + + +def main() -> None: + """Entrypoint for make iaga method.""" + # for one command, can use typer.run + typer.run(make_iaga) + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index be8965ca480e9a633d710c054e101b87b1b89f6f..0ce4776fba1e001714c2a3ef37b6db3a7569a1ae 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -77,6 +77,7 @@ make-cal = "geomagio.processing.make_cal:main" geomag-filter = "geomagio.processing.filters:main" copy-absolutes = "geomagio.processing.copy_absolutes:main" copy-observatory = "geomagio.processing.copy_observatory:main" +make-iaga = "geomagio.processing.make_iaga:main" [tool.poe.tasks] # e.g. "poetry run poe lint"