Commit f4c3996f authored by Clayton, Brandon Scott

cleanup

parent ef4b07db
1 merge request: !53 Convert to NetCDF
Showing 535 additions and 459 deletions
@@ -19,12 +19,12 @@ flake8 = "^3.8.3"
isort = "5.6.4"
[tool.poetry.scripts]
database = "src.main.python.poetry_scripts:database"
format = "src.main.python.poetry_scripts:format"
format_check = "src.main.python.poetry_scripts:format_check"
githooks = "src.main.python.poetry_scripts:githooks"
help = "src.main.python.poetry_scripts:help"
lint = "src.main.python.poetry_scripts:lint"
netcdf = "src.main.python.gov.usgs.earthquake.nshmp.netcdf.__main__:run"
pre_push = "src.main.python.poetry_scripts:pre_push"
[tool.isort]
......
@@ -7,36 +7,44 @@ Supported NSHMs for download and conversion:
- NSHM_2018A
"""
from timeit import default_timer
from rich.traceback import install
from .console import console
from .application_inputs import ApplicationInputs
from .converters.convert_2018a import Convert2018A
from .converters.preprocess import Preprocess
from .database.database import Database
from .database.database_input import DatabaseInput
from .parsers.args_parser import parser
from .utils.console import console
def run():
"""Run database and conversion
Download from ScienceBase and convert to NetCDF
"""
if __name__ == "__main__":
# Get rich stack trace output
install()
try:
database_input = DatabaseInput.from_args(args=vars(parser.parse_args()))
show_help = database_input.show_help()
inputs = ApplicationInputs.from_args(args=vars(parser.parse_args()))
show_help = inputs.show_help()
if show_help is True:
parser.print_help()
else:
database = Database(db_input=database_input)
if not database_input.download_only:
netcdf_metadata = Preprocess.preprocess(database=database)
console.print("Preprocessing done", style="bold yellow")
database = Database(inputs=inputs)
for metadata in netcdf_metadata:
Convert2018A(metadata=metadata)
if not inputs.download_only:
for database_info in database.database_info:
preprocess = Preprocess(database_info=database_info)
console.print("Preprocessing done", style="bold yellow")
console.print("\nConversion done", style="bold yellow")
start = default_timer()
Convert2018A(inputs=inputs, metadata=preprocess.netcdf_metadata)
end = default_timer()
print(f"Time {( end - start )}")
except Exception as error:
raise error
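For reviewers, the netcdf entry under [tool.poetry.scripts] above points at this run function, so the converter can also be invoked outside Poetry. A minimal, hedged sketch (module path copied from the pyproject entry; the --2018A flag is assumed from the supported-NSHM list in the module docstring):

import sys

from src.main.python.gov.usgs.earthquake.nshmp.netcdf.__main__ import run

if __name__ == "__main__":
    # Emulate: poetry run netcdf --2018A
    sys.argv = [sys.argv[0], "--2018A"]
    run()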
@@ -2,7 +2,7 @@ import os
from pathlib import Path
from ..nshm import Nshm
from .nshm import Nshm
_ROOT_DIR = os.getcwd()
@@ -10,7 +10,7 @@ _DEFAULT_DB_DIR = os.path.join(_ROOT_DIR, "libs", "NETCDF_DATA")
_NSHM = "NSHM"
class DatabaseInput:
class ApplicationInputs:
"""Database Input
Inputs to which database to download and convert to NetCDF
@@ -20,6 +20,7 @@ class DatabaseInput:
# Set defaults
self.all = False
self.clean = False
self.clean_ascii = False
self.db_dir = Path(_DEFAULT_DB_DIR)
self.download_only = False
self.__setattr__(Nshm.NSHM_2008.name, False)
@@ -45,26 +46,26 @@ class DatabaseInput:
return True if has_nshm is False and self.all is False and self.clean is False else False
@staticmethod
def from_args(args=dict) -> "DatabaseInput":
def from_args(args=dict) -> "ApplicationInputs":
"""Convert Arguments
Converts command line arguments to database input
"""
database_input = DatabaseInput()
inputs = ApplicationInputs()
env = os.environ
if _NSHM in env and any(nshm for nshm in Nshm if nshm.name == env[_NSHM]):
args[env[_NSHM]] = True
for key, value in args.items():
if hasattr(database_input, key):
if hasattr(inputs, key):
if key == "db_dir":
setattr(database_input, key, DatabaseInput._check_db_dir(db_dir=value))
setattr(inputs, key, ApplicationInputs._check_db_dir(db_dir=value))
else:
setattr(database_input, key, value)
setattr(inputs, key, value)
return database_input
return inputs
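A short usage sketch of the renamed ApplicationInputs.from_args (the args dict stands in for vars(parser.parse_args()); key names mirror the attributes set in __init__ and the clean_ascii flag added to the parser further down):

args = {"all": False, "clean": False, "clean_ascii": True, "download_only": False}
inputs = ApplicationInputs.from_args(args=args)
assert inputs.clean_ascii is True            # matching keys are copied onto the instance
assert inputs.db_dir.name == "NETCDF_DATA"   # db_dir keeps its default when absent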
@staticmethod
def _check_db_dir(db_dir: str) -> Path:
......
import os
import shutil
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
import netCDF4 as netcdf
import numpy as np
from concurrent.futures import Future, ThreadPoolExecutor
from ..console import console
from ..database.database_info import NetcdfMetadata, SiteClassDirectories, ScienceBaseMetadata
from ..application_inputs import ApplicationInputs
from ..database.database_info import NetcdfInfo, NetcdfMetadata, ScienceBaseMetadata
from ..gmm.imt import Imt
from ..gmm.site_class import SiteClass
from ..utils.console import console
from ..utils.netcdf_dimensions import NetcdfDimensions
from ..utils.netcdf_keys import NetcdfKeys
from ..utils.netcdf_parameters import NetcdfParameters
@@ -28,46 +32,97 @@ _ROOT_ATTRIBUTES: dict = {
class Convert2018A:
def __init__(self, metadata: NetcdfMetadata):
def __init__(self, inputs: ApplicationInputs, metadata: NetcdfMetadata):
self.metadata = metadata
self.inputs = inputs
hazard_parameters = NetcdfParameters().HazardParameters()
nshm = self.metadata.database_info.nshm
region = self.metadata.model_region
console.print(f"\n[blue]Converting {nshm.label}")
self._imt_indices: dict[Imt, int] = self._set_imt_indices()
self._site_class_indices: dict[SiteClass, int] = self._set_site_class_indices()
self._netcdf_filename = (
self.metadata.database_info.data_path.joinpath(
f"{self.metadata.database_info.nshm.value}_{self.metadata.model_region.name}.nc"
self._netcdf_filename = Path(
metadata.database_info.database_directory.joinpath(
f"{metadata.database_info.nshm.value}_{metadata.model_region.name}.nc"
)
)
self._check_file()
self._root_group = netcdf.Dataset(
filename=self._netcdf_filename, mode="w", format="NETCDF4", clobber=True
)
self._dataset_group: netcdf.Group = self._root_group.createGroup(
f"{nshm.value}/{region.name}"
)
self._dimensions = self._create_dimensions(group=self._dataset_group)
self._site_class_mask_array = np.zeros(
[self._dimensions.lat.size, self._dimensions.lon.size], int
)
self._imt_mask_array = np.zeros([self._dimensions.lat.size, self._dimensions.lon.size], int)
self._site_class_data_array = np.full(
[
self._dimensions.imt.size,
self._dimensions.lat.size,
self._dimensions.lon.size,
self._dimensions.iml.size,
],
hazard_parameters.fill_value,
float,
)
self._write_netcdf_file()
self._root_group.close()
self._clean_ascii()
console.print(
(
f"NetCDF conversion for NSHM ({metadata.database_info.nshm.value}) completed\n"
+ f"NetCDF file: {self._netcdf_filename}"
),
style="blue",
)
def _create_netcdf_variables(self, group: netcdf.Group, dimensions: netcdf.Dimension):
iml_values, imt_values = self._set_imt_values(dimensions=dimensions)
imt_enum_type = self._imt_enum_type(group=group)
@property
def netcdf_file(self):
return self._netcdf_filename
def _check_file(self):
if self._netcdf_filename.exists():
os.remove(self._netcdf_filename)
def _create_netcdf_variables(self):
iml_values, imt_values = self._set_imt_values()
imt_enum_type = self._imt_enum_type()
grid_step = self.metadata.grid_step
latitudes = self.metadata.locations.latitudes
longitudes = self.metadata.locations.longitudes
site_class_values = self._set_site_class_values()
site_class_enum_type = self._site_class_enum_type(group=group)
site_class_enum_type = self._site_class_enum_type()
parameters = NetcdfParameters()
grid_mask_netcdf_var = parameters.GridMaskParameters().netcdf_variable(group=group)
hazard_netcdf_var = parameters.HazardParameters().netcdf_variable(group=group)
parameters.ImlParameters().netcdf_variable(group=group, data=iml_values)
grid_mask_netcdf_var = parameters.GridMaskParameters().netcdf_variable(
group=self._dataset_group
)
hazard_netcdf_var = parameters.HazardParameters().netcdf_variable(group=self._dataset_group)
parameters.ImlParameters().netcdf_variable(group=self._dataset_group, data=iml_values)
parameters.ImtParameters(datatype=imt_enum_type).netcdf_variable(
group=group, data=imt_values
group=self._dataset_group, data=imt_values
)
NetcdfUtils.create_coordinate_latitude(
group=self._dataset_group, grid_step=grid_step, values=latitudes
)
NetcdfUtils.create_coordinate_longitude(
group=self._dataset_group, grid_step=grid_step, values=longitudes
)
NetcdfUtils.create_coordinate_latitude(group=group, grid_step=grid_step, values=latitudes)
NetcdfUtils.create_coordinate_longitude(group=group, grid_step=grid_step, values=longitudes)
parameters.SiteClassParameters(datatype=site_class_enum_type).netcdf_variable(
group=group, data=site_class_values
group=self._dataset_group, data=site_class_values
)
parameters.Vs30Parameters().netcdf_variable(
group=self._dataset_group, data=self.metadata.vs30s
)
parameters.Vs30Parameters().netcdf_variable(group=group, data=self.metadata.vs30s)
return grid_mask_netcdf_var, hazard_netcdf_var
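For readers less familiar with the netCDF4 API, a self-contained sketch of the createEnumType / createVariable pattern that _imt_enum_type, _site_class_enum_type, and the parameter classes rely on (file name, group path, and enum members are made up):

import netCDF4 as netcdf
import numpy as np

with netcdf.Dataset("demo.nc", mode="w", format="NETCDF4") as root:
    group = root.createGroup("2018A/CONUS")
    imt_dict = {"PGA": 0, "SA0P2": 1, "SA1P0": 2}  # hypothetical subset of Imt
    imt_type = group.createEnumType(np.uint8, "imt_enum_type", imt_dict)
    group.createDimension("imt", len(imt_dict))
    imt_var = group.createVariable("imt", imt_type, ("imt",))
    imt_var[:] = np.array(list(imt_dict.values()), dtype=np.uint8)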
@@ -90,67 +145,36 @@ class Convert2018A:
return dimensions
def _imt_enum_type(self, group: netcdf.Group) -> netcdf.EnumType:
imt_dict = dict()
for index, imt in enumerate(Imt):
imt_dict.setdefault(imt.name, index)
return group.createEnumType(
datatype=np.uint8, datatype_name=NetcdfKeys.IMT_ENUM_TYPE, enum_dict=imt_dict
)
def _clean_ascii(self) -> None:
if self.inputs.clean_ascii is True:
path = self.metadata.database_info.data_path
console.print(f"\n Removing ASCII files in ({path})", style="yellow")
shutil.rmtree(path)
def _get_hazard_data(
self,
dimensions: NetcdfDimensions,
executor: ThreadPoolExecutor,
hazard_netcdf_var: netcdf.Variable,
latitude_indices: dict[float, int],
longitude_indices: dict[float, int],
site_class_dirs: SiteClassDirectories,
netcdf_info: list[NetcdfInfo],
):
hazard_parameters = NetcdfParameters().HazardParameters()
site_class_mask_array = np.zeros([dimensions.lat.size, dimensions.lon.size], int)
site_class = site_class_dirs.site_class
site_class_data_array = np.full(
[
dimensions.imt.size,
dimensions.lat.size,
dimensions.lon.size,
dimensions.iml.size,
],
hazard_parameters.fill_value,
float,
)
futures: list[Future[np.ndarray]] = []
for _imt, _imt_dir in site_class_dirs.imt_directories.items():
imt: Imt = _imt
imt_dir: Path = _imt_dir
future = executor.submit(
self._read_curves_file,
imt=imt,
imt_dir=imt_dir,
latitude_indices=latitude_indices,
longitude_indices=longitude_indices,
site_class_data_array=site_class_data_array,
dimensions=dimensions
)
futures.append(future)
for _future in futures:
future: Future[np.ndarray] = _future
mask_array = future.result(timeout=600)
site_class_mask_array += mask_array
# Site class data
hazard_netcdf_var[
self._get_site_class_index(site_class=site_class), :, :, :, :
] = site_class_data_array
futures: list[Future] = []
with ThreadPoolExecutor() as executor:
for index, _info in enumerate(netcdf_info):
info: NetcdfInfo = _info
futures.append(
executor.submit(
self._read_curves_file,
hazard_netcdf_var=hazard_netcdf_var,
latitude_indices=latitude_indices,
longitude_indices=longitude_indices,
netcdf_info=info,
)
)
return site_class_mask_array
for future in futures:
future.result(timeout=120)
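The submit/result pattern used here (and again in Preprocess below) is plain concurrent.futures; a stripped-down sketch with a dummy worker in place of _read_curves_file:

from concurrent.futures import Future, ThreadPoolExecutor

def parse_one(path: str) -> int:  # stand-in for _read_curves_file
    return len(path)

paths = ["a/curves.csv", "bb/curves.csv"]
futures: list[Future[int]] = []
with ThreadPoolExecutor() as executor:
    for path in paths:
        futures.append(executor.submit(parse_one, path=path))
    results = [future.result(timeout=120) for future in futures]
assert results == [12, 13]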
def _get_imt_index(self, imt: Imt):
return self._imt_indices.get(imt)
@@ -172,26 +196,33 @@ class Convert2018A:
def _get_site_class_index(self, site_class: SiteClass):
return self._site_class_indices.get(site_class)
def _imt_enum_type(self) -> netcdf.EnumType:
imt_dict: dict[str, int] = {}
for index, imt in enumerate(Imt):
imt_dict.setdefault(imt.name, index)
return self._dataset_group.createEnumType(
datatype=np.uint8, datatype_name=NetcdfKeys.IMT_ENUM_TYPE, enum_dict=imt_dict
)
def _read_curves_file(
self,
imt: Imt,
imt_dir: Path,
netcdf_info: NetcdfInfo,
hazard_netcdf_var: netcdf.Variable,
latitude_indices: dict[float, int],
longitude_indices: dict[float, int],
site_class_data_array: np.ndarray,
dimensions=NetcdfDimensions
):
site_class_mask_array = np.zeros([dimensions.lat.size, dimensions.lon.size], int)
imt_mask_array = np.zeros([dimensions.lat.size, dimensions.lon.size], int)
curves_file = imt_dir.joinpath(NetcdfKeys.CURVES_FILE)
curves_file = netcdf_info.curve_file
if not curves_file.exists:
raise Exception(f"File ({curves_file}) not found")
imt_dir = curves_file.parent
print(f"\t Converting [{imt_dir.parent.name}/{imt_dir.name}/{curves_file.name}]")
with open(curves_file, "r") as curves_reader:
imls = self.metadata.imls.get(imt)
imls = self.metadata.imls.get(netcdf_info.imt)
# Skip header
next(curves_reader)
@@ -219,14 +250,16 @@ class Convert2018A:
longitude, self._get_longitude_index(longitude=longitude)
)
site_class_data_array[
self._get_imt_index(imt=imt), latitude_index, longitude_index, :
self._site_class_data_array[
self._get_imt_index(imt=netcdf_info.imt), latitude_index, longitude_index, :
] = values
imt_mask_array[latitude_index, longitude_index] = 1
self._imt_mask_array[latitude_index, longitude_index] = 1
site_class_mask_array += imt_mask_array
return site_class_mask_array
self._site_class_mask_array += self._imt_mask_array
hazard_netcdf_var[
self._get_site_class_index(site_class=netcdf_info.site_class), :, :, :, :
] = [longitude, latitude, *self._site_class_data_array]
def _set_imt_indices(self) -> dict[Imt, int]:
imt_indices: dict[Imt, int] = dict()
@@ -241,9 +274,9 @@ class Convert2018A:
return imt_indices
def _set_imt_values(self, dimensions: NetcdfDimensions):
def _set_imt_values(self):
imt_values: list[int] = []
iml_values = np.full([dimensions.imt.size, dimensions.iml.size], 0, float)
iml_values = np.full([self._dimensions.imt.size, self._dimensions.iml.size], 0, float)
data_index = 0
for index, _imt in enumerate(Imt):
@@ -269,7 +302,7 @@ class Convert2018A:
return indices
def _set_site_class_values(self):
def _set_site_class_values(self) -> list[int]:
values: list[int] = []
for index, _site_class in enumerate(SiteClass):
@@ -280,64 +313,43 @@ class Convert2018A:
return values
def _site_class_enum_type(self, group: netcdf.Group) -> netcdf.EnumType:
def _site_class_enum_type(self) -> netcdf.EnumType:
site_class_dict: dict[str, int] = {}
for index, site_class in enumerate(SiteClass):
site_class_dict.setdefault(site_class.name, index)
return group.createEnumType(np.uint8, NetcdfKeys.SITE_CLASS_ENUM_TYPE, site_class_dict)
return self._dataset_group.createEnumType(
datatype=np.uint8,
datatype_name=NetcdfKeys.SITE_CLASS_ENUM_TYPE,
enum_dict=site_class_dict,
)
def _write_hazard_data(
self, group: netcdf.Group, hazard_netcdf_var: netcdf.Variable, dimensions: NetcdfDimensions
):
total_mask_array = np.zeros([dimensions.lat.size, dimensions.lon.size], int)
def _write_hazard_data(self, hazard_netcdf_var: netcdf.Variable):
latitude_indices: dict[float, int] = {}
longitude_indices: dict[float, int] = {}
status_msg = f"[bold green]Converting {self.metadata.database_info.nshm.label} files ..."
with console.status(status_msg, spinner="pong") as status:
with ThreadPoolExecutor() as executor:
for _site_class_dirs in self.metadata.directories:
site_class_dirs: SiteClassDirectories = _site_class_dirs
site_class_mask_array = self._get_hazard_data(
dimensions=dimensions,
executor=executor,
hazard_netcdf_var=hazard_netcdf_var,
latitude_indices=latitude_indices,
longitude_indices=longitude_indices,
site_class_dirs=site_class_dirs
)
total_mask_array += site_class_mask_array
print()
self._get_hazard_data(
netcdf_info=self.metadata.netcdf_info,
hazard_netcdf_var=hazard_netcdf_var,
latitude_indices=latitude_indices,
longitude_indices=longitude_indices,
)
status.stop()
return total_mask_array
def _write_netcdf_file(self):
self._root_group.setncatts(_ROOT_ATTRIBUTES)
nshm = self.metadata.database_info.nshm
region = self.metadata.model_region
console.print(f"\n[blue]Converting {nshm.label}")
dataset_group: netcdf.Group = self._root_group.createGroup(f"{nshm.value}/{region.name}")
dataset_group.description = self.metadata.database_info.description
self._dataset_group.description = self.metadata.database_info.description
for index, _metadata in enumerate(self.metadata.database_info.science_base_metadata):
metadata: ScienceBaseMetadata = _metadata
dataset_group.setncattr(name=f"science_base_url_{index}", value=metadata.url)
dataset_group.setncattr(
name=f"science_base_version_{index}",
value=metadata.science_base_version
self._dataset_group.setncattr(name=f"science_base_url_{index}", value=metadata.url)
self._dataset_group.setncattr(
name=f"science_base_version_{index}", value=metadata.science_base_version
)
dimensions = self._create_dimensions(group=dataset_group)
grid_mask_netcdf_var, hazard_netcdf_var = self._create_netcdf_variables(
group=dataset_group,
dimensions=dimensions
)
mask_array_total = self._write_hazard_data(
group=dataset_group, hazard_netcdf_var=hazard_netcdf_var, dimensions=dimensions
)
grid_mask_netcdf_var[:, :] = mask_array_total
grid_mask_netcdf_var, hazard_netcdf_var = self._create_netcdf_variables()
self._write_hazard_data(hazard_netcdf_var=hazard_netcdf_var)
grid_mask_netcdf_var[:, :] = self._site_class_mask_array
from concurrent.futures import Future, ThreadPoolExecutor
from pathlib import Path
from ..console import console
from ..database.database import Database
from ..database.database_info import DatabaseInfo, DataInfo, NetcdfMetadata
import numpy as np
from ..database.database_info import DatabaseInfo, DataInfo, NetcdfInfo, NetcdfMetadata
from ..geo.location import Locations
from ..geo.region import Region
from ..gmm.imt import Imt
from ..gmm.site_class import SiteClass
from ..nshm import Nshm
from ..parsers.data_file_parser import DataFileParser
from ..parsers.data_path_parser import DataPathParser
from ..utils.console import console
import numpy as np
class Preprocess(tuple[NetcdfMetadata]):
class Preprocess:
"""Preprocess Data Files
Preprocess data files for each NSHM in the database and save metadata
for NetCDF conversion
"""
@staticmethod
def preprocess(database: Database) -> "Preprocess":
"""Preprocess ASCII data
Preprocess ASCII data for NetCDF metadata for each NSHM.
Args:
database: The database
Returns:
A list of NetCDF metadata for each NSHM
"""
netcdf_metadata: list[NetcdfMetadata] = []
for _data in database.database_info:
database_info: DatabaseInfo = _data
data_info_list: list[DataInfo] = []
console.print(f"Preprocessing {database_info.nshm.label}", style="blue")
status_msg = f"[bold green]Parsing {database_info.nshm.label} files ..."
with console.status(status_msg, spinner="pong") as status:
for _data_dir in database_info.data_dirs:
data_dir: Path = _data_dir
print()
data_info_list.append(
DataPathParser.parse_path(database_info=database_info, data_dir=data_dir)
def __init__(self, database_info: DatabaseInfo):
self._metadata = self._preprocess(database_info=database_info)
@property
def netcdf_metadata(self):
return self._metadata
def _preprocess(self, database_info: DatabaseInfo) -> NetcdfMetadata:
console.print(f"Preprocessing {database_info.nshm.label}", style="blue")
status_msg = f"[bold green]Parsing {database_info.nshm.label} files ..."
futures: list[Future[DataInfo]] = []
with console.status(status_msg, spinner="pong") as status:
with ThreadPoolExecutor() as executor:
for _curve_file in database_info.curve_files:
curve_file: Path = _curve_file
future = executor.submit(
DataPathParser.parse_path,
database_info=database_info,
curve_file=curve_file,
)
status.stop()
netcdf_metadata.append(
Preprocess._condense_data_info(
database_info=database_info, data_info_list=data_info_list
)
)
return Preprocess(tuple(netcdf_metadata))
@staticmethod
def _check_grid_step(database_info: DatabaseInfo, data_info_list: list[DataInfo]) -> float:
futures.append(future)
status.stop()
data_info_list: list[DataInfo] = []
for _future in futures:
future: Future[DataInfo] = _future
data_info_list.append(future.result(timeout=30))
# with console.status(status_msg, spinner="pong") as status:
# for _data_dir in database_info.data_directories:
# data_dir: Path = _data_dir
# data_info = DataPathParser.parse_path(
# database_info=database_info, data_dir=data_dir
# )
# data_info_list.append(data_info)
# status.stop()
return self._condense_data_info(database_info=database_info, data_info_list=data_info_list)
def _check_grid_step(
self, database_info: DatabaseInfo, data_info_list: list[DataInfo]
) -> float:
"""Check grid step
Check that each parsed grid step is the same.
@@ -76,72 +79,7 @@ class Preprocess(tuple[NetcdfMetadata]):
)
return grid_steps.pop()
@staticmethod
def _check_imls(
database_info: DatabaseInfo, imls: list[dict[Imt, list[float]]]
) -> dict[Imt, list[float]]:
"""Check IMLs
Check that each parsed IML list is the same.
"""
for _prev in imls:
for _curr in imls:
prev = dict(sorted(_prev.items(), key=lambda item: item[0].value))
curr = dict(sorted(_curr.items(), key=lambda item: item[0].value))
if prev != curr:
raise Exception(
f"""
Imls do not match for NSHM [{database_info.nshm.value}]
Path: {database_info.data_path}
"""
)
imls_dict: dict = imls.pop()
return dict(sorted(imls_dict.items(), key=lambda item: item[0].value))
@staticmethod
def _check_imts(database_info: DatabaseInfo, imts: list[list[Imt]]) -> list[Imt]:
"""Check IMTs
Check that each parsed IMT list is the same.
"""
for prev in imts:
for curr in imts:
prev = sorted(prev, key=lambda imt: imt.value)
curr = sorted(curr, key=lambda imt: imt.value)
if prev != curr:
raise Exception(
f"""
Imts do not match for NSHM [{database_info.nshm.value}]
Path: {database_info.data_path}
"""
)
return sorted(imts.pop(), key=lambda imt: imt.value)
@staticmethod
def _check_locations(database_info: DatabaseInfo, data_info_list: list[DataInfo]) -> Locations:
"""Check Locations
Check that each latitude and longitude are the same for each imt.
"""
locations = [info.locations for info in data_info_list]
for prev in locations:
for curr in locations:
if prev != curr:
raise ValueError(
f"""
Locations do not match for NSHM [{database_info.nshm.value}]
Path: {database_info.data_path}
"""
)
return locations.pop()
@staticmethod
def _check_region(database_info: DatabaseInfo, data_info_list: list[DataInfo]) -> Region:
def _check_region(self, database_info: DatabaseInfo, data_info_list: list[DataInfo]) -> Region:
"""Check region
Check that each parsed region is the same.
@@ -157,8 +95,7 @@ class Preprocess(tuple[NetcdfMetadata]):
)
return regions.pop()
@staticmethod
def _check_year(database_info: DatabaseInfo, data_info_list: list[DataInfo]) -> int:
def _check_year(self, database_info: DatabaseInfo, data_info_list: list[DataInfo]) -> int:
"""Check year
Check that each parsed year is the same
@@ -175,40 +112,108 @@ class Preprocess(tuple[NetcdfMetadata]):
return years.pop()
@staticmethod
def _condense_data_info(
database_info: DatabaseInfo, data_info_list: list[DataInfo]
self, database_info: DatabaseInfo, data_info_list: list[DataInfo]
) -> NetcdfMetadata:
"""Condense Data Info
Condense the list of data info into NetCDF metadata
"""
imts = [info.imts for info in data_info_list]
imls = [info.imls for info in data_info_list]
locations = Preprocess._check_locations(
database_info=database_info,
data_info_list=data_info_list
netcdf_info: list[NetcdfInfo] = list(
map(
lambda info: NetcdfInfo(
curve_file=info.curve_file, imt=info.imt, site_class=info.site_class
),
data_info_list,
)
)
directories = [info.directories for info in data_info_list]
locations = self._reduce_locations(data_info_list=data_info_list)
site_class_set = set([info.site_class for info in data_info_list])
site_classes = list(sorted(site_class_set, key=lambda site: site.name))
netcdf_data_info = NetcdfMetadata(
database_info=database_info,
directories=tuple(directories),
grid_step=Preprocess._check_grid_step(
grid_step=self._check_grid_step(
database_info=database_info, data_info_list=data_info_list
),
imls=Preprocess._check_imls(database_info=database_info, imls=imls),
imts=Preprocess._check_imts(database_info=database_info, imts=imts),
imls=self._reduce_imls(database_info=database_info, data_info_list=data_info_list),
imts=self._reduce_imts(data_info_list=data_info_list),
locations=locations,
model_region=Preprocess._check_region(
model_region=self._check_region(
database_info=database_info, data_info_list=data_info_list
),
site_classes=[info.site_class for info in data_info_list],
vs30s=sorted([info.vs30 for info in data_info_list]),
year=Preprocess._check_year(database_info=database_info, data_info_list=data_info_list),
netcdf_info=netcdf_info,
site_classes=site_classes,
vs30s=tuple(sorted(map(lambda site: site.value, site_classes))),
year=self._check_year(database_info=database_info, data_info_list=data_info_list),
)
return netcdf_data_info
# def _reduce_directories(self, data_info_list: list[DataInfo]) -> list[SiteClassDirectories]:
# directories: list[SiteClassDirectories] = [info.directories for info in data_info_list]
# directories_dict: dict[SiteClass, dict[Imt, Path]] = {}
# for _dirs in directories:
# dirs: SiteClassDirectories = _dirs
# imt_dirs: dict[Imt, Path] = directories_dict.get(dirs.site_class, {})
# imt_dirs = {**imt_dirs, **dirs.imt_directories}
# directories_dict.setdefault(dirs.site_class, imt_dirs)
# directories_dict = dict(sorted(directories_dict.items(), key=lambda item: item[0].name))
# site_class_dirs: list[SiteClassDirectories] = []
# for site_class, dirs in directories_dict.items():
# site_class_dirs.append(
# SiteClassDirectories(site_class=site_class, imt_directories=dirs)
# )
# return site_class_dirs
def _reduce_imls(
self, database_info: DatabaseInfo, data_info_list: list[DataInfo]
) -> dict[Imt, list[float]]:
"""Reduce and Check IMLs
Check that each parsed IML list is the same for each IMT.
"""
imls: dict[Imt, list[float]] = {info.imt: info.imls for info in data_info_list}
for _prev in data_info_list:
prev: DataInfo = _prev
for _curr in data_info_list:
curr: DataInfo = _curr
if prev.imt == curr.imt and prev.imls != curr.imls:
raise Exception(
f"""
Imls do not match for NSHM [{database_info.nshm.value}]
Path: {database_info.data_path}
"""
)
return dict(sorted(imls.items(), key=lambda item: item[0].value))
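The dict(sorted(...)) above orders the returned IML lists by IMT declaration order; a self-contained toy illustration with a stand-in enum:

from enum import Enum, auto

class ToyImt(Enum):  # stand-in for Imt, declared in hazard order
    PGA = auto()
    SA10P0 = auto()

imls = {ToyImt.SA10P0: [0.01, 0.02], ToyImt.PGA: [0.005, 0.01]}
ordered = dict(sorted(imls.items(), key=lambda item: item[0].value))
assert list(ordered) == [ToyImt.PGA, ToyImt.SA10P0]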
def _reduce_imts(self, data_info_list: list[DataInfo]) -> list[Imt]:
"""Reduce IMTs
Return list of IMTs
"""
imts = [info.imt for info in data_info_list]
imt_set = set(imts)
return sorted(list(imt_set), key=lambda imt: imt.value)
def _reduce_locations(self, data_info_list: list[DataInfo]) -> Locations:
"""Reduce Locations
Returns locations that have data
"""
latitudes_list = tuple(map(lambda info: info.locations.latitudes, data_info_list))
latitudes = tuple(sorted(set(sum(latitudes_list, ()))))
longitudes_list = tuple(map(lambda info: info.locations.longitudes, data_info_list))
longitudes = tuple(sorted(set(sum(longitudes_list, ()))))
return Locations(latitudes=latitudes, longitudes=longitudes)
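The sum(tuples, ()) flattening used in _reduce_locations is compact but quadratic on long inputs; a toy sketch with made-up coordinates, plus the itertools.chain equivalent for comparison:

from itertools import chain

latitudes_list = ((34.0, 34.05), (34.05, 34.1))
latitudes = tuple(sorted(set(sum(latitudes_list, ()))))  # (34.0, 34.05, 34.1)
assert latitudes == tuple(sorted(set(chain.from_iterable(latitudes_list))))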
@@ -11,10 +11,10 @@ import yaml
from sciencebasepy import SbSession
from ..console import console
from ..application_inputs import ApplicationInputs
from ..nshm import Nshm
from ..utils.console import console
from .database_info import DatabaseInfo, ScienceBaseMetadata
from .database_input import DatabaseInput
_ASCII_DIR = "ASCII"
@@ -29,11 +29,11 @@ class Database:
Download database defined in resources/catalog.yml.
Args:
db_input: The database inputs
inputs: The database inputs
"""
def __init__(self, db_input: DatabaseInput):
self.db_input = db_input
def __init__(self, inputs: ApplicationInputs):
self.inputs = inputs
self._session: SbSession
self._database_info = self._database()
@@ -49,24 +49,19 @@
try:
self._session = sciencebasepy.SbSession()
database_info: list[DatabaseInfo] = []
self.db_input.db_dir = self._check_db_info()
self.inputs.db_dir = self._check_db_info()
if self.db_input.clean is True:
if self.inputs.clean is True:
self._clean()
else:
catalog_info = self._get_catalog_info()
self._check_db_dir()
database_info = self._get_catalogs(catalogs=catalog_info["catalogs"])
msg = f"\nDatabases downloaded successfully and are located in ({self.db_input.db_dir}) directory.\n"
msg = f"\nDatabases downloaded successfully and are located in ({self.inputs.db_dir}) directory.\n"
console.print(msg, style="blue")
self._session.logout()
# Update database info with data directories
for _info in database_info:
info: DatabaseInfo = _info
info.set_data_directories()
return tuple(database_info)
except Exception as error:
raise RuntimeError(f"Failed to download database: \n {error}")
@@ -78,7 +73,7 @@
Create directory if not present
"""
db_dir = self.db_input.db_dir
db_dir = self.inputs.db_dir
if not db_dir.exists():
os.makedirs(db_dir)
@@ -94,7 +89,7 @@
not present from the argument.
"""
db_dir = self.db_input.db_dir
db_dir = self.inputs.db_dir
if os.getenv(_DB_PATH_ENV):
db_dir = Path(os.getenv(_DB_PATH_ENV))
@@ -102,7 +97,7 @@
db_info = self._read_db_info()
db_dir = Path(db_info["database_path"])
if self.db_input.clean is True:
if self.inputs.clean is True:
return db_dir
if _DB_INFO_FILE.exists() is not True:
@@ -121,14 +116,14 @@
Check to see if NSHM specific directory exists, if not create
"""
nshm_path: Path = self.db_input.db_dir.joinpath(_ASCII_DIR, nshm.value)
nshm_path: Path = self.inputs.db_dir.joinpath(_ASCII_DIR, nshm.value)
if nshm_path.exists() is False:
os.mkdir(nshm_path)
return nshm_path
def _check_zip(self, target: Path, dest: Optional[Path] = None) -> None:
def _check_zip(self, target: Path, dest: Optional[Path] = None, remove_target=True) -> None:
"""Check Zip
Check if any files are zipped and extract
@@ -137,9 +132,9 @@
if target.is_dir():
for _entry in os.scandir(target):
entry: os.DirEntry = _entry
self._check_zip(target=Path(entry.path))
self._check_zip(target=Path(entry.path), remove_target=remove_target)
else:
self._extract_zip(target=target, dest=dest)
self._extract_zip(target=target, dest=dest, remove_target=remove_target)
def _clean(self) -> None:
"""Clean database
@@ -147,11 +142,11 @@
Remove the database files and database info file.
"""
if self.db_input.db_dir.exists():
if self.inputs.db_dir.exists():
console.print(
f"\nRemoving databases in ({self.db_input.db_dir}) directory\n", style="yellow"
f"\nRemoving databases in ({self.inputs.db_dir}) directory\n", style="yellow"
)
shutil.rmtree(self.db_input.db_dir)
shutil.rmtree(self.inputs.db_dir)
if _DB_INFO_FILE.exists():
console.print(f"Removing database info file ({_DB_INFO_FILE})\n", style="yellow")
@@ -175,11 +170,16 @@
"""
for item in items:
response = self._session.get_item(itemid=item["id"])
database_files = self._session.get_item_file_info(item=response)
self._get_files(nshm=nshm, nshm_dir=nshm_dir, item=item, database_files=database_files)
if item["id"] == "LOCAL":
self._get_local_files(nshm=nshm, nshm_dir=nshm_dir, item=item)
else:
response = self._session.get_item(itemid=item["id"])
database_files = self._session.get_item_file_info(item=response)
self._get_files(
nshm=nshm, nshm_dir=nshm_dir, item=item, database_files=database_files
)
def _extract_zip(self, target: Path, dest: Optional[Path] = None) -> None:
def _extract_zip(self, target: Path, dest: Optional[Path] = None, remove_target=True) -> None:
"""Extract ZIP
Extract ZIP file and call back to _check_zip
@@ -190,7 +190,8 @@
print(f"Extracting [{target.name}] ...")
with ZipFile(target, "r") as zip_in:
zip_in.extractall(path_out)
os.remove(target)
if remove_target is True:
os.remove(target)
self._clean_nshm_dir(path=path_out)
self._check_zip(target=path_out)
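A self-contained sketch of the recursive extract-then-rescan behavior that _check_zip and _extract_zip implement between them, including the new remove_target flag (paths and layout are hypothetical):

import os
from pathlib import Path
from zipfile import ZipFile

def extract_all(target: Path, remove_target: bool = True) -> None:
    if target.is_dir():
        for entry in os.scandir(target):
            extract_all(Path(entry.path), remove_target=remove_target)
    elif target.suffix == ".zip":
        path_out = target.with_suffix("")
        with ZipFile(target, "r") as zip_in:
            zip_in.extractall(path_out)
        if remove_target:
            os.remove(target)
        extract_all(path_out, remove_target=remove_target)  # nested archives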
@@ -219,22 +220,26 @@
description = catalog["description"]
nshm = Nshm.from_string(nshm_str=nshm_str)
if self.db_input.has_nshm(nshm=nshm) is True or self.db_input.all is True:
if self.inputs.has_nshm(nshm=nshm) is True or self.inputs.all is True:
metadata: list[ScienceBaseMetadata] = []
for item in items:
metadata.append(ScienceBaseMetadata(
science_base_version=item["science_base_version"],
url=item["url"]
))
metadata.append(
ScienceBaseMetadata(
science_base_version=item["science_base_version"], url=item["url"]
)
)
nshm_dir = self._check_nshm_dir(nshm=nshm)
database_info.append(DatabaseInfo(
description=description,
data_path=nshm_dir,
nshm=nshm,
science_base_metadata=metadata
))
database_info.append(
DatabaseInfo(
description=description,
data_path=nshm_dir,
database_directory=self.inputs.db_dir,
nshm=nshm,
science_base_metadata=metadata,
)
)
self._download_files(nshm=nshm, nshm_dir=nshm_dir, items=items)
return database_info
@@ -250,7 +255,7 @@
self._session.download_file(url=url, local_filename=nshm_file, destination=nshm_dir)
self._check_zip(target=path)
else:
print(f"File [{Path(str(path)[0:-4]).name}] already exists, skipping")
print(f"File ({Path(str(path)[0:-4]).name}) already exists, skipping")
def _get_files(self, nshm: Nshm, nshm_dir: Path, item: dict, database_files: list) -> None:
"""Get files
@@ -262,11 +267,11 @@
status_msg = f"[bold green]Downloading and Extracting {nshm.label} ..."
with console.status(status_msg, spinner="pong") as status:
with ThreadPoolExecutor() as executer:
with ThreadPoolExecutor() as executor:
if item["files"] == "ALL":
for nshm_file in database_files:
futures.append(
executer.submit(
executor.submit(
self._get_file,
nshm=nshm,
nshm_file=Path(nshm_file["name"]),
@@ -284,7 +289,7 @@
msg = f"could not find database file [{nshm_file}] for item [{id}]"
raise Exception(msg)
futures.append(
executer.submit(
executor.submit(
self._get_file,
nshm=nshm,
nshm_file=Path(nshm_file),
@@ -298,6 +303,21 @@
future: Future = _future
future.result(timeout=300)
def _get_local_files(self, nshm: Nshm, nshm_dir: Path, item: dict):
status_msg = f"[bold green]Downloading and Extracting {nshm.label} ..."
with console.status(status_msg, spinner="pong") as status:
for nshm_file in item["files"]:
target = Path(os.path.join(nshm_file)).absolute()
dest = nshm_dir.joinpath(target.name.split(".")[0])
if dest.exists() is False:
self._check_zip(target=target, dest=dest, remove_target=False)
else:
print(f"File ({dest.name}) already exists, skipping")
status.stop()
def _read_db_info(self) -> dict:
"""Read in database info
@@ -317,13 +337,13 @@
Write YAML file with database info
"""
db_info = {"database_path": str(self.db_input.db_dir)}
db_info = {"database_path": str(self.inputs.db_dir)}
try:
console.print(f"Writing database info to ({_DB_INFO_FILE})", style="green")
with open(_DB_INFO_FILE, "w+") as fout:
yaml.dump(db_info, fout)
return self.db_input.db_dir
return self.inputs.db_dir
except Exception:
msg = f"Could not write database info to ({_DB_INFO_FILE})"
raise Exception(msg)
@@ -3,11 +3,12 @@ import os
from dataclasses import dataclass, field
from pathlib import Path
from ..geo.location import Locations, Location
from ..geo.location import Location, Locations
from ..geo.region import Region
from ..gmm.imt import Imt
from ..gmm.site_class import SiteClass
from ..nshm import Nshm
from ..utils.netcdf_keys import NetcdfKeys
@dataclass
@@ -16,13 +17,6 @@ class ScienceBaseMetadata:
url: str
@dataclass
class SiteClassDirectories:
site_class: SiteClass
directory: Path
imt_directories: dict[Imt, Path]
@dataclass
class DatabaseInfo:
"""Database Info
@@ -33,22 +27,38 @@ class DatabaseInfo:
See set_data_directories
"""
nshm: Nshm
description: str
science_base_metadata: tuple[ScienceBaseMetadata]
data_path: Path
data_dirs: list[Path] = field(default_factory=list)
database_directory: Path
nshm: Nshm
science_base_metadata: tuple[ScienceBaseMetadata]
def set_data_directories(self):
"""Set Data Directories
@property
def data_directories(self) -> set[Path]:
"""Data Directories
Walks through NSHM data directory looking for a PGA directory and
Walks through NSHM data directory looking for a PGA or PGV directory and
sets that as a data directory.
"""
data_dirs: list[Path] = []
for root, dirs, files in os.walk(self.data_path):
if Imt.PGA.name in dirs or Imt.PGV.name in dirs:
data_dirs.append(Path(root))
return set(sorted(data_dirs))
@property
def curve_files(self) -> set[Path]:
curve_files: list[Path] = []
for root, dirs, files in os.walk(self.data_path):
if Imt.PGA.name in dirs:
self.data_dirs.append(Path(root))
for f in files:
if NetcdfKeys.CURVES_FILE in f:
curve_files.append(Path(f"{root}/{f}").absolute())
return set(sorted(curve_files))
@dataclass
@@ -58,10 +68,10 @@ class DataInfo:
Defines data info for each site class directory read in.
"""
directories: SiteClassDirectories
curve_file: Path
grid_step: float
imls: dict[Imt, tuple[float]]
imts: list[Imt]
imls: tuple[float]
imt: Imt
locations: Locations
model_region: Region
site_class: SiteClass
@@ -69,6 +79,13 @@
year: int
@dataclass
class NetcdfInfo:
curve_file: Path
imt: Imt
site_class: SiteClass
@dataclass
class NetcdfMetadata:
"""NetCDF Metadata
@@ -77,12 +94,12 @@
"""
database_info: DatabaseInfo
directories: tuple[SiteClassDirectories]
grid_step: float
imls: dict[Imt, tuple[float]]
imts: list[Imt]
locations: Locations
model_region: Region
netcdf_info: list[NetcdfInfo]
site_classes: tuple[SiteClass]
vs30s: tuple[int]
year: int
@@ -60,22 +60,19 @@ class Imt(Enum):
SA10P0 = auto()
PGV = auto()
def __repr__(self):
return "<{:s}.{:s}>".format(self.__class__.__name__, self.name)
def __str__(self):
return self.display_name
@property
def period(self):
if self.name.startswith("SA"): # pylint: disable=no-member
if self.name.startswith("SA"):
return float(self.__class__._clean_sa(self.name))
else:
return None
@property
def is_sa(self):
return self.name.startswith("SA") # pylint: disable=no-member
return self.name.startswith("SA")
@property
def display_name(self):
@@ -83,7 +80,7 @@ class Imt(Enum):
return "Peak Ground Acceleration"
elif self is self.__class__.PGV:
return "Peak Ground Velocity"
elif self.isSA:
elif self.is_sa:
return "{:} Second Spectral Acceleration".format(self.period)
else:
raise ValueError("Unimplemented display_name for Imt")
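A small usage sketch of the enum helpers above (PGV and SA10P0 are the members visible in this hunk):

assert Imt.PGV.is_sa is False and Imt.PGV.period is None  # no "SA" prefix
assert Imt.SA10P0.is_sa is True                           # spectral acceleration member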
......
@@ -6,7 +6,7 @@ Description: Define arguments for main program
import argparse
import textwrap
from ..database.database_input import _DEFAULT_DB_DIR
from ..application_inputs import _DEFAULT_DB_DIR
from ..nshm import Nshm
@@ -43,6 +43,14 @@ parser.add_argument(
default=False,
)
parser.add_argument(
"--clean-ascii",
action="store_true",
dest="clean_ascii",
help="Remove all ASCII files after conversions, default false",
default=False,
)
parser.add_argument(
"--2018B",
action="store_true",
......
@@ -24,9 +24,9 @@ class DataFileParser:
Parse NSHMP model data files and coordinate indexing.
"""
def __init__(self, nshm: Nshm, filepath: Path):
def __init__(self, nshm: Nshm, curve_file: Path):
self.nshm = nshm
self.filepath = filepath
self.curve_file = curve_file
data_file_info = self._get_grid_range()
self._locations = data_file_info.locations
self._header_values = data_file_info.header_values
@@ -56,17 +56,10 @@
"""
Return dictionaries of longitudes and latitudes at which
specified ASCII grid file is defined.
Args:
nshm: The NSHM to get the grid range for
filepath: The file location of the datafile
Returns:
The locations
"""
if not self.filepath.is_file():
raise FileNotFoundError(f"Target filepath [{self.filepath}] not found")
if not self.curve_file.is_file():
raise FileNotFoundError(f"Target filepath [{self.curve_file}] not found")
if self.nshm == Nshm.NSHM_2018A:
return self._get_grid_range_2018()
@@ -86,7 +79,7 @@
longitudes: list[float] = []
latitudes: list[float] = []
with open(self.filepath, "r") as csv_file:
with open(self.curve_file, "r") as csv_file:
header: str = next(csv_file)
header_values = self._parse_2018_header(header=header.strip())
for line in csv_file:
......
@@ -2,10 +2,10 @@ import os
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass
from pathlib import Path
from itertools import chain
from pathlib import Path
from ..database.database_info import DatabaseInfo, DataInfo, SiteClassDirectories
from ..database.database_info import DatabaseInfo, DataInfo
from ..geo.location import Locations
from ..geo.region import Region
from ..gmm.imt import Imt
@@ -32,7 +32,10 @@
class DataPathParser:
@staticmethod
def parse_path(database_info: DatabaseInfo, data_dir: Path) -> DataInfo:
def parse_path(
database_info: DatabaseInfo,
curve_file: Path,
) -> DataInfo:
"""Parse filepath to extract parameters.
Args:
@@ -44,44 +47,52 @@
"""
if database_info.nshm == Nshm.NSHM_2018A:
return DataPathParser._parse_2018a_path(database_info=database_info, data_dir=data_dir)
return DataPathParser._parse_2018a_path(
database_info=database_info, curve_file=curve_file
)
else:
raise ValueError(f"NSHM [{database_info.nshm.value}] not supported")
@staticmethod
def _parse_2018a_path(database_info: DatabaseInfo, data_dir: Path) -> DataInfo:
def _parse_2018a_path(database_info: DatabaseInfo, curve_file: Path) -> DataInfo:
# 2018a filenames:
# 2018_nshm_{CLASS}_vs30_{VS}_{STEP}_degree_{BASIN}_maps.zip
# 2018_nshm_{CLASS}_vs30_{VS}_{STEP}_degree_maps.zip
# */{IMT}/curves.csv + map.csv
imt_dir = curve_file.parent
imt = Imt.from_string(imt_dir.name)
data_dir = curve_file.parent.parent
[year, _, site_class_str, _, vs30, grid_step_str, *mapOf] = data_dir.name.split("_")
if year != "2018":
raise RuntimeError(
f"Unexpected model year [{year}] from data directory [{data_dir.name}]"
)
curve_info = DataPathParser._scan_data_dir(nshm=database_info.nshm, data_dir=data_dir)
site_class = SiteClass.of_string(site_class_str)
imt_directories = dict(
sorted(curve_info.imt_directories.items(), key=lambda item: item[0].value)
curve_info = DataPathParser._parse_curve_file(
nshm=database_info.nshm, imt=imt, curve_file=curve_file
)
# curve_info = DataPathParser._scan_data_dir(nshm=database_info.nshm, data_dir=data_dir)
site_class = SiteClass.of_string(site_class_str)
# imt_directories = dict(
# sorted(curve_info.imt_directories.items(), key=lambda item: item[0].value)
# )
# Reduce locations
latitudes_list = tuple(map(lambda loc: loc.latitudes, curve_info.locations))
latitudes = tuple(sorted(set(sum(latitudes_list, ()))))
longitudes_list = tuple(map(lambda loc: loc.longitudes, curve_info.locations))
longitudes = tuple(sorted(set(sum(longitudes_list, ()))))
# latitudes_list = tuple(map(lambda loc: loc.latitudes, curve_info.locations))
# latitudes = tuple(sorted(set(sum(latitudes_list, ()))))
# longitudes_list = tuple(map(lambda loc: loc.longitudes, curve_info.locations))
# longitudes = tuple(sorted(set(sum(longitudes_list, ()))))
data_info = DataInfo(
directories=SiteClassDirectories(
site_class=site_class, directory=data_dir, imt_directories=imt_directories
),
# directories=SiteClassDirectories(
# site_class=site_class, imt_directories=imt_directories
# ),
curve_file=curve_file,
grid_step=float(grid_step_str.lower().replace("p", ".")),
imls=curve_info.imls,
imts=curve_info.imts,
locations=Locations(latitudes=latitudes, longitudes=longitudes),
model_region=DataPathParser._parse_region(data_dir.parent.name),
imt=curve_info.imt,
locations=curve_info.locations,
model_region=DataPathParser._parse_region(str(data_dir)),
site_class=site_class,
vs30=int(vs30),
year=int(year),
@@ -89,11 +100,11 @@
return data_info
@staticmethod
def _parse_curve_file(nshm: Nshm, imt: Imt, file_entry: os.DirEntry) -> _CurveFileInfo:
path = Path(file_entry.path).parent
print(f"\t Parsing [{path.parent.name}/{path.name}/{file_entry.name}]")
def _parse_curve_file(nshm: Nshm, imt: Imt, curve_file: Path) -> _CurveFileInfo:
path = curve_file.parent
print(f"\t Parsing [{path.parent.name}/{path.name}/{curve_file.name}]")
file_parser = DataFileParser(nshm=nshm, filepath=file_entry)
file_parser = DataFileParser(nshm=nshm, curve_file=curve_file)
imls = file_parser.header_values
locations = file_parser.locations
@@ -111,50 +122,52 @@
return Region.HI
elif Region.WUS.name in name:
return Region.WUS
@staticmethod
def _scan_data_dir(nshm: Nshm, data_dir: Path) -> _ScanDirInfo:
futures: list[Future[_CurveFileInfo]] = []
imls: dict[Imt, list[float]] = dict()
imts: list[Imt] = []
imt_directories: dict[Imt, Path] = {}
locations: list[Locations] = []
with ThreadPoolExecutor() as executor:
for _imt_entry in os.scandir(data_dir):
imt_entry: os.DirEntry = _imt_entry
dir_name = imt_entry.name
if (
dir_name.startswith(Imt.PGA.name)
or dir_name.startswith(Imt.PGV.name)
or dir_name.startswith("SA")
):
imt = Imt.from_string(dir_name)
imts.append(imt)
for _file_entry in os.scandir(imt_entry.path):
file_entry: os.DirEntry = _file_entry
imt_directories.setdefault(imt, Path(file_entry.path).parent)
if NetcdfKeys.CURVES_FILE in file_entry.name:
future = executor.submit(
DataPathParser._parse_curve_file,
nshm=nshm,
imt=imt,
file_entry=file_entry,
)
futures.append(future)
for _future in futures:
future: Future[_CurveFileInfo] = _future
curve_values = future.result(timeout=30)
imls.setdefault(curve_values.imt, curve_values.imls)
locations.append(curve_values.locations)
return _ScanDirInfo(
imls=imls,
imt_directories=imt_directories,
imts=imts,
locations=locations,
)
else:
raise ValueError(f"Region not found in path name ({name})")
# @staticmethod
# def _scan_data_dir(nshm: Nshm, data_dir: Path) -> _ScanDirInfo:
# futures: list[Future[_CurveFileInfo]] = []
# imls: dict[Imt, list[float]] = dict()
# imts: list[Imt] = []
# imt_directories: dict[Imt, Path] = {}
# locations: list[Locations] = []
# with ThreadPoolExecutor() as executor:
# for _imt_entry in os.scandir(data_dir):
# imt_entry: os.DirEntry = _imt_entry
# dir_name = imt_entry.name
# if (
# dir_name.startswith(Imt.PGA.name)
# or dir_name.startswith(Imt.PGV.name)
# or dir_name.startswith("SA")
# ):
# imt = Imt.from_string(dir_name)
# imts.append(imt)
# for _file_entry in os.scandir(imt_entry.path):
# file_entry: os.DirEntry = _file_entry
# imt_directories.setdefault(imt, Path(file_entry.path).parent)
# if NetcdfKeys.CURVES_FILE in file_entry.name:
# future = executor.submit(
# DataPathParser._parse_curve_file,
# nshm=nshm,
# imt=imt,
# file_entry=file_entry,
# )
# futures.append(future)
# for _future in futures:
# future: Future[_CurveFileInfo] = _future
# curve_values = future.result(timeout=30)
# imls.setdefault(curve_values.imt, curve_values.imls)
# locations.append(curve_values.locations)
# return _ScanDirInfo(
# imls=imls,
# imt_directories=imt_directories,
# imts=imts,
# locations=locations,
# )
@@ -22,8 +22,10 @@ from .netcdf_utils import NetcdfUtils
@dataclass
class NetcdfVariableParameters:
attributes: dict = field(default_factory=lambda: NetcdfVariableAttributes().dict())
datatype: Union[str, type(str), netcdf.EnumType] = ""
dimensions: tuple[str] = field(default_factory=tuple)
datatype: Union[str, type(str), netcdf.EnumType] = field(
default_factory=Union[str, type(str), netcdf.EnumType]
)
dimensions: tuple[str] = field(default_factory=())
varname: str = ""
zlib = True
@@ -43,7 +45,7 @@ class NetcdfParameters:
default_factory=lambda: NetcdfAttributes().BoundsAttributes().dict()
)
datatype: type(str) = str
dimensions: tuple[str] = ()
dimensions: tuple[str] = field(default_factory=lambda: (NetcdfKeys.DATA_BOUNDS,))
varname: str = NetcdfKeys.DATA_BOUNDS
@dataclass
@@ -85,7 +87,7 @@ class NetcdfParameters:
class ImtParameters(NetcdfVariableParameters):
attributes: dict = field(default_factory=lambda: NetcdfAttributes().ImtAttributes().dict())
datatype: netcdf.EnumType = field(default_factory=netcdf.EnumType)
dimensions: tuple[str] = field(default_factory=lambda: NetcdfKeys.IMT)
dimensions: tuple[str] = field(default_factory=lambda: (NetcdfKeys.IMT,))
varname: str = NetcdfKeys.IMT
@dataclass
@@ -94,7 +96,7 @@ class NetcdfParameters:
default_factory=lambda: NetcdfAttributes().SiteClassAttributes().dict()
)
datatype: netcdf.EnumType = field(default_factory=netcdf.EnumType)
dimensions: tuple[str] = field(default_factory=lambda: NetcdfKeys.SITE_CLASS)
dimensions: tuple[str] = field(default_factory=lambda: (NetcdfKeys.SITE_CLASS,))
varname: str = NetcdfKeys.SITE_CLASS
@dataclass
@@ -103,5 +105,5 @@ class NetcdfParameters:
default_factory=lambda: NetcdfAttributes().Vs30Attributes().dict()
)
datatype: str = NetcdfDataType.F4.value
dimensions: tuple[str] = field(default_factory=lambda: NetcdfKeys.SITE_CLASS)
dimensions: tuple[str] = field(default_factory=lambda: (NetcdfKeys.SITE_CLASS,))
varname: str = NetcdfKeys.VS30
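The repeated field(default_factory=lambda: (...,)) pattern above builds each container default per instance and makes the dimensions explicit 1-tuples of dimension names; a minimal sketch with a hypothetical parameters class:

from dataclasses import dataclass, field

@dataclass
class ExampleParameters:
    varname: str = "vs30"
    dimensions: tuple[str, ...] = field(default_factory=lambda: ("site_class",))

assert ExampleParameters().dimensions == ("site_class",)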
@@ -112,7 +112,7 @@ class NetcdfUtils:
location_netcdf_var = group.createVariable(
varname=location.short_name,
datatype=NetcdfDataType.F4.value,
dimensions=(location.short_name,)
dimensions=(location.short_name,),
)
location_netcdf_var.setncattr(NetcdfKeys.LONG_NAME, location.long_name)
location_netcdf_var.setncattr(NetcdfKeys.STANDARD_NAME, location.long_name)
......
@@ -2,6 +2,7 @@
"""
import os
import subprocess
import sys
from subprocess import check_call
@@ -13,13 +14,6 @@ _python_dir = "src/main/python"
_database_path = f"{_python_path}.gov.usgs.earthquake.nshmp.netcdf"
def database():
# Download files from Science Base
args = sys.argv
args.remove(args[0])
check_call(["python", "-m", _database_path] + args)
def format_check():
# Check code format
check_call(["isort", "--check", "--diff", _python_dir])
......
@@ -19,15 +19,21 @@ catalogs:
id: '5d31f90ce4b01d82ce86ea7b'
files:
- '0p05 Degree CONUS Site Class Boundary AB Map Data.zip'
- '0p05 Degree CONUS Site Class B Map Data.zip'
- '0p05 Degree CONUS Site Class Boundary BC Map Data.zip'
- '0p05 Degree CONUS Site Class C Map Data.zip'
- '0p05 Degree CONUS Site Class Boundary CD Map Data.zip'
- '0p05 Degree CONUS Site Class D Map Data.zip'
- '0p05 Degree CONUS Site Class Boundary DE Map Data.zip'
- '0p05 Degree CONUS Site Class E Map Data.zip'
# - '0p05 Degree CONUS Site Class B Map Data.zip'
# - '0p05 Degree CONUS Site Class Boundary BC Map Data.zip'
# - '0p05 Degree CONUS Site Class C Map Data.zip'
# - '0p05 Degree CONUS Site Class Boundary CD Map Data.zip'
# - '0p05 Degree CONUS Site Class D Map Data.zip'
# - '0p05 Degree CONUS Site Class Boundary DE Map Data.zip'
# - '0p05 Degree CONUS Site Class E Map Data.zip'
science_base_version: 'v1.1, revised 2020-02-18'
url: 'https://www.sciencebase.gov/catalog/item/5d31f90ce4b01d82ce86ea7b'
# -
# id: 'LOCAL'
# files:
# - 'src/main/resources/2018A_CONUS_PGV.zip'
# science_base_version: 'N/A'
# url: 'N/A'
# Database for 2018
-
......