"""Convert streams from one coordinate system to another. We use three coordinate systems. Geo: Based on Geographic North. X, Y, Z, F X is north, Y is east, Z is down Obs: Based on the observatories orientaion. H, E, Z, F, d0 Mag: Based on Magnetic North. H, D, Z, F """ import numpy import obspy.core import ChannelConverter def get_geo_from_mag(mag): """Convert a stream to geographic coordinate system. Parameters ---------- stream : obspy.core.Stream stream containing observatory components H, D, Z, and F. Returns ------- obspy.core.Stream new stream object containing geographic components X, Y, Z, and F. """ h = mag.select(channel='H')[0] d = mag.select(channel='D')[0] z = mag.select(channel='Z')[0] f = mag.select(channel='F')[0] mag_h = h.data mag_d = d.data (geo_x, geo_y) = ChannelConverter.get_geo_from_mag(mag_h, mag_d) return obspy.core.Stream(( __get_trace('X', h.stats, geo_x), __get_trace('Y', d.stats, geo_y), z, f)) def get_geo_from_obs(obs): """Convert a stream to geographic coordinate system. Parameters ---------- stream : obspy.core.Stream stream containing observatory components H, D or E, Z, and F. Returns ------- obspy.core.Stream new stream object containing geographic components X, Y, Z, and F. """ return get_geo_from_mag(get_mag_from_obs(obs)) def get_deltaf_from_geo(geo): """Get deltaf given geographic coordinate values Parameters ---------- obs: obspy.core.Stream stream containing the observatory components H, D or E, Z, and F. Returns ------- obspy.core.Stream stream object containing delta f values """ x = geo.select(channel='X')[0] y = geo.select(channel='Y')[0] z = geo.select(channel='Z')[0] fs = geo.select(channel='F')[0] fv = ChannelConverter.get_computed_f_using_squares(x, y, z) G = ChannelConverter.get_deltaf(fv, fs) return obspy.core.Stream(( __get_trace('G', x.stats, G), )) def get_deltaf_from_obs(obs): """Get deltaf given observatory coordinate values Parameters ---------- obs: obspy.core.Stream stream containing the observatory components H, D or E, Z, and F. Returns ------- obspy.core.Stream stream object containing delta f values """ h = obs.select(channel='H')[0] z = obs.select(channel='Z')[0] fs = obs.select(channel='F')[0] e = __get_obs_e_from_obs(obs) fv = ChannelConverter.get_computed_f_using_squares(h, e, z) G = ChannelConverter.get_deltaf(fv, fs) return obspy.core.Stream(( __get_trace('G', h.stats, G), )) def get_mag_from_geo(geo): """Convert a stream to magnetic coordinate system. Parameters ---------- geo: obspy.core.Stream stream containing observatory components X, Y, Z, and F. Returns ------- obspy.core.Stream new stream object containing magnetic components H, D, Z, and F. """ x = geo.select(channel='X')[0] y = geo.select(channel='Y')[0] z = geo.select(channel='Z')[0] f = geo.select(channel='F')[0] geo_x = x.data geo_y = y.data (mag_h, mag_d) = ChannelConverter.get_mag_from_geo(geo_x, geo_y) return obspy.core.Stream(( __get_trace('H', x.stats, mag_h), __get_trace('D', y.stats, mag_d), z, f)) def get_mag_from_obs(obs): """Convert a stream to magnetic coordinate system. Parameters ---------- obs : obspy.core.Stream stream containing observatory components H, D or E, Z, and F. Returns ------- obspy.core.Stream new stream object containing magnetic components H, D, Z, and F. """ h = obs.select(channel='H')[0] e = __get_obs_e_from_obs(obs) z = obs.select(channel='Z')[0] f = obs.select(channel='F')[0] obs_h = h.data obs_e = e.data d0 = ChannelConverter.get_radians_from_minutes( numpy.float64(e.stats.declination_base) / 10) (mag_h, mag_d) = ChannelConverter.get_mag_from_obs(obs_h, obs_e, d0) return obspy.core.Stream(( __get_trace('H', h.stats, mag_h), __get_trace('D', e.stats, mag_d), z, f)) def get_obs_from_geo(geo, include_d=False): """Convert a stream to observatory coordinate system. Parameters ---------- stream : obspy.core.Stream stream containing geographic components X, Y, Z, and F. include_d : boolean whether to also include the observatory D component. Returns ------- obspy.core.Stream new stream object containing observatory components H, D, E, Z, and F. """ return get_obs_from_mag(get_mag_from_geo(geo), include_d) def get_obs_from_mag(mag, include_d=False): """Convert a stream to magnetic observatory coordinate system. Parameters ---------- stream: obspy.core.Stream stream containing magnetic components H, D, Z, and F. include_d: boolean whether to also include the observatory D component Returns ------- obspy.core.Stream new stream object containing observatory components H, D, E, Z, and F """ h = mag.select(channel='H')[0] d = mag.select(channel='D')[0] z = mag.select(channel='Z')[0] f = mag.select(channel='F')[0] mag_h = h.data mag_d = d.data d0 = ChannelConverter.get_radians_from_minutes( numpy.float64(d.stats.declination_base) / 10) (obs_h, obs_e) = ChannelConverter.get_obs_from_mag(mag_h, mag_d, d0) traces = ( __get_trace('H', h.stats, obs_h), __get_trace('E', d.stats, obs_e), z, f) if include_d: obs_d = ChannelConverter.get_obs_d_from_obs(obs_h, obs_e) traces = traces + (__get_trace('D', d.stats, obs_d),) return obspy.core.Stream(traces) def get_obs_from_obs(obs, include_e=False, include_d=False): """Fill in the observatory parameters as requested Parameters ---------- stream: obspy.core.Stream stream containing the observatory components H, D or E, Z, and F. include_e: boolean whether to include the e component include_d: boolean whether to include the d component Returns ------- obspy.core.Stream new stream object containing observatory components H, D, E, Z, and F """ h = obs.select(channel='H')[0] z = obs.select(channel='Z')[0] f = obs.select(channel='F')[0] traces = (h, z, f) if include_d: d = __get_obs_d_from_obs(obs) traces = traces + (d, ) if include_e: e = __get_obs_e_from_obs(obs) traces = traces + (e, ) return obspy.core.Stream(traces) def __get_trace(channel, stats, data): """Utility to create a new trace object. Parameters ---------- channel : str channel name. stats : obspy.core.Stats channel metadata to clone. data : numpy.array channel data. Returns ------- obspy.core.Trace trace containing data and metadata. """ stats = obspy.core.Stats(stats) stats.channel = channel return obspy.core.Trace(data, stats) def __get_obs_d_from_obs(obs): """Get trace containing observatory D component. Returns D if found, otherwise computes D from H, E, D0. Parameters ---------- obs : obspy.core.Stream observatory components (D) or (H, E). Returns ------- obspy.core.Trace observatory component D. """ try: d = obs.select(channel='D')[0] except: h = obs.select(channel='H')[0] e = obs.select(channel='E')[0] d = __get_trace('D', e.stats, ChannelConverter.get_obs_d_from_obs(h.data, e.data)) return d def __get_obs_e_from_obs(obs): """Get trace containing observatory E component. Returns E if found, otherwise computes E from H,D. Parameters ---------- obs : obspy.core.Stream observatory components (E) or (H, D). Returns ------- obspy.core.Trace observatory component E. """ try: e = obs.select(channel='E')[0] except: h = obs.select(channel='H')[0] d = obs.select(channel='D')[0] e = __get_trace('E', d.stats, ChannelConverter.get_obs_e_from_obs(h.data, d.data)) return e