Commit 2b08f6b0 authored by Domanski, Marian M.'s avatar Domanski, Marian M.

implement results recorder classes

parent 63735344
......@@ -3,10 +3,11 @@ import copy
import datetime
import os
import h5py
import numpy as np
from fluegg.resultsrecorder import QuantileResultsRecorder
class Results(ABC):
......@@ -46,7 +47,7 @@ class Results(ABC):
time : float
Simulation time, in seconds
depth : float
Fractional position along the z-axis, where 0 is the
Fractional position along the z-axis, where 0 is the
water surface and 1 is the bottom of the channel.
bins : int or array_like, optional
Number of bins or array of bin edges. The default is 20.
......@@ -240,7 +241,7 @@ class FullResults(Results):
Parameters
----------
results : SimulationResults
results : FullResultsRecorder
"""
......@@ -397,7 +398,7 @@ class QuantileResults(Results):
Parameters
----------
results : SimulationResults or FullResults
results : ResultsRecorder,
"""
......@@ -412,13 +413,16 @@ class QuantileResults(Results):
super().__init__(results)
positions = results.positions((0, 1, 2))
# calculate quantile values to be evaluated
lo = np.insert(np.logspace(-3, 0, 50) / 2, 0, 0)
hi = 1 - lo[:-1]
self._quantiles = np.sort(np.append(lo, hi))
self._positions = np.quantile(positions, self._quantiles, axis=1)
# if an AttributeError is raised, assume results is not a
# QuantileResultsRecorder
try:
self._quantiles = results.quantiles()
self._positions = results.positions()
except AttributeError:
# use same quantiles as QuantileResultsRecorder
self._quantiles = QuantileResultsRecorder.quantiles()
positions = results.positions((0, 1, 2))
self._positions = np.quantile(positions, self._quantiles, axis=1)
def position_cfrac(self, time, position_axis=0):
"""Cumulative fraction of particles past positions at a
......
from abc import ABC, abstractmethod
import numpy as np
from fluegg.results import FullResults, QuantileResults
class ResultsRecorder(ABC):
def __init__(self, simclock, particles, configuration, hydraulic_model):
self._configuration = configuration
self._time_series = simclock.time_array()
hydraulic_cell_edges = hydraulic_model.cell_edges()
self._domain_length = hydraulic_cell_edges[-1]
self._positions = None
def configuration(self):
"""Returns the configuration dictionary.
Returns
-------
dict
"""
return self._configuration
def domain_length(self):
"""Returns the length of the simulation domain
Returns the longitudinal (x-axis) length of the simulation domain.
Returns
-------
float
"""
return self._domain_length
def positions(self, *args, **kwargs):
"""Returns particle position array
Returns
-------
numpy.ndarray
Position array
"""
return self._positions.copy()
@abstractmethod
def record_result(self, simclock, particles, hydraulic_results):
pass
@abstractmethod
def results(self):
pass
def time(self, index):
"""Simulation time
Parameters
----------
index : int or slice
Index of time to return
Returns
-------
float
Simulation time, in seconds
"""
return self._time_series[index]
class FullResultsRecorder(ResultsRecorder):
"""Data structure containing simulation results during a simulation run
Parameters
----------
simclock : SimulationClock
Representation of a simulation clock
particles : DriftingParticle
Particles used during simulation
configuration : dict
Simulation configuration
hydraulic_model : SeriesOfHydraulicCells
Hydraulic model
"""
def __init__(self, simclock, particles, configuration, hydraulic_model):
super().__init__(simclock, particles, configuration, hydraulic_model)
self._positions = np.tile(
np.nan, (simclock.number_of_times(),
particles.position().shape[0], 3))
self._depth = np.tile(
np.nan, (simclock.number_of_times(),
particles.position().shape[0], ))
self._width = np.tile(
np.nan, (simclock.number_of_times(),
particles.position().shape[0], ))
def depth(self):
""" returns depth array
"""
return self._depth
def positions(self, normalize=()):
"""Returns particle position array
Parameters
----------
normalize : tuple of int, optional
Normalize position array axes (the default is an empty,
tuple, which doesn't normalize any axes).
Returns
-------
numpy.ndarray
Position array
"""
axes = {0, 1, 2}
# keep these axes in absolute length
absolute = axes.difference(normalize)
positions = np.zeros_like(self._positions)
for a in normalize:
positions[:, :, a] = self._normalize_axis(a)
for a in absolute:
positions[:, :, a] = self._positions[:, :, a]
return positions
def record_result(self, simclock, particles, hydraulic_results):
"""Records results for current time step
Parameters
----------
simclock : SimulationClock
particles : DriftingParticle
hydraulic_results : HydraulicResults
"""
time_index = simclock.current_time_index()
self._positions[time_index] = particles.position()
self._depth[time_index] = hydraulic_results.depth()
self._width[time_index] = hydraulic_results.width()
def results(self):
"""Returns results
Returns
-------
results : FullResults
"""
return FullResults(self)
def width(self):
""" returns depth array
"""
return self._width
class QuantileResultsRecorder(ResultsRecorder):
def __init__(self, simclock, particles, configuration, hydraulic_model):
super().__init__(simclock, particles, configuration, hydraulic_model)
self._quantiles = self.quantiles()
self._positions = np.tile(
np.nan, (simclock.number_of_times(), self._quantiles.shape[0], 3))
@staticmethod
def quantiles():
"""Returns the quantiles
"""
lo = np.insert(np.logspace(-3, 0, 50) / 2, 0, 0)
hi = 1 - lo[:-1]
return np.sort(np.append(lo, hi))
def record_result(self, simclock, particles, hydraulic_results):
"""Records results for current time step
Parameters
----------
simclock : SimulationClock
particles : DriftingParticle
hydraulic_results : HydraulicResults
"""
time_index = simclock.current_time_index()
current_position = particles.position()
normalized_positions = np.empty_like(current_position)
depth = hydraulic_results.depth()
width = hydraulic_results.width()
normalized_positions[time_index, :,
0] = current_position[:, 0] / self._domain_length
normalized_positions[time_index, :, 1] = current_position[:, 1] / width
normalized_positions[time_index, :, 2] = current_position[:, 2] / depth
self._positions[time_index, :, :] = np.quantile(
normalized_positions, self._quantiles, axis=1)
def results(self):
"""Returns quantile results
Returns
-------
QuantileResults
"""
return QuantileResults(self)
......@@ -28,14 +28,13 @@ class Simulation:
Parameters
----------
particles : fluegg.drift.DriftingParticles
particles : DriftingParticle
Particles being drifted through the simulation
transporter : fluegg.transporter.Transporter
transporter : Transporter
Class that physically transports each egg for each time step
simclock : fluegg.simclock.SimulationClock
simclock : SimulationClock
Clock that keeps track of the time during the simulation
hydraulic_cells : SeriesOfHydraulicCells
"""
......@@ -54,7 +53,9 @@ class Simulation:
fun(*args)
def _verify_time_step(self):
"""Wrapper for verify_time_step_copy() which is called on a copy of the simulation
"""Wrapper for verify_time_step_copy() which is called on a
copy of the simulation
Raises an error if the time step input by the user is too large.
"""
# Creates a deep copy of the simulation so that
......@@ -72,7 +73,8 @@ class Simulation:
# Raise error if time step is too large
if user_step > max_step:
raise ValueError(
'User time step is {}. Must be at most {}.'.format(user_step, max_step))
'User time step is {}. Must be at most {}.'
.format(user_step, max_step))
def add_time_step_function_call(self, fun, args):
"""Adds a function that will be called at the beginning of a time step.
......@@ -213,142 +215,6 @@ def from_input_dict(d):
return sim
class SimulationResults:
"""Data structure containing simulation results during a simulation run
Parameters
----------
simclock : SimulationClock
Representation of a simulation clock
particles : DriftingParticle
Particles used during simulation
configuration : dict
Simulation configuration
hydraulic_model : SeriesOfHydraulicCells
Hydraulic model
"""
def __init__(self, simclock, particles, configuration, hydraulic_model):
self._positions = np.tile(
np.nan, (simclock.number_of_times(),
particles.position().shape[0], 3))
self._depth = np.tile(
np.nan, (simclock.number_of_times(),
particles.position().shape[0], ))
self._width = np.tile(
np.nan, (simclock.number_of_times(),
particles.position().shape[0], ))
self._configuration = configuration
self._time_series = simclock.time_array()
hydraulic_cell_edges = hydraulic_model.cell_edges()
self._domain_length = hydraulic_cell_edges[-1]
def configuration(self):
"""Returns the configuration dictionary.
Returns
-------
dict
"""
return self._configuration
def depth(self):
""" returns depth array
"""
return self._depth
def domain_length(self):
"""Returns the length of the simulation domain
Returns the longitudinal (x-axis) length of the simulation domain.
Returns
-------
float
"""
return self._domain_length
def positions(self, normalize=()):
"""Returns particle position array
Parameters
----------
normalize : tuple of int, optional
Normalize position array axes (the default is an empty,
tuple, which doesn't normalize any axes).
Returns
-------
numpy.ndarray
Position array
"""
axes = {0, 1, 2}
# keep these axes in absolute length
absolute = axes.difference(normalize)
positions = np.zeros_like(self._positions)
for a in normalize:
if a == 0:
abs_length = self._domain_length
elif a == 1:
abs_length = self._width
elif a == 2:
abs_length = self._depth
positions[:, :, a] = self._positions[:, :, a] / abs_length
for a in absolute:
positions[:, :, a] = self._positions[:, :, a]
return positions
def record_result(self, simclock, particles, hydraulic_results):
"""Records results for current time step
Parameters
----------
simclock : SimulationClock
particles : DriftingParticle
hydraulic_results : HydraulicResults
"""
time_index = simclock.current_time_index()
self._positions[time_index] = particles.position()
self._depth[time_index] = hydraulic_results.depth()
self._width[time_index] = hydraulic_results.width()
def time(self, index):
"""Simulation time
Parameters
----------
index : int or slice
Index of time to return
Returns
-------
float
Simulation time, in seconds
"""
return self._time_series[index]
def width(self):
""" returns depth array
"""
return self._width
def input_dict_validator(d):
"""Validates a list of simulation inputs and runs the simlation if inputs
are valid
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment