diff --git a/src/org/opensha2/RateCalc.java b/src/org/opensha2/RateCalc.java new file mode 100644 index 0000000000000000000000000000000000000000..60c74717fe557d2d1a628f0294322f534f447ad3 --- /dev/null +++ b/src/org/opensha2/RateCalc.java @@ -0,0 +1,268 @@ +package org.opensha2; + +import static java.util.concurrent.Executors.newFixedThreadPool; + +import static org.opensha2.internal.TextUtils.NEWLINE; + +import org.opensha2.calc.CalcConfig; +import org.opensha2.calc.EqRate; +import org.opensha2.calc.RateResultHandler; +import org.opensha2.calc.Site; +import org.opensha2.calc.Sites; +import org.opensha2.calc.ThreadCount; +import org.opensha2.eq.model.HazardModel; +import org.opensha2.internal.Logging; + +import com.google.common.base.Optional; +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.logging.FileHandler; +import java.util.logging.Logger; + +/** + * Compute earthquake rates or Poisson probabilities from a {@link HazardModel}. + * + * @author Peter Powers + */ +public class RateCalc { + + /** + * Entry point for the calculation of earthquake rates and probabilities. + * + * <p>Computing earthquake rates requires at least 2, and at most 3, + * arguments. At a minimum, the path to a model zip file or directory and the + * site(s) at which to perform calculations must be specified. Under the + * 2-argument scenario, model initialization and calculation configuration + * settings are drawn from the config file that <i>must</i> reside at the root + * of the model directory. Sites may be defined as a string, a CSV file, or a + * GeoJSON file. + * + * <p>To override any default or calculation configuration settings included + * with the model, supply the path to another configuration file as a third + * argument. + * + * <p>Please refer to the nshmp-haz <a + * href="https://github.com/usgs/nshmp-haz/wiki" target="_top">wiki</a> for + * comprehensive descriptions of source models, configuration files, site + * files, and earthquake rate calculations. + * + * @see <a href="https://github.com/usgs/nshmp-haz/wiki/Building-&-Running" + * target="_top"> nshmp-haz wiki</a> + * @see <a href="https://github.com/usgs/nshmp-haz/tree/master/etc/examples" + * target="_top"> example calculations</a> + */ + public static void main(String[] args) { + + /* Delegate to run which has a return value for testing. */ + + Optional<String> status = run(args); + if (status.isPresent()) { + System.err.print(status.get()); + System.exit(1); + } + System.exit(0); + } + + static Optional<String> run(String[] args) { + int argCount = args.length; + + if (argCount < 2 || argCount > 3) { + return Optional.of(USAGE); + } + + try { + Logging.init(); + Logger log = Logger.getLogger(HazardCalc.class.getName()); + Path tempLog = HazardCalc.createTempLog(); + FileHandler fh = new FileHandler(tempLog.getFileName().toString()); + fh.setFormatter(new Logging.ConsoleFormatter()); + log.getParent().addHandler(fh); + + log.info(PROGRAM + ": " + HazardCalc.VERSION); + Path modelPath = Paths.get(args[0]); + HazardModel model = HazardModel.load(modelPath); + + CalcConfig config = model.config(); + if (argCount == 3) { + Path userConfigPath = Paths.get(args[2]); + config = CalcConfig.Builder.copyOf(model.config()) + .extend(CalcConfig.Builder.fromFile(userConfigPath)) + .build(); + } + log.info(config.toString()); + + log.info(""); + Sites sites = HazardCalc.readSites(args[1], config, log); + log.info("Sites: " + sites); + + Path out = calc(model, config, sites, log); + log.info(PROGRAM + ": finished"); + + /* Transfer log and write config, windows requires fh.close() */ + fh.close(); + Files.move(tempLog, out.resolve(PROGRAM + ".log")); + config.write(out); + + return Optional.absent(); + + } catch (Exception e) { + StringBuilder sb = new StringBuilder() + .append(NEWLINE) + .append(PROGRAM + ": error").append(NEWLINE) + .append(" Arguments: ").append(Arrays.toString(args)).append(NEWLINE) + .append(NEWLINE) + .append(Throwables.getStackTraceAsString(e)) + .append(USAGE); + return Optional.of(sb.toString()); + } + } + + /* + * Compute earthquake rates or probabilities using the supplied model, config, + * and sites. Method returns the path to the directory where results were + * written. + * + * Unlike hazard calculations, which spread work out over multiple threads for + * a single calculation, rate calculations are single threaded. Concurrent + * calculations for multiple sites are handled below. + */ + private static Path calc( + HazardModel model, + CalcConfig config, + Sites sites, + Logger log) throws IOException, ExecutionException, InterruptedException { + + ThreadCount threadCount = config.performance.threadCount; + RateResultHandler export = null; + if (threadCount != ThreadCount.ONE) { + ExecutorService poolExecutor = newFixedThreadPool(threadCount.value()); + ListeningExecutorService executor = MoreExecutors.listeningDecorator(poolExecutor); + log.info("Threads: " + ((ThreadPoolExecutor) poolExecutor).getCorePoolSize()); + log.info(PROGRAM + ": calculating ..."); + export = concurrentCalc(model, config, sites, log, executor); + executor.shutdown(); + } else { + log.info("Threads: Running on calling thread"); + log.info(PROGRAM + ": calculating ..."); + export = RateResultHandler.create(config, sites, log); + for (Site site : sites) { + EqRate rate = calc(model, config, site); + export.add(rate); + } + } + export.expire(); + + log.info(String.format( + PROGRAM + ": %s sites completed in %s", + export.resultsProcessed(), export.elapsedTime())); + + return export.outputDir(); + } + + private static RateResultHandler concurrentCalc( + HazardModel model, + CalcConfig config, + Sites sites, + Logger log, + ListeningExecutorService executor) + throws InterruptedException, ExecutionException, IOException { + + RateResultHandler export = RateResultHandler.create(config, sites, log); + + int batchSize = config.output.flushLimit; + int submitted = 0; + List<ListenableFuture<EqRate>> rateFutures = new ArrayList<>(batchSize); + + /* + * Although the approach below may not fully leverage all processors if + * there are one or more longer-running calcs in the batch, processing + * batches of locations to a List preserves submission order; as opposed to + * using FutureCallbacks, which will reorder sites on export. + */ + for (Site site : sites) { + Callable<EqRate> task = EqRate.callable(model, config, site); + rateFutures.add(executor.submit(task)); + submitted++; + + if (submitted == batchSize) { + List<EqRate> rateList = Futures.allAsList(rateFutures).get(); + export.addAll(rateList); + submitted = 0; + rateFutures.clear(); + } + } + List<EqRate> lastBatch = Futures.allAsList(rateFutures).get(); + export.addAll(lastBatch); + + return export; + } + + /** + * Compute earthquake rates at a {@code site} for a {@code model} and + * {@code config}. + * + * <p><b>Note:</b> any model initialization settings in {@code config} will be + * ignored as the supplied model will already have been initialized. + * + * @param model to use + * @param config calculation configuration + * @param site of interest + */ + public static EqRate calc( + HazardModel model, + CalcConfig config, + Site site) { + + return EqRate.create(model, config, site); + } + + private static final String PROGRAM = RateCalc.class.getSimpleName(); + private static final String USAGE_COMMAND = + "java -cp nshmp-haz.jar org.opensha2.RateCalc model sites [config]"; + private static final String USAGE_URL1 = "https://github.com/usgs/nshmp-haz/wiki"; + private static final String USAGE_URL2 = "https://github.com/usgs/nshmp-haz/tree/master/etc"; + private static final String SITE_STRING = "name,lon,lat"; + + private static final String USAGE = new StringBuilder() + .append(NEWLINE) + .append(PROGRAM).append(" [").append(HazardCalc.VERSION).append("]").append(NEWLINE) + .append(NEWLINE) + .append("Usage:").append(NEWLINE) + .append(" ").append(USAGE_COMMAND).append(NEWLINE) + .append(NEWLINE) + .append("Where:").append(NEWLINE) + .append(" 'model' is a model zip file or directory") + .append(NEWLINE) + .append(" 'sites' is either:") + .append(NEWLINE) + .append(" - a string, e.g. ").append(SITE_STRING) + .append(NEWLINE) + .append(" (escape any spaces or enclose string in double-quotes)") + .append(NEWLINE) + .append(" - or a *.csv file or *.geojson file of site data") + .append(NEWLINE) + .append(" 'config' (optional) supplies a calculation configuration") + .append(NEWLINE) + .append(NEWLINE) + .append("For more information, see:").append(NEWLINE) + .append(" ").append(USAGE_URL1).append(NEWLINE) + .append(" ").append(USAGE_URL2).append(NEWLINE) + .append(NEWLINE) + .toString(); + +} diff --git a/src/org/opensha2/calc/EqRate.java b/src/org/opensha2/calc/EqRate.java index c6be872342585f8020cbe8ca7cab29441699da2f..9875440583c747b158f0569bbb8ddec21c316d61 100644 --- a/src/org/opensha2/calc/EqRate.java +++ b/src/org/opensha2/calc/EqRate.java @@ -1,6 +1,10 @@ package org.opensha2.calc; +import static com.google.common.base.Preconditions.checkArgument; + +import org.opensha2.calc.CalcConfig.Rate.Bins; import org.opensha2.data.IntervalArray; +import org.opensha2.data.IntervalArray.Builder; import org.opensha2.data.XySequence; import org.opensha2.eq.model.ClusterSource; import org.opensha2.eq.model.ClusterSourceSet; @@ -14,46 +18,97 @@ import org.opensha2.eq.model.SystemSourceSet; import org.opensha2.geo.Location; import org.opensha2.mfd.Mfds; +import com.google.common.base.Converter; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import java.util.EnumMap; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.Callable; /** - * General purpose magnitude-frequency distribution data container. CUrrent;y - * implemented for annual rate only. + * General purpose magnitude-frequency distribution (MFD) data container. This + * class makes no distinction between incremental or cumulative MFDs, or whether + * values are stored as annual-rate or poisson probability. * * @author Peter Powers */ public class EqRate { - final XySequence totalMfd; - final Map<SourceType, XySequence> typeMfds; + /** The site of interest. */ + public final Site site; + + /** The total MFD of interest. */ + public final XySequence totalMfd; + + /** The MFDs for each contributing source type. */ + public final Map<SourceType, XySequence> typeMfds; + + private EqRate( + Site site, + XySequence totalMfd, + Map<SourceType, XySequence> typeMfds) { - private EqRate(XySequence totalMfd, Map<SourceType, XySequence> typeMfds) { + this.site = site; this.totalMfd = totalMfd; this.typeMfds = typeMfds; } - - /* - * Developer notes: + + /** + * Create a new earthquake rate data container. * - * TODO the receiver MFD needs to be built by querying hazard model for min-max - * magnitudes; currently it is built using fixed, known values from the 2014 - * COUS model. + * @param model to process + * @param config calculation configuration + * @param site of interest */ - - public static EqRate createIncremental( + public static EqRate create( HazardModel model, - Location location, - double distance) { + CalcConfig config, + Site site) { + CalcConfig.Rate rateConfig = config.rate; + + Bins mBins = rateConfig.bins; IntervalArray modelMfd = IntervalArray.Builder - .withRows(4.7, 9.4, 0.1) + .withRows( + mBins.mMin, + mBins.mMax, + mBins.Δm) .build(); + EqRate rates = createIncremental(model, site, rateConfig.distance, modelMfd); + if (rateConfig.distribution == Distribution.CUMULATIVE) { + rates = toCumulative(rates); + } + if (rateConfig.values == CurveValue.POISSON_PROBABILITY) { + rates = toPoissonProbability(rates, rateConfig.timespan); + } + return rates; + } + + /** + * Wraps {@link #create(HazardModel, CalcConfig, Site)} in a {@link Callable} + * for processing multiple sites concurrently. + * + * @param model to process + * @param config calculation configuration + * @param site of interest + */ + public static Callable<EqRate> callable( + HazardModel model, + CalcConfig config, + Site site) { + + return new RateTask(model, config, site); + } + + private static EqRate createIncremental( + HazardModel model, + Site site, + double distance, + IntervalArray modelMfd) { + /* Initialize SourceType mfd builders. */ Map<SourceType, IntervalArray.Builder> typeMfdBuilders = new EnumMap<>(SourceType.class); for (SourceType type : model.types()) { @@ -62,7 +117,7 @@ public class EqRate { /* Populate builders. */ for (SourceSet<? extends Source> sourceSet : model) { - IntervalArray sourceSetMfd = mfd(sourceSet, location, distance, modelMfd); + IntervalArray sourceSetMfd = mfd(sourceSet, site.location, distance, modelMfd); typeMfdBuilders.get(sourceSet.type()).add(sourceSetMfd); } @@ -71,27 +126,23 @@ public class EqRate { ImmutableMap.Builder<SourceType, XySequence> typeMfds = ImmutableMap.builder(); for (Entry<SourceType, IntervalArray.Builder> entry : typeMfdBuilders.entrySet()) { IntervalArray typeMfd = entry.getValue().build(); + typeMfds.put(entry.getKey(), typeMfd.values()); totalMfd.add(typeMfd); } return new EqRate( + site, totalMfd.build().values(), typeMfds.build()); } /** - * Get the total - * @param model - * @param location - * @param distance - * @return + * Create a new earthquake rate container with cumulative values. + * + * @param incremental rate source to convert */ - public static EqRate createCumulative( - HazardModel model, - Location location, - double distance) { + public static EqRate toCumulative(EqRate incremental) { - EqRate incremental = createIncremental(model, location, distance); XySequence cumulativeTotal = Mfds.toCumulative(incremental.totalMfd); ImmutableMap.Builder<SourceType, XySequence> cumulativeTypes = ImmutableMap.builder(); for (Entry<SourceType, XySequence> entry : incremental.typeMfds.entrySet()) { @@ -100,28 +151,67 @@ public class EqRate { Mfds.toCumulative(entry.getValue())); } return new EqRate( + incremental.site, cumulativeTotal, cumulativeTypes.build()); } - static EqRate combine(EqRate... eqRates) { - // validate xs; or not if only for internal use where we know the receiver - // mfds are all sources from the same model + /** + * Create a new earthquake rate container with Poisson probability values. + * + * @param annualRates source to convert + * @param timespan of interest for annual rate to Poisson probability + * conversion + */ + public static EqRate toPoissonProbability(EqRate annualRates, double timespan) { + Converter<Double, Double> converter = Mfds.annualRateToProbabilityConverter(timespan); + XySequence totalMfd = XySequence + .copyOf(annualRates.totalMfd) + .transform(converter); + EnumMap<SourceType, XySequence> typeMfds = new EnumMap<>(SourceType.class); + for (Entry<SourceType, XySequence> entry : annualRates.typeMfds.entrySet()) { + typeMfds.put( + entry.getKey(), + XySequence + .copyOf(entry.getValue()) + .transform(converter)); + } + return new EqRate( + annualRates.site, + XySequence.immutableCopyOf(totalMfd), + Maps.immutableEnumMap(typeMfds)); + } - XySequence totalMfd = XySequence.emptyCopyOf(eqRates[0].totalMfd); + /** + * Create a new earthquake rate container with the sum of the supplied + * {@code rates}. + * + * <p><b>NOTE:</b> This operation is additive and will produce meaningless + * results if {@code rates} have already been converted to + * {@link #toPoissonProbability(EqRate, double) probabilities}, or are not all + * of {@link Distribution#INCREMENTAL} or {@link Distribution#CUMULATIVE} + * distribution format. + * + * <p>Buyer beware. + * + * @param rates to combine + */ + public static EqRate combine(EqRate... rates) { + Site referenceSite = rates[0].site; + XySequence totalMfd = XySequence.emptyCopyOf(rates[0].totalMfd); EnumMap<SourceType, XySequence> typeMfds = new EnumMap<>(SourceType.class); - for (EqRate eqRate : eqRates) { - totalMfd.add(eqRate.totalMfd); - for (Entry<SourceType, XySequence> entry : eqRate.typeMfds.entrySet()) { - SourceType type = entry.getKey(); - if (!typeMfds.containsKey(type)) { - XySequence typeMfd = XySequence.emptyCopyOf(totalMfd); - typeMfds.put(type, typeMfd); - } - typeMfds.get(type).add(entry.getValue()); + for (EqRate rate : rates) { + checkArgument( + rate.site.location.equals(referenceSite.location), + "Site locations are not the same:\n\ts1: %s\n\ts2: %s", + referenceSite, rate.site); + totalMfd.add(rate.totalMfd); + for (Entry<SourceType, XySequence> entry : rate.typeMfds.entrySet()) { + entry.getValue().addToMap(entry.getKey(), typeMfds); } } return new EqRate( + referenceSite, XySequence.immutableCopyOf(totalMfd), Maps.immutableEnumMap(typeMfds)); } @@ -163,11 +253,14 @@ public class EqRate { sourceSetMfd.addEach(mfd); } } - return sourceSetMfd.build(); + return sourceSetMfd.multiply(sourceSet.weight()).build(); } /* * Special case ClusterSourceSet. + * + * Nested fault rates are in fact weights that need to be scaled by the + * cluster rate. */ private static IntervalArray clusterMfd( ClusterSourceSet sourceSet, @@ -178,13 +271,16 @@ public class EqRate { IntervalArray.Builder sourceSetMfd = IntervalArray.Builder.fromModel(modelMfd); for (Source source : sourceSet.iterableForLocation(location, distance)) { ClusterSource clusterSource = (ClusterSource) source; - sourceSetMfd.add(faultMfd( - clusterSource.faults(), - location, - distance, - modelMfd)); + IntervalArray.Builder faultMfd = Builder + .copyOf(faultMfd( + clusterSource.faults(), + location, + distance, + modelMfd)) + .multiply(clusterSource.rate()); + sourceSetMfd.add(faultMfd.build()); } - return sourceSetMfd.build(); + return sourceSetMfd.multiply(sourceSet.weight()).build(); } /* @@ -219,7 +315,29 @@ public class EqRate { } } } - return sourceSetMfd.build(); + return sourceSetMfd.multiply(sourceSet.weight()).build(); + } + + private static final class RateTask implements Callable<EqRate> { + + private final HazardModel model; + private final CalcConfig config; + private final Site site; + + RateTask( + HazardModel model, + CalcConfig config, + Site site) { + + this.model = model; + this.config = config; + this.site = site; + } + + @Override + public EqRate call() throws Exception { + return create(model, config, site); + } } } diff --git a/src/org/opensha2/calc/RateResultHandler.java b/src/org/opensha2/calc/RateResultHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..23c0ac2d50c0176de9a31473f856b66429a2a72d --- /dev/null +++ b/src/org/opensha2/calc/RateResultHandler.java @@ -0,0 +1,272 @@ +package org.opensha2.calc; + +import static com.google.common.base.Preconditions.checkState; +import static java.nio.charset.StandardCharsets.US_ASCII; + +import org.opensha2.data.XySequence; +import org.opensha2.eq.model.SourceType; +import org.opensha2.geo.Location; +import org.opensha2.internal.Parsing; +import org.opensha2.internal.Parsing.Delimiter; + +import com.google.common.base.Function; +import com.google.common.base.Stopwatch; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.primitives.Doubles; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.OpenOption; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.logging.Logger; + +/** + * Earthquake rate and probability exporter. + * + * @author Peter Powers + */ +public final class RateResultHandler { + + private final Logger log; + private final Path dir; + private final CalcConfig config; + private final boolean exportSource; + + private final Stopwatch batchWatch; + private final Stopwatch totalWatch; + private int batchCount = 0; + private int resultCount = 0; + + private final boolean namedSites; + private boolean firstBatch = true; + private boolean used = false; + + private final List<EqRate> rates; + + private RateResultHandler(CalcConfig config, Sites sites, Logger log) throws IOException { + this.log = log; + // System.out.println(config.output.directory); + // System.out.println(config.output.directory.toString()); + // System.out.println(config.output.directory.); + this.dir = ResultHandler.createOutputDir(improvedOutputDirectory(config)); + // ResultHandler.createOutputDir(config.output.directory.equals(CalcConfig.DEFAULT_OUT) + // ? () + // : config.output.directory); + this.config = config; + this.exportSource = config.output.curveTypes.contains(CurveType.SOURCE); + this.rates = new ArrayList<>(); + + Site demoSite = sites.iterator().next(); + this.namedSites = demoSite.name() != Site.NO_NAME; + + this.batchWatch = Stopwatch.createStarted(); + this.totalWatch = Stopwatch.createStarted(); + } + + /* + * If output from config is 'curves', change to the more appropriate 'eq-rate' + * or 'eq-prob'. + */ + static Path improvedOutputDirectory(CalcConfig config) { + Path out = config.output.directory; + if (out.toString().equals(CalcConfig.DEFAULT_OUT)) { + return (config.rate.values == CurveValue.POISSON_PROBABILITY) ? Paths.get("eq-prob") + : Paths.get("eq-rate"); + } + return out; + } + + /** + * Create a new results handler. + * + * @param config that specifies output options and formats + * @param sites reference to the sites to be processed (not retained) + * @param log shared logging instance from calling class + * @throws IllegalStateException if binary output has been specified in the + * {@code config} but the {@code sites} container does not specify map + * extents. + */ + public static RateResultHandler create( + CalcConfig config, + Sites sites, + Logger log) throws IOException { + + return new RateResultHandler(config, sites, log); + } + + /** + * Add the supplied {@code EqRate}s to this exporter. + * + * @param rates data containers to add + */ + public void addAll(Collection<EqRate> rates) throws IOException { + for (EqRate rate : rates) { + add(rate); + } + } + + /** + * Add an {@code EqRate} to this exporter. + * + * @param rate data container to add + */ + public void add(EqRate rate) throws IOException { + checkState(!used, "This result handler is expired"); + resultCount++; + rates.add(rate); + if (rates.size() == config.output.flushLimit) { + flush(); + batchCount++; + log.info(String.format( + " batch: %s in %s – %s sites in %s", + batchCount, batchWatch, resultCount, totalWatch)); + batchWatch.reset().start(); + } + } + + /** + * Flushes any remaining results, stops all timers and sets the state of this + * exporter to 'used'; no more results may be added. + */ + public void expire() throws IOException { + flush(); + batchWatch.stop(); + totalWatch.stop(); + used = true; + } + + /* + * Flush any stored Hazard and Deaggregation results to file, clearing + */ + private void flush() throws IOException { + if (!rates.isEmpty()) { + writeRates(); + rates.clear(); + firstBatch = false; + } + } + + /** + * The number of hazard [and deagg] results passed to this handler thus far. + */ + public int resultsProcessed() { + return resultCount; + } + + /** + * The number of {@code Hazard} results this handler is currently storing. + */ + public int size() { + return rates.size(); + } + + /** + * A string representation of the time duration that this result handler has + * been running. + */ + public String elapsedTime() { + return totalWatch.toString(); + } + + /** + * The target output directory established by this handler. + */ + public Path outputDir() { + return dir; + } + + /* + * Write the current list of {@code EqRate}s to file. + */ + private void writeRates() throws IOException { + + EqRate demo = rates.get(0); + + Iterable<Double> emptyValues = Doubles.asList(new double[demo.totalMfd.size()]); + + OpenOption[] options = firstBatch ? ResultHandler.WRITE : ResultHandler.APPEND; + + Function<Double, String> formatter = Parsing.formatDoubleFunction(ResultHandler.RATE_FMT); + + /* Line maps for ascii output; may or may not be used */ + List<String> totalLines = new ArrayList<>(); + Map<SourceType, List<String>> typeLines = Maps.newEnumMap(SourceType.class); + + if (firstBatch) { + Iterable<?> header = Iterables.concat( + Lists.newArrayList(namedSites ? "name" : null, "lon", "lat"), + demo.totalMfd.xValues()); + totalLines.add(Parsing.join(header, Delimiter.COMMA)); + } + + if (exportSource) { + // TODO get source types from model + for (SourceType type : SourceType.values()) { + typeLines.put(type, Lists.newArrayList(totalLines)); + } + } + + /* Process batch */ + for (EqRate rate : rates) { + + String name = namedSites ? rate.site.name : null; + Location location = rate.site.location; + + List<String> locData = Lists.newArrayList( + name, + String.format("%.5f", location.lon()), + String.format("%.5f", location.lat())); + + String line = toLine(locData, rate.totalMfd.yValues(), formatter); + totalLines.add(line); + + String emptyLine = toLine(locData, emptyValues, formatter); + + if (exportSource) { + for (Entry<SourceType, List<String>> entry : typeLines.entrySet()) { + SourceType type = entry.getKey(); + String typeLine = emptyLine; + if (rate.typeMfds.containsKey(type)) { + XySequence typeRate = rate.typeMfds.get(type); + typeLine = toLine(locData, typeRate.yValues(), formatter); + } + entry.getValue().add(typeLine); + } + } + + /* write/append */ + Path totalFile = dir.resolve("total" + ResultHandler.TEXT_SUFFIX); + Files.write(totalFile, totalLines, US_ASCII, options); + if (exportSource) { + Path typeDir = dir.resolve("source"); + Files.createDirectories(typeDir); + for (Entry<SourceType, List<String>> typeEntry : typeLines.entrySet()) { + SourceType type = typeEntry.getKey(); + String filename = type.toString(); + Path typeFile = typeDir.resolve(filename + ResultHandler.TEXT_SUFFIX); + Files.write(typeFile, typeEntry.getValue(), US_ASCII, options); + } + } + } + } + + private static String toLine( + Iterable<String> location, + Iterable<Double> values, + Function<Double, String> formatter) { + + return Parsing.join( + FluentIterable.from(location).append(Iterables.transform(values, formatter)), + Delimiter.COMMA); + } + +}