Commit 9b7cf673 authored by Powers, Peter M.

Merge branch 'issue-460' into 'main'

Resolves - Isolate hazard slicer utility

Closes #460

See merge request !539
parents aae03699 e46b52d3
2 merge requests: !541 Production Release | nshmp-haz | nshmp-haz 2.0 ll Sprint, !539 Resolves - Isolate hazard slicer utility
Pipeline #63757 passed
@@ -62,29 +62,6 @@ Build Image WS:
DOCKERFILE: ws.Dockerfile
UPSTREAM_PATH: ghsc/nshmp/nshmp-haz
Build Lambda:
artifacts:
expire_in: 1 yr
paths:
- build/libs/nshmp-haz.jar
- build/libs/nshmp-haz-dependencies.zip
extends:
- .gradle
needs:
- Init
rules:
-
changes:
- 'src/**'
- '*gradle*'
when: on_success
-
allow_failure: true
when: manual
script:
- ./gradlew assemble
- ./gradlew libs
Build Project:
extends:
- .gradle
@@ -132,7 +109,7 @@ Unit Tests:
rules:
-
changes:
- 'src/**'
- 'src/**/*'
- '*gradle*'
when: on_success
-
......
@@ -49,7 +49,6 @@ apply from: "${projectDir}/gradle/ext.gradle"
apply from: "${projectDir}/gradle/jar.gradle"
apply from: "${projectDir}/gradle/javadoc.gradle"
apply from: "${projectDir}/gradle/repositories.gradle"
apply from: "${projectDir}/gradle/tasks.gradle"
sourceCompatibility = JavaVersion.VERSION_11
compileJava.options.encoding = "UTF-8"
......
@@ -36,7 +36,8 @@ __`site`__
   `.z2p5` |`Double` | `null` | [7](#notes)
__`output`__ |
   `.directory` |`String` | `hazout`
   `.dataTypes` |`String[]` | `[ TOTAL ]` | [`DataType`][url-datatype]
   `.dataTypes` |`String[]` | `[ TOTAL, MAP ]` | [`DataType`][url-datatype]
   `.returnPeriods` |`Integer[]`| `[ 475, 975, 2475 ]` | [`ReturnPeriods`][url-returnperiods]
__`performance`__
   `.optimizeGrids` |`Boolean` | `true` | [8](#notes)
   `.smoothGrids` |`Boolean` | `true` | [9](#notes)
@@ -49,6 +50,7 @@ __`performance`__
[url-distribution]: https://earthquake.usgs.gov/nshmp/docs/nshmp-lib/gov/usgs/earthquake/nshmp/calc/DistributionFormat.html
[url-site]: https://earthquake.usgs.gov/nshmp/docs/nshmp-lib/gov/usgs/earthquake/nshmp/calc/Site.html
[url-datatype]: https://earthquake.usgs.gov/nshmp/docs/nshmp-lib/gov/usgs/earthquake/nshmp/calc/DataType.html
[url-returnperiods]: https://earthquake.usgs.gov/nshmp/docs/nshmp-lib/gov/usgs/earthquake/nshmp/calc/CalcConfig.Output.html#returnPeriods
[url-sheets]: https://earthquake.usgs.gov/nshmp/docs/nshmp-lib/gov/usgs/earthquake/nshmp/calc/ThreadCount.html
### Notes
......
@@ -8,7 +8,7 @@ junitVersion = 5.5.2
micronautVersion = 2.4.1
mnPluginVersion = 1.4.2
nodeVersion = 3.0.1
nshmpLibVersion = 0.5.1
nshmpLibVersion = 0.6.0
nshmpWsUtilsVersion = 0.1.2
shadowVersion = 5.2.0
spotbugsVersion = 4.2.4
......
/*
* Create a zip file of all dependencies
*/
task libs(type: Zip) {
archiveBaseName = "nshmp-haz-dependencies"
from {
configurations.compileClasspath.collect {
it
}
}
into("java/lib")
destinationDirectory.value(libsDirectory)
}
@@ -23,6 +23,7 @@ import com.google.common.base.Throwables;
import com.google.common.util.concurrent.MoreExecutors;
import gov.usgs.earthquake.nshmp.calc.CalcConfig;
import gov.usgs.earthquake.nshmp.calc.DataType;
import gov.usgs.earthquake.nshmp.calc.Hazard;
import gov.usgs.earthquake.nshmp.calc.HazardCalcs;
import gov.usgs.earthquake.nshmp.calc.HazardExport;
@@ -111,6 +112,11 @@ public class HazardCalc {
log.info("Sites: " + sites);
Path out = calc(model, config, sites, log);
if (config.output.dataTypes.contains(DataType.MAP)) {
HazardMaps.createDataSets(out, config.output.returnPeriods, log);
}
log.info(PROGRAM + ": finished");
/* Transfer log and write config, windows requires fh.close() */
......
package gov.usgs.earthquake.nshmp;
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import gov.usgs.earthquake.nshmp.data.Interpolator;
import gov.usgs.earthquake.nshmp.internal.Parsing;
/**
* Utility class to create hazard map datasets from hazard curve results.
* Methods in this class assume *.csv curve files have no comments and have a header
* row that starts with {@code "name,lon,lat,..."} or {@code "lon,lat,..."}; a worked
* input/output sketch follows this class.
*
* @author U.S. Geological Survey
*/
public class HazardMaps {
private static final String COMMA = ",";
private static final String CURVES_FILE = "curves.csv";
private static final List<Integer> DEFAULT_RETURN_PERIODS = List.of(475, 975, 2475);
private static final Interpolator INTERPOLATOR = Interpolator.builder()
.logx()
.logy()
.decreasingX()
.build();
private static final String MAP_FILE = "map.csv";
private static final String PROGRAM = HazardMaps.class.getSimpleName();
private static final String VALUE_FMT = "%.8e";
private static final Function<Double, String> VALUE_FORMATTER =
Parsing.formatDoubleFunction(VALUE_FMT);
private HazardMaps() {}
/**
* Command line application to create a file of return period slices through a
* hazard curve dataset. The result of the slicing job is saved to a
* {@code map.csv} file in the same directory as the source.
*
* @param args a path to a hazard curve result file or directory. If the
* supplied path is a directory, the application will recurse through the file
* tree, slicing each {@code curves.csv} file encountered.
*/
public static void main(String[] args) {
if (args.length < 1) {
System.out.println("Usage: Supply a path to a file of hazard curve results and");
System.out.println(" optionally a space separated list of return periods (in yr)");
System.out.println(" default return periods: 475 975 2475");
return;
}
Path curvesPath = Path.of(args[0]);
List<Integer> returnPeriods = DEFAULT_RETURN_PERIODS;
Logger log = Logger.getLogger(HazardMaps.class.getName());
if (args.length > 1) {
returnPeriods = Arrays.stream(args)
.skip(1)
.mapToInt(Integer::valueOf)
.boxed()
.collect(Collectors.toList());
}
try {
createDataSets(curvesPath, returnPeriods, log);
} catch (Exception e) {
System.out.println("Processing Error");
System.out.println("Arguments: " + Arrays.toString(args));
e.printStackTrace();
}
}
static void createDataSets(
Path curvesPath,
List<Integer> returnPeriods,
Logger log) throws IOException {
log.info(PROGRAM + ": Creating hazard map dataset:");
log.info("\tReturn periods: " + returnPeriods.toString());
log.info("\tPath: " + curvesPath.toAbsolutePath().toString());
if (Files.isDirectory(curvesPath)) {
CurvesVisitor curvesFinder = new CurvesVisitor(returnPeriods);
Files.walkFileTree(curvesPath, curvesFinder);
} else {
processCurveFile(curvesPath, returnPeriods);
}
}
private static List<String> create(List<String> lines, List<Integer> returnPeriods) {
int headerCount = lines.get(0).startsWith("name") ? 3 : 2;
List<String> header = Arrays.asList(lines.get(0).split(COMMA));
String siteStr = header.subList(0, headerCount)
.stream()
.collect(Collectors.joining(COMMA));
double[] imls = header.subList(headerCount, header.size())
.stream()
.mapToDouble(Double::valueOf)
.toArray();
StringBuilder mapHeader = new StringBuilder(siteStr);
returnPeriods.forEach(rp -> mapHeader.append(COMMA).append(rp));
List<String> linesOut = new ArrayList<>(lines.size());
linesOut.add(mapHeader.toString());
Slicer slicer = new Slicer(returnPeriods, imls, headerCount);
lines.stream()
.skip(1)
.map(slicer::slice)
.forEach(linesOut::add);
return linesOut;
}
private static void processCurveFile(Path curves, List<Integer> returnPeriods) {
try (Stream<String> stream = Files.lines(curves)) {
List<String> linesIn = stream.collect(Collectors.toList());
List<String> linesOut = create(linesIn, returnPeriods);
Path maps = curves.resolveSibling(MAP_FILE);
Files.write(maps, linesOut);
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
private static class CurvesVisitor extends SimpleFileVisitor<Path> {
List<Integer> returnPeriods;
public CurvesVisitor(List<Integer> returnPeriods) {
this.returnPeriods = returnPeriods;
}
@Override
public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) {
Path fileName = path.getFileName();
if (fileName != null && fileName.endsWith(CURVES_FILE)) {
processCurveFile(path, returnPeriods);
}
return FileVisitResult.CONTINUE;
}
}
private static class Slicer {
private final List<Integer> returnPeriods;
private final double[] imls;
private final int headerCount;
private Slicer(List<Integer> returnPeriods, double imls[], int headerCount) {
this.returnPeriods = returnPeriods;
this.imls = imls;
this.headerCount = headerCount;
}
private String slice(String line) {
List<String> elements = Arrays.asList(line.split(COMMA));
String siteStr = elements.subList(0, headerCount)
.stream()
.collect(Collectors.joining(COMMA));
StringBuilder lineOut = new StringBuilder(siteStr);
double[] rates = elements
.stream()
.skip(headerCount)
.mapToDouble(Double::valueOf)
.toArray();
for (double returnPeriod : returnPeriods) {
lineOut.append(COMMA);
lineOut.append(VALUE_FORMATTER.apply(INTERPOLATOR.findX(imls, rates, 1 / returnPeriod)));
}
return lineOut.toString();
}
}
}
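For orientation, a minimal input/output sketch for the utility above (referenced from the class javadoc). File paths, site names, and numeric values are hypothetical; only the column layout, the %.8e value format, and the 1 / returnPeriod interpolation follow the code.

// Hypothetical curves.csv (site columns, then IMLs; one row of annual
// exceedance rates per site):
//   name,lon,lat,0.0025,0.0075,0.0225
//   Los Angeles CA,-118.25000,34.05000,4.8e-02,1.6e-02,3.9e-03
//
// Invocation (return periods optional; defaults are 475 975 2475):
//   java -cp <nshmp-haz classpath> gov.usgs.earthquake.nshmp.HazardMaps hazout/PGA/curves.csv 475 2475
//
// Hypothetical map.csv written alongside the source: the same site columns,
// then one column per return period, each value interpolated from the curve
// at an annual rate of 1 / returnPeriod:
//   name,lon,lat,475,2475
//   Los Angeles CA,-118.25000,34.05000,1.23456789e-02,3.45678901e-02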
package gov.usgs.earthquake.nshmp.aws;
import static com.google.common.base.Preconditions.checkState;
import static gov.usgs.earthquake.nshmp.aws.Util.CURVES_FILE;
import static gov.usgs.earthquake.nshmp.aws.Util.MAP_FILE;
import static gov.usgs.earthquake.nshmp.www.services.ServletUtil.GSON;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import gov.usgs.earthquake.nshmp.aws.Util.LambdaHelper;
import gov.usgs.earthquake.nshmp.calc.Site;
import gov.usgs.earthquake.nshmp.data.Interpolator;
import gov.usgs.earthquake.nshmp.internal.Parsing;
import gov.usgs.earthquake.nshmp.internal.Parsing.Delimiter;
import gov.usgs.earthquake.nshmp.www.meta.Metadata;
import gov.usgs.earthquake.nshmp.www.meta.Status;
import gov.usgs.earthquake.nshmp.www.services.ServletUtil;
/**
* AWS Lambda function to read a curves file from AWS S3 and create slices at
* the return periods of interest. <br>
*
* The results are written back to the S3 bucket as a map.csv file.
*/
public class HazardResultSliceLambda implements RequestStreamHandler {
private static final AmazonS3 S3 = AmazonS3ClientBuilder.defaultClient();
private static final String RATE_FMT = "%.8e";
private static final Function<Double, String> FORMATTER = Parsing.formatDoubleFunction(RATE_FMT);
private static final int NUMBER_OF_HEADERS = 3;
private static final String CONTENT_TYPE = "text/csv";
private static final Interpolator INTERPOLATOR = Interpolator.builder()
.logx()
.logy()
.decreasingX()
.build();
@Override
public void handleRequest(
InputStream input,
OutputStream output,
Context context) throws IOException {
LambdaHelper lambdaHelper = new LambdaHelper(input, output, context);
String requestBucket = "";
try {
RequestData request = GSON.fromJson(lambdaHelper.requestJson, RequestData.class);
lambdaHelper.logger.log("Request Data: " + GSON.toJson(request) + "\n");
requestBucket = request.bucket + "/" + request.key;
checkRequest(request);
Response response = processRequest(request);
String json = GSON.toJson(response, Response.class);
lambdaHelper.logger.log("Result: " + json + "\n");
output.write(json.getBytes());
output.close();
} catch (Exception e) {
lambdaHelper.logger.log("\nError: " + Throwables.getStackTraceAsString(e) + "\n\n");
String message = Metadata.errorMessage(requestBucket, e, false);
output.write(message.getBytes());
}
}
private static Response processRequest(RequestData request) throws IOException {
List<InterpolatedData> data = readCurveFile(request);
String outputBucket = request.bucket + "/" + request.key;
StringBuilder csv = new StringBuilder();
createHeaderString(csv, request);
createDataString(csv, data);
writeResults(request, outputBucket, csv.toString().getBytes(Charsets.UTF_8));
return new Response(request, outputBucket);
}
private static List<InterpolatedData> readCurveFile(RequestData request) throws IOException {
S3Object object = S3.getObject(request.bucket, request.key + "/" + CURVES_FILE);
S3ObjectInputStream input = object.getObjectContent();
BufferedReader reader = new BufferedReader(new InputStreamReader(input));
List<String> lines = reader.lines().collect(Collectors.toList());
reader.close();
Optional<List<String>> header = lines.stream()
.filter(line -> !line.startsWith("#"))
.findFirst()
.map(line -> Parsing.splitToList(line, Delimiter.COMMA));
checkState(header.isPresent(), "Curve file is empty");
List<String> keys = header.get().subList(0, NUMBER_OF_HEADERS);
List<Double> imls = header.get().subList(NUMBER_OF_HEADERS, header.get().size())
.stream()
.map(iml -> Double.parseDouble(iml))
.collect(Collectors.toList());
List<InterpolatedData> data = new ArrayList<>();
lines.stream()
.filter(line -> !line.startsWith("#"))
.skip(1)
.forEach(line -> {
data.add(curveToInterpolatedData(request, line, keys, imls));
});
return data;
}
private static InterpolatedData curveToInterpolatedData(
RequestData request,
String line,
List<String> keys,
List<Double> imls) {
List<String> values = Parsing.splitToList(line, Delimiter.COMMA);
List<Double> gms = values.subList(NUMBER_OF_HEADERS, values.size())
.stream()
.map(gm -> Double.parseDouble(gm))
.collect(Collectors.toList());
values = values.subList(0, NUMBER_OF_HEADERS);
Site site = buildSite(keys, values);
List<Double> interpolatedValues = request.slices.stream()
.map(returnPeriod -> INTERPOLATOR.findX(imls, gms, returnPeriod))
.collect(Collectors.toList());
return new InterpolatedData(site, interpolatedValues);
}
private static Site buildSite(List<String> keys, List<String> values) {
Double lat = null;
Double lon = null;
String name = null;
for (int index = 0; index < keys.size(); index++) {
String key = keys.get(index);
String value = values.get(index);
switch (key) {
case Keys.LAT:
lat = Double.parseDouble(value);
break;
case Keys.LON:
lon = Double.parseDouble(value);
break;
case Keys.NAME:
name = value;
break;
default:
throw new IllegalStateException("Unsupported site key: " + key);
}
}
return Site.builder()
.location(lon, lat)
.name(name)
.build();
}
private static void checkRequest(RequestData request) {
if (request.bucket == null) {
throw new RuntimeException("Request does not contain a S3 bucket");
}
if (request.key == null) {
throw new RuntimeException("Request does not contain a S3 key");
}
if (request.slices == null) {
throw new RuntimeException("Request does not contain returnPeriods");
}
}
private static void createDataString(StringBuilder builder, List<InterpolatedData> data) {
data.forEach(datum -> {
List<String> locData = Lists.newArrayList(
datum.site.name,
String.format("%.5f", datum.site.location.longitude),
String.format("%.5f", datum.site.location.latitude));
builder.append(toLine(locData, datum.values) + "\n");
});
}
private static String toLine(
Iterable<String> strings,
Iterable<Double> values) {
return Parsing.join(
Iterables.concat(strings, Iterables.transform(values, FORMATTER::apply)),
Delimiter.COMMA);
}
private static void createHeaderString(StringBuilder builder, RequestData request) {
List<String> header = Lists.newArrayList(Keys.NAME, Keys.LON, Keys.LAT);
builder.append(toLine(header, request.slices) + "\n");
}
private static void writeResults(
RequestData request,
String outputBucket,
byte[] result) throws IOException {
ObjectMetadata metadata = new ObjectMetadata();
InputStream input = new ByteArrayInputStream(result);
metadata.setContentType(CONTENT_TYPE);
metadata.setContentLength(result.length);
PutObjectRequest putRequest = new PutObjectRequest(
request.bucket,
request.key + "/" + MAP_FILE,
input,
metadata);
S3.putObject(putRequest);
input.close();
}
static class RequestData {
String bucket;
String key;
List<Double> slices;
private RequestData(Builder builder) {
bucket = builder.bucket;
key = builder.key;
slices = builder.slices;
}
static Builder builder() {
return new Builder();
}
static class Builder {
private String bucket;
private String key;
private List<Double> slices;
Builder bucket(String bucket) {
this.bucket = bucket;
return this;
}
Builder key(String key) {
this.key = key;
return this;
}
Builder slices(List<Double> slices) {
this.slices = slices;
return this;
}
RequestData build() {
return new RequestData(this);
}
}
}
private static class Response {
final String status;
final String date;
final RequestData request;
final String csv;
Response(RequestData request, String outputBucket) {
status = Status.SUCCESS.toString();
date = ZonedDateTime.now().format(ServletUtil.DATE_FMT);
this.request = request;
this.csv = outputBucket + "/" + MAP_FILE;
}
}
private static class InterpolatedData {
Site site;
List<Double> values;
InterpolatedData(Site site, List<Double> values) {
this.site = site;
this.values = values;
}
}
private static class Keys {
static final String LAT = "lat";
static final String LON = "lon";
static final String NAME = "name";
}
}
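A minimal sketch of how a request for this function is assembled; it mirrors the builder usage in HazardResultsSlicerLambda.readCurveFile further below. The bucket, key, and slice values are placeholders, and the slice values are handed to the interpolator as-is.

// Hypothetical caller in the same package; values are placeholders.
HazardResultSliceLambda.RequestData sliceRequest = HazardResultSliceLambda.RequestData
    .builder()
    .bucket("nshmp-hazout")                 // S3 bucket holding a curves.csv
    .key("user/2021-model/PGA")             // prefix of the directory containing curves.csv
    .slices(List.of(475.0, 975.0, 2475.0))  // passed unchanged to Interpolator.findX
    .build();
// Serialized with GSON, this becomes the Lambda payload, roughly:
// {"bucket":"nshmp-hazout","key":"user/2021-model/PGA","slices":[475.0,975.0,2475.0]}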
package gov.usgs.earthquake.nshmp.aws;
import static gov.usgs.earthquake.nshmp.aws.Util.CURVES_FILE;
import static gov.usgs.earthquake.nshmp.aws.Util.MAP_FILE;
import static gov.usgs.earthquake.nshmp.www.services.ServletUtil.GSON;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.google.common.base.Enums;
import com.google.common.base.Throwables;
import gov.usgs.earthquake.nshmp.aws.Util.LambdaHelper;
import gov.usgs.earthquake.nshmp.calc.DataType;
import gov.usgs.earthquake.nshmp.gmm.Gmm;
import gov.usgs.earthquake.nshmp.gmm.Imt;
import gov.usgs.earthquake.nshmp.internal.Parsing;
import gov.usgs.earthquake.nshmp.internal.Parsing.Delimiter;
import gov.usgs.earthquake.nshmp.model.SourceType;
import gov.usgs.earthquake.nshmp.www.meta.Metadata;
import gov.usgs.earthquake.nshmp.www.meta.Status;
import gov.usgs.earthquake.nshmp.www.services.ServletUtil;
/**
* AWS Lambda function to list all hazard results in the nshmp-hazout S3 bucket
* that contain a curves.csv or map.csv file.
*/
public class HazardResultsMetadataLambda implements RequestStreamHandler {
private static final AmazonS3 S3 = AmazonS3ClientBuilder.defaultClient();
private static final int IMT_DIR_BACK_FROM_TOTAL = 2;
private static final int IMT_DIR_BACK_FROM_SOURCE = 4;
private static final String S3_BUCKET = "nshmp-hazout";
private static final String RESULT_BUCKET = "nshmp-haz-lambda";
private static final String RESULT_KEY = "nshmp-haz-aws-results-metadata.json";
@Override
public void handleRequest(
InputStream input,
OutputStream output,
Context context) throws IOException {
LambdaHelper lambdaHelper = new LambdaHelper(input, output, context);
try {
Response response = processRequest();
String json = GSON.toJson(response, Response.class);
uploadResults(json);
output.write(json.getBytes());
output.close();
} catch (Exception e) {
lambdaHelper.logger.log("\nError: " + Throwables.getStackTraceAsString(e) + "\n\n");
String message = Metadata.errorMessage("", e, false);
output.write(message.getBytes());
}
}
private static Response processRequest() {
Map<String, CurvesMapResult> curvesMapResults = new HashMap<>();
Set<String> users = getUsers();
for (String file : new String[] { CURVES_FILE, MAP_FILE }) {
List<HazardResults> hazardResults = listObjects(users, file);
CurvesMapResult result = new CurvesMapResult(users, hazardResults);
curvesMapResults.put(file, result);
}
Result result = new Result(curvesMapResults.get(CURVES_FILE), curvesMapResults.get(MAP_FILE));
return new Response(result);
}
private static List<HazardResults> listObjects(Set<String> users, String file) {
ListObjectsV2Request request = new ListObjectsV2Request()
.withBucketName(S3_BUCKET)
.withDelimiter(file);
ListObjectsV2Result s3Result;
List<S3Listing> s3Listings = new ArrayList<>();
do {
s3Result = S3.listObjectsV2(request);
s3Result.getCommonPrefixes()
.stream()
.map(key -> keyToHazardListing(key))
.forEach(listing -> s3Listings.add(listing));
request.setContinuationToken(s3Result.getNextContinuationToken());
} while (s3Result.isTruncated());
return transformS3Listing(users, s3Listings);
}
private static List<HazardResults> transformS3Listing(
Set<String> users,
List<S3Listing> s3Listings) {
List<HazardResults> hazardResults = new ArrayList<>();
users.forEach(user -> {
TreeSet<String> resultDirectories = s3Listings.stream()
.filter(listing -> listing.user.equals(user))
.map(listing -> listing.resultPrefix)
.collect(Collectors.toCollection(TreeSet::new));
resultDirectories.forEach(resultPrefix -> {
List<S3Listing> s3Filteredlistings = s3Listings.parallelStream()
.filter(listing -> listing.user.equals(user))
.filter(listing -> listing.resultPrefix.equals(resultPrefix))
.collect(Collectors.toList());
List<HazardListing> listings = s3Filteredlistings.parallelStream()
.map(listing -> s3ListingToHazardListing(listing))
.collect(Collectors.toList());
S3Listing s3Listing = s3Filteredlistings.get(0);
String path = s3Listing.path.split(resultPrefix)[0];
String s3Path = s3Listing.user + "/" + path + resultPrefix;
hazardResults.add(new HazardResults(
user,
s3Listing.bucket,
resultPrefix,
s3Path,
listings));
});
});
return hazardResults;
}
private static HazardListing s3ListingToHazardListing(S3Listing s3Listing) {
return new HazardListing(s3Listing.dataType, s3Listing.path, s3Listing.file);
}
private static S3Listing keyToHazardListing(String key) {
List<String> keys = Parsing.splitToList(key, Delimiter.SLASH);
HazardDataType<?> dataType = getDataType(keys);
String user = keys.get(0);
String file = keys.get(keys.size() - 1);
String path = keys.subList(1, keys.size() - 1)
.stream()
.collect(Collectors.joining("/"));
return new S3Listing(user, S3_BUCKET, path, file, dataType);
}
private static Set<String> getUsers() {
ListObjectsV2Request request = new ListObjectsV2Request()
.withBucketName(S3_BUCKET)
.withDelimiter("/");
ListObjectsV2Result listing = S3.listObjectsV2(request);
return listing.getCommonPrefixes().stream()
.map(prefix -> prefix.replace("/", ""))
.collect(Collectors.toCollection(TreeSet::new));
}
private static HazardDataType<?> getDataType(List<String> keys) {
String sourceType = keys.get(keys.size() - IMT_DIR_BACK_FROM_TOTAL);
HazardDataType<?> dataType = null;
String resultDirectory = null;
Imt imt = null;
if (Enums.getIfPresent(SourceType.class, sourceType).isPresent()) {
imt = Imt.valueOf(keys.get(keys.size() - IMT_DIR_BACK_FROM_SOURCE));
resultDirectory = keys.get(keys.size() - IMT_DIR_BACK_FROM_SOURCE - 1);
SourceType type = SourceType.valueOf(sourceType);
dataType = new HazardDataType<SourceType>(imt, DataType.SOURCE, type, resultDirectory);
} else if (Enums.getIfPresent(Gmm.class, sourceType).isPresent()) {
imt = Imt.valueOf(keys.get(keys.size() - IMT_DIR_BACK_FROM_SOURCE));
resultDirectory = keys.get(keys.size() - IMT_DIR_BACK_FROM_SOURCE - 1);
Gmm type = Gmm.valueOf(sourceType);
dataType = new HazardDataType<Gmm>(imt, DataType.GMM, type, resultDirectory);
} else if (Enums.getIfPresent(Imt.class, sourceType).isPresent()) {
Imt type = Imt.valueOf(sourceType);
resultDirectory = keys.get(keys.size() - IMT_DIR_BACK_FROM_TOTAL - 1);
imt = type;
dataType = new HazardDataType<Imt>(imt, DataType.TOTAL, type, resultDirectory);
} else {
throw new RuntimeException("Source type [" + sourceType + "] not supported");
}
return dataType;
}
private static void uploadResults(String results) {
byte[] bytes = results.getBytes();
ByteArrayInputStream input = new ByteArrayInputStream(bytes);
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(bytes.length);
metadata.setContentType("application/json");
PutObjectRequest request = new PutObjectRequest(
RESULT_BUCKET,
RESULT_KEY,
input,
metadata);
S3.putObject(request);
}
static class HazardDataType<E extends Enum<E>> {
final Imt imt;
final DataType type;
final transient String resultPrefix;
final E sourceType;
HazardDataType(Imt imt, DataType type, E sourceType, String resultPrefix) {
this.imt = imt;
this.type = type;
this.resultPrefix = resultPrefix;
this.sourceType = sourceType;
}
}
private static class HazardResults {
final String user;
final String bucket;
final String resultPrefix;
final String path;
final List<HazardListing> listings;
HazardResults(
String user,
String bucket,
String resultPrefix,
String path,
List<HazardListing> listings) {
this.user = user;
this.bucket = bucket;
this.resultPrefix = resultPrefix;
this.path = path;
this.listings = listings;
}
}
private static class HazardListing {
final HazardDataType<?> dataType;
final String file;
final String path;
HazardListing(HazardDataType<?> dataType, String path, String file) {
this.dataType = dataType;
this.file = file;
this.path = path;
}
}
private static class S3Listing {
final String user;
final String bucket;
final String path;
final String file;
final String resultPrefix;
final HazardDataType<?> dataType;
S3Listing(String user, String bucket, String path, String file, HazardDataType<?> dataType) {
this.user = user;
this.bucket = bucket;
this.path = path;
this.file = file;
this.resultPrefix = dataType.resultPrefix;
this.dataType = dataType;
}
}
private static class CurvesMapResult {
final Set<String> users;
final List<HazardResults> hazardResults;
CurvesMapResult(Set<String> users, List<HazardResults> hazardResults) {
this.users = users;
this.hazardResults = hazardResults;
}
}
private static class Result {
final CurvesMapResult curves;
final CurvesMapResult map;
Result(CurvesMapResult curves, CurvesMapResult map) {
this.curves = curves;
this.map = map;
}
}
private static class Response {
final String status;
final String date;
final Result result;
Response(Result result) {
status = Status.SUCCESS.toString();
date = ZonedDateTime.now().format(ServletUtil.DATE_FMT);
this.result = result;
}
}
}
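A rough sketch of the metadata JSON this function uploads to nshmp-haz-lambda/nshmp-haz-aws-results-metadata.json, inferred from the Response, Result, CurvesMapResult, and HazardResults classes above; GSON serializes the field names as-is, and every value below is a placeholder.

// Hypothetical output shape; values are placeholders.
// {
//   "status": "...",                    // Status.SUCCESS
//   "date": "...",                      // ServletUtil.DATE_FMT timestamp
//   "result": {
//     "curves": {                       // listings keyed on curves.csv
//       "users": [ "userA", "userB" ],
//       "hazardResults": [ {
//         "user": "userA",
//         "bucket": "nshmp-hazout",
//         "resultPrefix": "...",
//         "path": "userA/.../",
//         "listings": [ { "dataType": { "imt": "...", "type": "TOTAL", "sourceType": "..." },
//                         "file": "curves.csv", "path": "..." } ]
//       } ]
//     },
//     "map": { ... }                    // same shape, keyed on map.csv
//   }
// }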
package gov.usgs.earthquake.nshmp.aws;
import static gov.usgs.earthquake.nshmp.aws.Util.CURVES_FILE;
import static gov.usgs.earthquake.nshmp.www.services.ServletUtil.GSON;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.AmazonEC2ClientBuilder;
import com.amazonaws.services.lambda.AWSLambda;
import com.amazonaws.services.lambda.AWSLambdaClientBuilder;
import com.amazonaws.services.lambda.model.InvokeRequest;
import com.amazonaws.services.lambda.model.InvokeResult;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ObjectListing;
import com.google.common.base.Throwables;
import gov.usgs.earthquake.nshmp.aws.Util.LambdaHelper;
import gov.usgs.earthquake.nshmp.internal.Parsing;
import gov.usgs.earthquake.nshmp.internal.Parsing.Delimiter;
import gov.usgs.earthquake.nshmp.www.meta.Metadata;
import gov.usgs.earthquake.nshmp.www.meta.Status;
import gov.usgs.earthquake.nshmp.www.services.ServletUtil;
/**
* AWS Lambda function to read hazard results from S3 and create slices at the
* return periods of interest.
*
* @see HazardResultSliceLambda
*/
public class HazardResultsSlicerLambda implements RequestStreamHandler {
private static final AmazonS3 S3 = AmazonS3ClientBuilder.defaultClient();
private static final AmazonEC2 EC2 = AmazonEC2ClientBuilder.defaultClient();
private static final AWSLambda LAMBDA_CLIENT = AWSLambdaClientBuilder.defaultClient();
private static final String CURVE_SLICE_LAMBDA = System.getenv("CURVE_SLICE_LAMBDA_NAME");
private static final String INSTANCE_STATUS = "terminated";
private static final int MAX_INSTANCE_CHECK = 100;
private static final int INSTANCE_CHECK_TIMEOUT = 10 * 1000;
@Override
public void handleRequest(
InputStream input,
OutputStream output,
Context context) throws IOException {
LambdaHelper lambdaHelper = new LambdaHelper(input, output, context);
String requestBucket = "";
try {
RequestData request = GSON.fromJson(lambdaHelper.requestJson, RequestData.class);
requestBucket = String.format("%s/%s", request.bucket, request.key);
lambdaHelper.logger.log("Request Data: " + GSON.toJson(request) + "\n\n");
checkRequest(request);
checkBucket(request);
Response response = processRequest(lambdaHelper, request);
output.write(GSON.toJson(response, Response.class).getBytes());
} catch (Exception e) {
lambdaHelper.logger.log("\nError: " + Throwables.getStackTraceAsString(e) + "\n\n");
String message = Metadata.errorMessage(requestBucket, e, false);
output.write(message.getBytes());
}
}
private static Response processRequest(
LambdaHelper lambdaHelper,
RequestData request) throws IOException, InterruptedException {
ObjectListing objectListing = S3.listObjects(request.bucket, request.key);
List<CompletableFuture<Void>> futures = new ArrayList<>();
objectListing.getObjectSummaries()
.parallelStream()
.filter(summary -> summary.getKey().endsWith(CURVES_FILE))
.forEach(summary -> {
String name = summary.getKey();
lambdaHelper.logger.log("Reading: " + name + "\n");
try {
futures.add(processCurveFile(request, lambdaHelper, name));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
futures.forEach(CompletableFuture::join);
lambdaHelper.logger.log("Zipping results");
return new Response(request);
}
private static CompletableFuture<Void> processCurveFile(
RequestData request,
LambdaHelper lambdaHelper,
String curvesPath) throws IOException {
return readCurveFile(request, curvesPath)
.thenAcceptAsync(result -> {
checkLambdaResponse(result);
});
}
private static CompletableFuture<InvokeResult> readCurveFile(
RequestData request,
String curvesPath) throws IOException {
List<String> names = Arrays.stream(curvesPath.split("/"))
.collect(Collectors.toList());
names.remove(names.size() - 1);
String key = Parsing.join(names, Delimiter.SLASH);
HazardResultSliceLambda.RequestData lambdaRequest = HazardResultSliceLambda.RequestData
.builder()
.bucket(request.bucket)
.key(key)
.slices(request.slices)
.build();
InvokeRequest invokeRequest = new InvokeRequest()
.withFunctionName(CURVE_SLICE_LAMBDA)
.withPayload(GSON.toJson(lambdaRequest));
return CompletableFuture.supplyAsync(() -> {
return LAMBDA_CLIENT.invoke(invokeRequest);
});
}
private static void checkRequest(RequestData request) {
if (request.bucket == null) {
throw new RuntimeException("Request does not contain a S3 bucket");
}
if (request.key == null) {
throw new RuntimeException("Request does not contain a S3 key");
}
if (request.slices == null) {
throw new RuntimeException("Request does not contain slices");
}
}
private static void checkBucket(RequestData request) {
if (!S3.doesBucketExistV2(request.bucket)) {
throw new RuntimeException(String.format("S3 bucket [%s] does not exist", request.bucket));
}
}
private static void checkLambdaResponse(InvokeResult result) {
try {
LambdaResponse response = GSON.fromJson(
new String(result.getPayload().array()),
LambdaResponse.class);
if (Status.ERROR.toString().equals(response.status)) {
throw new RuntimeException(response.message);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private static class LambdaResponse {
String status;
String message;
}
private static class ZipResultsResponse extends LambdaResponse {
ZipResult result;
ZipRequest request;
private static class ZipRequest {
String bucket;
String key;
}
private static class ZipResult {
String path;
String instanceId;
ZipResult(String path, String instanceId) {
this.path = path;
this.instanceId = instanceId;
}
}
}
private static class RequestData {
String bucket;
String key;
List<Double> slices;
}
private static class Response {
final String status;
final String date;
final RequestData request;
final String outputBucket;
Response(RequestData request) {
status = Status.SUCCESS.toString();
date = ZonedDateTime.now().format(ServletUtil.DATE_FMT);
this.request = request;
this.outputBucket = String.format("%s/%s", request.bucket, request.key);
}
}
}
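A rough sketch of the request payload this orchestrating function expects, based on its RequestData fields (bucket, key, slices); all values are placeholders.

// Hypothetical request payload; values are placeholders.
// {
//   "bucket": "nshmp-hazout",
//   "key": "userA/2021-model",
//   "slices": [ 475, 975, 2475 ]
// }
// The function lists every object under bucket/key that ends in curves.csv and
// invokes the Lambda named by CURVE_SLICE_LAMBDA_NAME once per file, passing
// the same slices along (see readCurveFile above).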
package gov.usgs.earthquake.nshmp.aws;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
public class Util {
static final String CURVES_FILE = "curves.csv";
static final String MAP_FILE = "map.csv";
/**
* Parse the Lambda function {@code InputStream} into a {@code JsonObject}.
*/
static class LambdaHelper {
JsonObject requestJson;
Context context;
LambdaLogger logger;
OutputStream output;
LambdaHelper(InputStream input, OutputStream output, Context context)
throws UnsupportedEncodingException {
logger = context.getLogger();
this.context = context;
this.output = output;
BufferedReader reader = new BufferedReader(new InputStreamReader(input));
JsonParser parser = new JsonParser();
requestJson = parser.parse(reader).getAsJsonObject();
}
}
}
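Usage mirrors the handleRequest methods in the Lambda classes above; a minimal sketch:

// Inside a RequestStreamHandler.handleRequest(input, output, context):
LambdaHelper lambdaHelper = new LambdaHelper(input, output, context);
lambdaHelper.logger.log("Request: " + lambdaHelper.requestJson + "\n");
// The parsed JSON is then typically bound to a request class with GSON, e.g.:
// RequestData request = GSON.fromJson(lambdaHelper.requestJson, RequestData.class);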