diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index d2a0e989290c0fe43eb3aec75f5f0ef2bbc27631..1936de2a117e2d7ba18ecad5b4d701c2b3c0c062 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -62,29 +62,6 @@ Build Image WS: DOCKERFILE: ws.Dockerfile UPSTREAM_PATH: ghsc/nshmp/nshmp-haz -Build Lambda: - artifacts: - expire_in: 1 yr - paths: - - build/libs/nshmp-haz.jar - - build/libs/nshmp-haz-dependencies.zip - extends: - - .gradle - needs: - - Init - rules: - - - changes: - - 'src/**' - - '*gradle*' - when: on_success - - - allow_failure: true - when: manual - script: - - ./gradlew assemble - - ./gradlew libs - Build Project: extends: - .gradle @@ -132,7 +109,7 @@ Unit Tests: rules: - changes: - - 'src/**' + - 'src/**/*' - '*gradle*' when: on_success - diff --git a/build.gradle b/build.gradle index cf42f93f77e99cb7fa99a5aa61fd76f14545ec18..4e0a5e5b94dd6e2b59d8580bd890145b1a2d9a1b 100644 --- a/build.gradle +++ b/build.gradle @@ -49,7 +49,6 @@ apply from: "${projectDir}/gradle/ext.gradle" apply from: "${projectDir}/gradle/jar.gradle" apply from: "${projectDir}/gradle/javadoc.gradle" apply from: "${projectDir}/gradle/repositories.gradle" -apply from: "${projectDir}/gradle/tasks.gradle" sourceCompatibility = JavaVersion.VERSION_11 compileJava.options.encoding = "UTF-8" diff --git a/docs/pages/Calculation-Configuration.md b/docs/pages/Calculation-Configuration.md index 36778cf8e2d738baa7c2488c13c51d7b30cf5e3c..c7a20bde54a9f7407626f35161e8b90837efbd78 100644 --- a/docs/pages/Calculation-Configuration.md +++ b/docs/pages/Calculation-Configuration.md @@ -36,7 +36,8 @@ __`site`__ `.z2p5` |`Double` | `null` | [7](#notes) __`output`__ | `.directory` |`String` | `hazout` - `.dataTypes` |`String[]` | `[ TOTAL ]` | [`DataType`][url-datatype] + `.dataTypes` |`String[]` | `[ TOTAL, MAP ]` | [`DataType`][url-datatype] + `.returnPeriods` |`Integer[]`| `[ 475, 975, 2475 ]` | [`ReturnPeriods`][url-returnperiods] __`performance`__ `.optimizeGrids` |`Boolean` | `true` | [8](#notes) `.smoothGrids` |`Boolean` | `true` | [9](#notes) @@ -49,6 +50,7 @@ __`performance`__ [url-distribution]: https://earthquake.usgs.gov/nshmp/docs/nshmp-lib/gov/usgs/earthquake/nshmp/calc/DistributionFormat.html [url-site]: https://earthquake.usgs.gov/nshmp/docs/nshmp-lib/gov/usgs/earthquake/nshmp/calc/Site.html [url-datatype]: https://earthquake.usgs.gov/nshmp/docs/nshmp-lib/gov/usgs/earthquake/nshmp/calc/DataType.html +[url-returnperiods]: https://earthquake.usgs.gov/nshmp/docs/nshmp-lib/gov/usgs/earthquake/nshmp/calc/CalcConfig.Output.html#returnPeriods [url-sheets]: https://earthquake.usgs.gov/nshmp/docs/nshmp-lib/gov/usgs/earthquake/nshmp/calc/ThreadCount.html ### Notes diff --git a/gradle.properties b/gradle.properties index 05bbb8d30359538c9280b621f542f5af0487916f..b64478dc1b66d4806b6f5b6bd931a4172002c9da 100644 --- a/gradle.properties +++ b/gradle.properties @@ -8,7 +8,7 @@ junitVersion = 5.5.2 micronautVersion = 2.4.1 mnPluginVersion = 1.4.2 nodeVersion = 3.0.1 -nshmpLibVersion = 0.5.1 +nshmpLibVersion = 0.6.0 nshmpWsUtilsVersion = 0.1.2 shadowVersion = 5.2.0 spotbugsVersion = 4.2.4 diff --git a/gradle/tasks.gradle b/gradle/tasks.gradle deleted file mode 100644 index 5375745fcc94badeb800e85748a1da883fd964f8..0000000000000000000000000000000000000000 --- a/gradle/tasks.gradle +++ /dev/null @@ -1,16 +0,0 @@ - -/* - * Create a zip file of all dependencies - */ -task libs(type: Zip) { - archiveBaseName = "nshmp-haz-dependencies" - from { - configurations.compileClasspath.collect { - it - } - } - - into("java/lib") - - destinationDirectory.value(libsDirectory) -} diff --git a/src/main/java/gov/usgs/earthquake/nshmp/HazardCalc.java b/src/main/java/gov/usgs/earthquake/nshmp/HazardCalc.java index 7aa64522d7de9a4f83f9b4d7b7a86a24ceb9e8dd..b45944e03b9fc6b1d9731ba0c741df100655fe66 100644 --- a/src/main/java/gov/usgs/earthquake/nshmp/HazardCalc.java +++ b/src/main/java/gov/usgs/earthquake/nshmp/HazardCalc.java @@ -23,6 +23,7 @@ import com.google.common.base.Throwables; import com.google.common.util.concurrent.MoreExecutors; import gov.usgs.earthquake.nshmp.calc.CalcConfig; +import gov.usgs.earthquake.nshmp.calc.DataType; import gov.usgs.earthquake.nshmp.calc.Hazard; import gov.usgs.earthquake.nshmp.calc.HazardCalcs; import gov.usgs.earthquake.nshmp.calc.HazardExport; @@ -111,6 +112,11 @@ public class HazardCalc { log.info("Sites: " + sites); Path out = calc(model, config, sites, log); + + if (config.output.dataTypes.contains(DataType.MAP)) { + HazardMaps.createDataSets(out, config.output.returnPeriods, log); + } + log.info(PROGRAM + ": finished"); /* Transfer log and write config, windows requires fh.close() */ diff --git a/src/main/java/gov/usgs/earthquake/nshmp/HazardMaps.java b/src/main/java/gov/usgs/earthquake/nshmp/HazardMaps.java new file mode 100644 index 0000000000000000000000000000000000000000..cea833653b04ba3b10883e765bfa1ab507f53e6d --- /dev/null +++ b/src/main/java/gov/usgs/earthquake/nshmp/HazardMaps.java @@ -0,0 +1,190 @@ +package gov.usgs.earthquake.nshmp; + +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.function.Function; +import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import gov.usgs.earthquake.nshmp.data.Interpolator; +import gov.usgs.earthquake.nshmp.internal.Parsing; + +/** + * Utility class to create hazard map datasets from a hazard curve results. + * Methods in class assume *.csv curve files have no comments and have a header + * row that starts with {@code "name,lon,lat,..."} or {@code "lon,lat,..."}. + * + * @author U.S. Geological Survey + */ +public class HazardMaps { + + private static final String COMMA = ","; + private static final String CURVES_FILE = "curves.csv"; + private static final List<Integer> DEFAULT_RETURN_PERIODS = List.of(475, 975, 2475); + private static final Interpolator INTERPOLATOR = Interpolator.builder() + .logx() + .logy() + .decreasingX() + .build(); + private static final String MAP_FILE = "map.csv"; + private static final String PROGRAM = HazardMaps.class.getSimpleName(); + private static final String VALUE_FMT = "%.8e"; + private static final Function<Double, String> VALUE_FORMATTER = + Parsing.formatDoubleFunction(VALUE_FMT); + + private HazardMaps() {} + + /** + * Command line application to create a file of return period slices through a + * hazard curve dataset. Result of slicing job is saved to a {@code map.csv} + * file in the same directory as the source. + * + * @param args a path to a hazard curve result file or directory. If the + * supplied path is a directory, application will recurse through file + * tree slicing each {@code curves.csv} file encountered. + */ + public static void main(String[] args) { + if (args.length < 1) { + System.out.println("Usage: Supply a path to a file of hazard curve results and"); + System.out.println(" optionally a space separated list of return periods (in yr)"); + System.out.println(" default return periods: 475 975 2475"); + return; + } + + Path curvesPath = Path.of(args[0]); + List<Integer> returnPeriods = DEFAULT_RETURN_PERIODS; + Logger log = Logger.getLogger(HazardMaps.class.getName()); + + if (args.length > 1) { + returnPeriods = Arrays.stream(args) + .skip(1) + .mapToInt(Integer::valueOf) + .boxed() + .collect(Collectors.toList()); + } + + try { + createDataSets(curvesPath, returnPeriods, log); + } catch (Exception e) { + System.out.println("Processing Error"); + System.out.println("Arguments: " + Arrays.toString(args)); + e.printStackTrace(); + } + } + + static void createDataSets( + Path curvesPath, + List<Integer> returnPeriods, + Logger log) throws IOException { + log.info(PROGRAM + ": Creating hazard map dataset:"); + log.info("\tReturn periods: " + returnPeriods.toString()); + log.info("\tPath: " + curvesPath.toAbsolutePath().toString()); + + if (Files.isDirectory(curvesPath)) { + CurvesVisitor curvesFinder = new CurvesVisitor(returnPeriods); + Files.walkFileTree(curvesPath, curvesFinder); + } else { + processCurveFile(curvesPath, returnPeriods); + } + } + + private static List<String> create(List<String> lines, List<Integer> returnPeriods) { + int headerCount = lines.get(0).startsWith("name") ? 3 : 2; + List<String> header = Arrays.asList(lines.get(0).split(COMMA)); + + String siteStr = header.subList(0, headerCount) + .stream() + .collect(Collectors.joining(COMMA)); + + double[] imls = header.subList(headerCount, header.size()) + .stream() + .mapToDouble(Double::valueOf) + .toArray(); + + StringBuilder mapHeader = new StringBuilder(siteStr); + returnPeriods.forEach(rp -> mapHeader.append(COMMA).append(rp)); + + List<String> linesOut = new ArrayList<>(lines.size()); + linesOut.add(mapHeader.toString()); + + Slicer slicer = new Slicer(returnPeriods, imls, headerCount); + + lines.stream() + .skip(1) + .map(slicer::slice) + .forEach(linesOut::add); + + return linesOut; + } + + private static void processCurveFile(Path curves, List<Integer> returnPeriods) { + try (Stream<String> stream = Files.lines(curves)) { + List<String> linesIn = stream.collect(Collectors.toList()); + List<String> linesOut = create(linesIn, returnPeriods); + Path maps = curves.resolveSibling(MAP_FILE); + Files.write(maps, linesOut); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + + private static class CurvesVisitor extends SimpleFileVisitor<Path> { + List<Integer> returnPeriods; + + public CurvesVisitor(List<Integer> returnPeriods) { + this.returnPeriods = returnPeriods; + } + + @Override + public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) { + Path fileName = path.getFileName(); + if (fileName != null && fileName.endsWith(CURVES_FILE)) { + processCurveFile(path, returnPeriods); + } + return FileVisitResult.CONTINUE; + } + } + + private static class Slicer { + private final List<Integer> returnPeriods; + private final double[] imls; + private final int headerCount; + + private Slicer(List<Integer> returnPeriods, double imls[], int headerCount) { + this.returnPeriods = returnPeriods; + this.imls = imls; + this.headerCount = headerCount; + } + + private String slice(String line) { + List<String> elements = Arrays.asList(line.split(COMMA)); + String siteStr = elements.subList(0, headerCount) + .stream() + .collect(Collectors.joining(COMMA)); + + StringBuilder lineOut = new StringBuilder(siteStr); + + double[] rates = elements + .stream() + .skip(headerCount) + .mapToDouble(Double::valueOf) + .toArray(); + + for (double returnPeriod : returnPeriods) { + lineOut.append(COMMA); + lineOut.append(VALUE_FORMATTER.apply(INTERPOLATOR.findX(imls, rates, 1 / returnPeriod))); + } + + return lineOut.toString(); + } + } + +} diff --git a/src/main/java/gov/usgs/earthquake/nshmp/aws/HazardResultSliceLambda.java b/src/main/java/gov/usgs/earthquake/nshmp/aws/HazardResultSliceLambda.java deleted file mode 100644 index 1311536a14383509bd5c7be42537b494a53b2833..0000000000000000000000000000000000000000 --- a/src/main/java/gov/usgs/earthquake/nshmp/aws/HazardResultSliceLambda.java +++ /dev/null @@ -1,310 +0,0 @@ -package gov.usgs.earthquake.nshmp.aws; - -import static com.google.common.base.Preconditions.checkState; -import static gov.usgs.earthquake.nshmp.aws.Util.CURVES_FILE; -import static gov.usgs.earthquake.nshmp.aws.Util.MAP_FILE; -import static gov.usgs.earthquake.nshmp.www.services.ServletUtil.GSON; - -import java.io.BufferedReader; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.time.ZonedDateTime; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.function.Function; -import java.util.stream.Collectors; - -import com.amazonaws.services.lambda.runtime.Context; -import com.amazonaws.services.lambda.runtime.RequestStreamHandler; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.PutObjectRequest; -import com.amazonaws.services.s3.model.S3Object; -import com.amazonaws.services.s3.model.S3ObjectInputStream; -import com.google.common.base.Charsets; -import com.google.common.base.Throwables; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; - -import gov.usgs.earthquake.nshmp.aws.Util.LambdaHelper; -import gov.usgs.earthquake.nshmp.calc.Site; -import gov.usgs.earthquake.nshmp.data.Interpolator; -import gov.usgs.earthquake.nshmp.internal.Parsing; -import gov.usgs.earthquake.nshmp.internal.Parsing.Delimiter; -import gov.usgs.earthquake.nshmp.www.meta.Metadata; -import gov.usgs.earthquake.nshmp.www.meta.Status; -import gov.usgs.earthquake.nshmp.www.services.ServletUtil; - -/** - * AWS Lambda function to read in a curves file from AWS S3 and create slices at - * return periods interest. <br> - * - * The results are written to S3 as map.csv bucket. - */ -public class HazardResultSliceLambda implements RequestStreamHandler { - - private static final AmazonS3 S3 = AmazonS3ClientBuilder.defaultClient(); - - private static final String RATE_FMT = "%.8e"; - private static final Function<Double, String> FORMATTER = Parsing.formatDoubleFunction(RATE_FMT); - - private static final int NUMBER_OF_HEADERS = 3; - private static final String CONTENT_TYPE = "text/csv"; - - private static final Interpolator INTERPOLATOR = Interpolator.builder() - .logx() - .logy() - .decreasingX() - .build(); - - @Override - public void handleRequest( - InputStream input, - OutputStream output, - Context context) throws IOException { - LambdaHelper lambdaHelper = new LambdaHelper(input, output, context); - String requestBucket = ""; - - try { - RequestData request = GSON.fromJson(lambdaHelper.requestJson, RequestData.class); - lambdaHelper.logger.log("Request Data: " + GSON.toJson(request) + "\n"); - requestBucket = request.bucket + "/" + request.key; - checkRequest(request); - Response response = processRequest(request); - String json = GSON.toJson(response, Response.class); - lambdaHelper.logger.log("Result: " + json + "\n"); - output.write(json.getBytes()); - output.close(); - } catch (Exception e) { - lambdaHelper.logger.log("\nError: " + Throwables.getStackTraceAsString(e) + "\n\n"); - String message = Metadata.errorMessage(requestBucket, e, false); - output.write(message.getBytes()); - } - } - - private static Response processRequest(RequestData request) throws IOException { - List<InterpolatedData> data = readCurveFile(request); - String outputBucket = request.bucket + "/" + request.key; - StringBuilder csv = new StringBuilder(); - createHeaderString(csv, request); - createDataString(csv, data); - writeResults(request, outputBucket, csv.toString().getBytes(Charsets.UTF_8)); - return new Response(request, outputBucket); - } - - private static List<InterpolatedData> readCurveFile(RequestData request) throws IOException { - S3Object object = S3.getObject(request.bucket, request.key + "/" + CURVES_FILE); - S3ObjectInputStream input = object.getObjectContent(); - BufferedReader reader = new BufferedReader(new InputStreamReader(input)); - List<String> lines = reader.lines().collect(Collectors.toList()); - reader.close(); - - Optional<List<String>> header = lines.stream() - .filter(line -> !line.startsWith("#")) - .findFirst() - .map(line -> Parsing.splitToList(line, Delimiter.COMMA)); - - checkState(header.isPresent(), "Curve file is empty"); - - List<String> keys = header.get().subList(0, NUMBER_OF_HEADERS); - List<Double> imls = header.get().subList(NUMBER_OF_HEADERS, header.get().size()) - .stream() - .map(iml -> Double.parseDouble(iml)) - .collect(Collectors.toList()); - - List<InterpolatedData> data = new ArrayList<>(); - lines.stream() - .filter(line -> !line.startsWith("#")) - .skip(1) - .forEach(line -> { - data.add(curveToInterpolatedData(request, line, keys, imls)); - }); - - return data; - } - - private static InterpolatedData curveToInterpolatedData( - RequestData request, - String line, - List<String> keys, - List<Double> imls) { - List<String> values = Parsing.splitToList(line, Delimiter.COMMA); - List<Double> gms = values.subList(NUMBER_OF_HEADERS, values.size()) - .stream() - .map(gm -> Double.parseDouble(gm)) - .collect(Collectors.toList()); - values = values.subList(0, NUMBER_OF_HEADERS); - - Site site = buildSite(keys, values); - List<Double> interpolatedValues = request.slices.stream() - .map(returnPeriod -> INTERPOLATOR.findX(imls, gms, returnPeriod)) - .collect(Collectors.toList()); - - return new InterpolatedData(site, interpolatedValues); - } - - private static Site buildSite(List<String> keys, List<String> values) { - Double lat = null; - Double lon = null; - String name = null; - - for (int index = 0; index < keys.size(); index++) { - String key = keys.get(index); - String value = values.get(index); - - switch (key) { - case Keys.LAT: - lat = Double.parseDouble(value); - break; - case Keys.LON: - lon = Double.parseDouble(value); - break; - case Keys.NAME: - name = value; - break; - default: - throw new IllegalStateException("Unsupported site key: " + key); - } - } - - return Site.builder() - .location(lon, lat) - .name(name) - .build(); - } - - private static void checkRequest(RequestData request) { - if (request.bucket == null) { - throw new RuntimeException("Request does not contain a S3 bucket"); - } - - if (request.key == null) { - throw new RuntimeException("Request does not contain a S3 key"); - } - - if (request.slices == null) { - throw new RuntimeException("Request does not contain returnPeriods"); - } - } - - private static void createDataString(StringBuilder builder, List<InterpolatedData> data) { - data.forEach(datum -> { - List<String> locData = Lists.newArrayList( - datum.site.name, - String.format("%.5f", datum.site.location.longitude), - String.format("%.5f", datum.site.location.latitude)); - builder.append(toLine(locData, datum.values) + "\n"); - }); - } - - private static String toLine( - Iterable<String> strings, - Iterable<Double> values) { - return Parsing.join( - Iterables.concat(strings, Iterables.transform(values, FORMATTER::apply)), - Delimiter.COMMA); - } - - private static void createHeaderString(StringBuilder builder, RequestData request) { - List<String> header = Lists.newArrayList(Keys.NAME, Keys.LON, Keys.LAT); - builder.append(toLine(header, request.slices) + "\n"); - } - - private static void writeResults( - RequestData request, - String outputBucket, - byte[] result) throws IOException { - ObjectMetadata metadata = new ObjectMetadata(); - - InputStream input = new ByteArrayInputStream(result); - metadata.setContentType(CONTENT_TYPE); - metadata.setContentLength(result.length); - PutObjectRequest putRequest = new PutObjectRequest( - request.bucket, - request.key + "/" + MAP_FILE, - input, - metadata); - S3.putObject(putRequest); - input.close(); - } - - static class RequestData { - String bucket; - String key; - List<Double> slices; - - private RequestData(Builder builder) { - bucket = builder.bucket; - key = builder.key; - slices = builder.slices; - } - - static Builder builder() { - return new Builder(); - } - - static class Builder { - private String bucket; - private String key; - private List<Double> slices; - - Builder bucket(String bucket) { - this.bucket = bucket; - return this; - } - - Builder key(String key) { - this.key = key; - return this; - } - - Builder slices(List<Double> slices) { - this.slices = slices; - return this; - } - - RequestData build() { - return new RequestData(this); - } - - } - - } - - private static class Response { - final String status; - final String date; - final RequestData request; - final String csv; - - Response(RequestData request, String outputBucket) { - status = Status.SUCCESS.toString(); - date = ZonedDateTime.now().format(ServletUtil.DATE_FMT); - this.request = request; - this.csv = outputBucket + "/" + MAP_FILE; - } - - } - - private static class InterpolatedData { - Site site; - List<Double> values; - - InterpolatedData(Site site, List<Double> values) { - this.site = site; - this.values = values; - } - } - - private static class Keys { - static final String LAT = "lat"; - static final String LON = "lon"; - static final String NAME = "name"; - } - -} diff --git a/src/main/java/gov/usgs/earthquake/nshmp/aws/HazardResultsMetadataLambda.java b/src/main/java/gov/usgs/earthquake/nshmp/aws/HazardResultsMetadataLambda.java deleted file mode 100644 index 88c16e1e6ca63b3824b286f332bf6d65c2ed400e..0000000000000000000000000000000000000000 --- a/src/main/java/gov/usgs/earthquake/nshmp/aws/HazardResultsMetadataLambda.java +++ /dev/null @@ -1,315 +0,0 @@ -package gov.usgs.earthquake.nshmp.aws; - -import static gov.usgs.earthquake.nshmp.aws.Util.CURVES_FILE; -import static gov.usgs.earthquake.nshmp.aws.Util.MAP_FILE; -import static gov.usgs.earthquake.nshmp.www.services.ServletUtil.GSON; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.time.ZonedDateTime; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; -import java.util.stream.Collectors; - -import com.amazonaws.services.lambda.runtime.Context; -import com.amazonaws.services.lambda.runtime.RequestStreamHandler; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; -import com.amazonaws.services.s3.model.ListObjectsV2Request; -import com.amazonaws.services.s3.model.ListObjectsV2Result; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.PutObjectRequest; -import com.google.common.base.Enums; -import com.google.common.base.Throwables; - -import gov.usgs.earthquake.nshmp.aws.Util.LambdaHelper; -import gov.usgs.earthquake.nshmp.calc.DataType; -import gov.usgs.earthquake.nshmp.gmm.Gmm; -import gov.usgs.earthquake.nshmp.gmm.Imt; -import gov.usgs.earthquake.nshmp.internal.Parsing; -import gov.usgs.earthquake.nshmp.internal.Parsing.Delimiter; -import gov.usgs.earthquake.nshmp.model.SourceType; -import gov.usgs.earthquake.nshmp.www.meta.Metadata; -import gov.usgs.earthquake.nshmp.www.meta.Status; -import gov.usgs.earthquake.nshmp.www.services.ServletUtil; - -/** - * AWS Lambda function to list all hazard results in the nshmp-hazout S3 bucket - * that contain a map.csv file. - */ -public class HazardResultsMetadataLambda implements RequestStreamHandler { - - private static final AmazonS3 S3 = AmazonS3ClientBuilder.defaultClient(); - - private static final int IMT_DIR_BACK_FROM_TOTAL = 2; - private static final int IMT_DIR_BACK_FROM_SOURCE = 4; - private static final String S3_BUCKET = "nshmp-hazout"; - private static final String RESULT_BUCKET = "nshmp-haz-lambda"; - private static final String RESULT_KEY = "nshmp-haz-aws-results-metadata.json"; - - @Override - public void handleRequest( - InputStream input, - OutputStream output, - Context context) throws IOException { - LambdaHelper lambdaHelper = new LambdaHelper(input, output, context); - - try { - Response response = processRequest(); - String json = GSON.toJson(response, Response.class); - uploadResults(json); - output.write(json.getBytes()); - output.close(); - } catch (Exception e) { - lambdaHelper.logger.log("\nError: " + Throwables.getStackTraceAsString(e) + "\n\n"); - String message = Metadata.errorMessage("", e, false); - output.write(message.getBytes()); - } - } - - private static Response processRequest() { - Map<String, CurvesMapResult> curvesMapResults = new HashMap<>(); - Set<String> users = getUsers(); - - for (String file : new String[] { CURVES_FILE, MAP_FILE }) { - List<HazardResults> hazardResults = listObjects(users, file); - CurvesMapResult result = new CurvesMapResult(users, hazardResults); - curvesMapResults.put(file, result); - } - - Result result = new Result(curvesMapResults.get(CURVES_FILE), curvesMapResults.get(MAP_FILE)); - return new Response(result); - } - - private static List<HazardResults> listObjects(Set<String> users, String file) { - ListObjectsV2Request request = new ListObjectsV2Request() - .withBucketName(S3_BUCKET) - .withDelimiter(file); - ListObjectsV2Result s3Result; - List<S3Listing> s3Listings = new ArrayList<>(); - - do { - s3Result = S3.listObjectsV2(request); - s3Result.getCommonPrefixes() - .stream() - .map(key -> keyToHazardListing(key)) - .forEach(listing -> s3Listings.add(listing)); - - request.setContinuationToken(s3Result.getNextContinuationToken()); - } while (s3Result.isTruncated()); - - return transformS3Listing(users, s3Listings); - } - - private static List<HazardResults> transformS3Listing( - Set<String> users, - List<S3Listing> s3Listings) { - List<HazardResults> hazardResults = new ArrayList<>(); - - users.forEach(user -> { - TreeSet<String> resultDirectories = s3Listings.stream() - .filter(listing -> listing.user.equals(user)) - .map(listing -> listing.resultPrefix) - .collect(Collectors.toCollection(TreeSet::new)); - - resultDirectories.forEach(resultPrefix -> { - List<S3Listing> s3Filteredlistings = s3Listings.parallelStream() - .filter(listing -> listing.user.equals(user)) - .filter(listing -> listing.resultPrefix.equals(resultPrefix)) - .collect(Collectors.toList()); - - List<HazardListing> listings = s3Filteredlistings.parallelStream() - .map(listing -> s3ListingToHazardListing(listing)) - .collect(Collectors.toList()); - - S3Listing s3Listing = s3Filteredlistings.get(0); - String path = s3Listing.path.split(resultPrefix)[0]; - String s3Path = s3Listing.user + "/" + path + resultPrefix; - - hazardResults.add(new HazardResults( - user, - s3Listing.bucket, - resultPrefix, - s3Path, - listings)); - }); - }); - - return hazardResults; - } - - private static HazardListing s3ListingToHazardListing(S3Listing s3Listing) { - return new HazardListing(s3Listing.dataType, s3Listing.path, s3Listing.file); - } - - private static S3Listing keyToHazardListing(String key) { - List<String> keys = Parsing.splitToList(key, Delimiter.SLASH); - HazardDataType<?> dataType = getDataType(keys); - String user = keys.get(0); - String file = keys.get(keys.size() - 1); - String path = keys.subList(1, keys.size() - 1) - .stream() - .collect(Collectors.joining("/")); - - return new S3Listing(user, S3_BUCKET, path, file, dataType); - } - - private static Set<String> getUsers() { - ListObjectsV2Request request = new ListObjectsV2Request() - .withBucketName(S3_BUCKET) - .withDelimiter("/"); - - ListObjectsV2Result listing = S3.listObjectsV2(request); - - return listing.getCommonPrefixes().stream() - .map(prefix -> prefix.replace("/", "")) - .collect(Collectors.toCollection(TreeSet::new)); - } - - private static HazardDataType<?> getDataType(List<String> keys) { - String sourceType = keys.get(keys.size() - IMT_DIR_BACK_FROM_TOTAL); - HazardDataType<?> dataType = null; - String resultDirectory = null; - Imt imt = null; - - if (Enums.getIfPresent(SourceType.class, sourceType).isPresent()) { - imt = Imt.valueOf(keys.get(keys.size() - IMT_DIR_BACK_FROM_SOURCE)); - resultDirectory = keys.get(keys.size() - IMT_DIR_BACK_FROM_SOURCE - 1); - SourceType type = SourceType.valueOf(sourceType); - dataType = new HazardDataType<SourceType>(imt, DataType.SOURCE, type, resultDirectory); - } else if (Enums.getIfPresent(Gmm.class, sourceType).isPresent()) { - imt = Imt.valueOf(keys.get(keys.size() - IMT_DIR_BACK_FROM_SOURCE)); - resultDirectory = keys.get(keys.size() - IMT_DIR_BACK_FROM_SOURCE - 1); - Gmm type = Gmm.valueOf(sourceType); - dataType = new HazardDataType<Gmm>(imt, DataType.GMM, type, resultDirectory); - } else if (Enums.getIfPresent(Imt.class, sourceType).isPresent()) { - Imt type = Imt.valueOf(sourceType); - resultDirectory = keys.get(keys.size() - IMT_DIR_BACK_FROM_TOTAL - 1); - imt = type; - dataType = new HazardDataType<Imt>(imt, DataType.TOTAL, type, resultDirectory); - } else { - throw new RuntimeException("Source type [" + sourceType + "] not supported"); - } - - return dataType; - } - - private static void uploadResults(String results) { - byte[] bytes = results.getBytes(); - ByteArrayInputStream input = new ByteArrayInputStream(bytes); - ObjectMetadata metadata = new ObjectMetadata(); - metadata.setContentLength(bytes.length); - metadata.setContentType("application/json"); - - PutObjectRequest request = new PutObjectRequest( - RESULT_BUCKET, - RESULT_KEY, - input, - metadata); - - S3.putObject(request); - } - - static class HazardDataType<E extends Enum<E>> { - final Imt imt; - final DataType type; - final transient String resultPrefix; - final E sourceType; - - HazardDataType(Imt imt, DataType type, E sourceType, String resultPrefix) { - this.imt = imt; - this.type = type; - this.resultPrefix = resultPrefix; - this.sourceType = sourceType; - } - } - - private static class HazardResults { - final String user; - final String bucket; - final String resultPrefix; - final String path; - final List<HazardListing> listings; - - HazardResults( - String user, - String bucket, - String resultPrefix, - String path, - List<HazardListing> listings) { - this.user = user; - this.bucket = bucket; - this.resultPrefix = resultPrefix; - this.path = path; - this.listings = listings; - } - } - - private static class HazardListing { - final HazardDataType<?> dataType; - final String file; - final String path; - - HazardListing(HazardDataType<?> dataType, String path, String file) { - this.dataType = dataType; - this.file = file; - this.path = path; - } - } - - private static class S3Listing { - final String user; - final String bucket; - final String path; - final String file; - final String resultPrefix; - final HazardDataType<?> dataType; - - S3Listing(String user, String bucket, String path, String file, HazardDataType<?> dataType) { - this.user = user; - this.bucket = bucket; - this.path = path; - this.file = file; - this.resultPrefix = dataType.resultPrefix; - this.dataType = dataType; - } - } - - private static class CurvesMapResult { - final Set<String> users; - final List<HazardResults> hazardResults; - - CurvesMapResult(Set<String> users, List<HazardResults> hazardResults) { - this.users = users; - this.hazardResults = hazardResults; - } - } - - private static class Result { - final CurvesMapResult curves; - final CurvesMapResult map; - - Result(CurvesMapResult curves, CurvesMapResult map) { - this.curves = curves; - this.map = map; - } - } - - private static class Response { - final String status; - final String date; - final Result result; - - Response(Result result) { - status = Status.SUCCESS.toString(); - date = ZonedDateTime.now().format(ServletUtil.DATE_FMT); - this.result = result; - } - } -} diff --git a/src/main/java/gov/usgs/earthquake/nshmp/aws/HazardResultsSlicerLambda.java b/src/main/java/gov/usgs/earthquake/nshmp/aws/HazardResultsSlicerLambda.java deleted file mode 100644 index a535f4cd8edd86b673d47bae589e06cf151c2e90..0000000000000000000000000000000000000000 --- a/src/main/java/gov/usgs/earthquake/nshmp/aws/HazardResultsSlicerLambda.java +++ /dev/null @@ -1,214 +0,0 @@ -package gov.usgs.earthquake.nshmp.aws; - -import static gov.usgs.earthquake.nshmp.aws.Util.CURVES_FILE; -import static gov.usgs.earthquake.nshmp.www.services.ServletUtil.GSON; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.time.ZonedDateTime; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; - -import com.amazonaws.services.ec2.AmazonEC2; -import com.amazonaws.services.ec2.AmazonEC2ClientBuilder; -import com.amazonaws.services.lambda.AWSLambda; -import com.amazonaws.services.lambda.AWSLambdaClientBuilder; -import com.amazonaws.services.lambda.model.InvokeRequest; -import com.amazonaws.services.lambda.model.InvokeResult; -import com.amazonaws.services.lambda.runtime.Context; -import com.amazonaws.services.lambda.runtime.RequestStreamHandler; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; -import com.amazonaws.services.s3.model.ObjectListing; -import com.google.common.base.Throwables; - -import gov.usgs.earthquake.nshmp.aws.Util.LambdaHelper; -import gov.usgs.earthquake.nshmp.internal.Parsing; -import gov.usgs.earthquake.nshmp.internal.Parsing.Delimiter; -import gov.usgs.earthquake.nshmp.www.meta.Metadata; -import gov.usgs.earthquake.nshmp.www.meta.Status; -import gov.usgs.earthquake.nshmp.www.services.ServletUtil; - -/** - * AWS Lambda function to read in hazard results from S3 and to create slices of - * return periods of interest. - * - * @see HazardResultSliceLambda - */ -public class HazardResultsSlicerLambda implements RequestStreamHandler { - - private static final AmazonS3 S3 = AmazonS3ClientBuilder.defaultClient(); - private static final AmazonEC2 EC2 = AmazonEC2ClientBuilder.defaultClient(); - private static final AWSLambda LAMBDA_CLIENT = AWSLambdaClientBuilder.defaultClient(); - - private static final String CURVE_SLICE_LAMBDA = System.getenv("CURVE_SLICE_LAMBDA_NAME"); - private static final String INSTANCE_STATUS = "terminated"; - - private static final int MAX_INSTANCE_CHECK = 100; - private static final int INSTANCE_CHECK_TIMEOUT = 10 * 1000; - - @Override - public void handleRequest( - InputStream input, - OutputStream output, - Context context) throws IOException { - LambdaHelper lambdaHelper = new LambdaHelper(input, output, context); - String requestBucket = ""; - - try { - RequestData request = GSON.fromJson(lambdaHelper.requestJson, RequestData.class); - requestBucket = String.format("%s/%s", request.bucket, request.key); - lambdaHelper.logger.log("Request Data: " + GSON.toJson(request) + "\n\n"); - checkRequest(request); - checkBucket(request); - Response response = processRequest(lambdaHelper, request); - output.write(GSON.toJson(response, Response.class).getBytes()); - } catch (Exception e) { - lambdaHelper.logger.log("\nError: " + Throwables.getStackTraceAsString(e) + "\n\n"); - String message = Metadata.errorMessage(requestBucket, e, false); - output.write(message.getBytes()); - } - } - - private static Response processRequest( - LambdaHelper lambdaHelper, - RequestData request) throws IOException, InterruptedException { - ObjectListing objectListing = S3.listObjects(request.bucket, request.key); - List<CompletableFuture<Void>> futures = new ArrayList<>(); - - objectListing.getObjectSummaries() - .parallelStream() - .filter(summary -> summary.getKey().endsWith(CURVES_FILE)) - .forEach(summary -> { - String name = summary.getKey(); - lambdaHelper.logger.log("Reading: " + name + "\n"); - try { - futures.add(processCurveFile(request, lambdaHelper, name)); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); - - futures.forEach(CompletableFuture::join); - lambdaHelper.logger.log("Zipping results"); - return new Response(request); - } - - private static CompletableFuture<Void> processCurveFile( - RequestData request, - LambdaHelper lambdaHelper, - String curvesPath) throws IOException { - return readCurveFile(request, curvesPath) - .thenAcceptAsync(result -> { - checkLambdaResponse(result); - }); - } - - private static CompletableFuture<InvokeResult> readCurveFile( - RequestData request, - String curvesPath) throws IOException { - List<String> names = Arrays.stream(curvesPath.split("/")) - .collect(Collectors.toList()); - names.remove(names.size() - 1); - String key = Parsing.join(names, Delimiter.SLASH); - - HazardResultSliceLambda.RequestData lambdaRequest = HazardResultSliceLambda.RequestData - .builder() - .bucket(request.bucket) - .key(key) - .slices(request.slices) - .build(); - - InvokeRequest invokeRequest = new InvokeRequest() - .withFunctionName(CURVE_SLICE_LAMBDA) - .withPayload(GSON.toJson(lambdaRequest)); - - return CompletableFuture.supplyAsync(() -> { - return LAMBDA_CLIENT.invoke(invokeRequest); - }); - } - - private static void checkRequest(RequestData request) { - if (request.bucket == null) { - throw new RuntimeException("Request does not contain a S3 bucket"); - } - - if (request.key == null) { - throw new RuntimeException("Request does not contain a S3 key"); - } - - if (request.slices == null) { - throw new RuntimeException("Request does not contain slices"); - } - } - - private static void checkBucket(RequestData request) { - if (!S3.doesBucketExistV2(request.bucket)) { - throw new RuntimeException(String.format("S3 bucket [%s] does not exist", request.bucket)); - } - } - - private static void checkLambdaResponse(InvokeResult result) { - try { - LambdaResponse response = GSON.fromJson( - new String(result.getPayload().array()), - LambdaResponse.class); - - if (Status.ERROR.toString().equals(response.status)) { - throw new RuntimeException(response.message); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private static class LambdaResponse { - String status; - String message; - } - - private static class ZipResultsResponse extends LambdaResponse { - ZipResult result; - ZipRequest request; - - private static class ZipRequest { - String bucket; - String key; - } - - private static class ZipResult { - String path; - String instanceId; - - ZipResult(String path, String instanceId) { - this.path = path; - this.instanceId = instanceId; - } - } - } - - private static class RequestData { - String bucket; - String key; - List<Double> slices; - } - - private static class Response { - final String status; - final String date; - final RequestData request; - final String outputBucket; - - Response(RequestData request) { - status = Status.SUCCESS.toString(); - date = ZonedDateTime.now().format(ServletUtil.DATE_FMT); - this.request = request; - this.outputBucket = String.format("%s/%s", request.bucket, request.key); - } - } - -} diff --git a/src/main/java/gov/usgs/earthquake/nshmp/aws/Util.java b/src/main/java/gov/usgs/earthquake/nshmp/aws/Util.java deleted file mode 100644 index 13db3f37c8f52d9613d85928096e654ba3698da8..0000000000000000000000000000000000000000 --- a/src/main/java/gov/usgs/earthquake/nshmp/aws/Util.java +++ /dev/null @@ -1,41 +0,0 @@ -package gov.usgs.earthquake.nshmp.aws; - -import java.io.BufferedReader; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.UnsupportedEncodingException; - -import com.amazonaws.services.lambda.runtime.Context; -import com.amazonaws.services.lambda.runtime.LambdaLogger; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; - -public class Util { - - static final String CURVES_FILE = "curves.csv"; - static final String MAP_FILE = "map.csv"; - - /** - * Parse the Lambda function {@code InputStream} into an {@code JsonObject}. - */ - static class LambdaHelper { - JsonObject requestJson; - Context context; - LambdaLogger logger; - OutputStream output; - - LambdaHelper(InputStream input, OutputStream output, Context context) - throws UnsupportedEncodingException { - logger = context.getLogger(); - this.context = context; - this.output = output; - - BufferedReader reader = new BufferedReader(new InputStreamReader(input)); - JsonParser parser = new JsonParser(); - - requestJson = parser.parse(reader).getAsJsonObject(); - } - } - -}