Newer
Older
#! /usr/bin/env python
"""Monitor """
import sys
from typing import List
import argparse
import sys
from obspy.core import UTCDateTime
from .. import TimeseriesUtility, edge
def calculate_warning_threshold(warning_threshold, interval):
"""Calculate warning_threshold for the giving interval
Parameters
----------
warning_threshold: int
the warning_threshold from the command line.
interval: string
the interval being warned against
"""
warning_threshold *= 60
warning_threshold *= 3600
return warning_threshold
def calculate_gap_percentage(total, trace):
"""Calculate the percentage of missing values
Parameters
----------
total: int
Total number of missing values
trace: obspy.core.Trace
a stream containing a single channel of data
"""
return (float(total) / float(trace.stats.npts)) * 100.0, trace.stats.npts
def format_time(date):
"""Print UTCDateTime in YYYY-MM-DD HH:MM:SS format
Parameters
----------
date: UTCDateTime
"""
return date.datetime.strftime("%Y-%m-%d %H:%M:%S")
def get_gaps(gaps):
"""Print gaps for a given channel into a html string.
gaps: array
Array of gaps
"""
if len(gaps):
for i in range(len(gaps)):
if i < 10:
gap_string += " %s to %s <br>\n" % (
format_time(gaps[i][0]),
format_time(gaps[i][1]),
)
else:
if i == 10:
gap_string += "<details>\n"
gap_string += f"<summary>+ {len(gaps) - 10}</summary>\n"
gap_string += "<span>\n"
else:
gap_string += " %s to %s <br>\n" % (
format_time(gaps[i][0]),
format_time(gaps[i][1]),
)
if i == len(gaps) - 1:
gap_string += "</span>\n"
gap_string += "</details>\n"
else:
gap_string = " None<br>"
return gap_string
def get_gap_total(gaps, interval):
"""Get total length of time for all gaps in a channel
Parameters
----------
gaps: array
Array of gaps
interval: string
the interval being warned against
"""
total = 0
divisor = 1
divisor = 60
for gap in gaps:
total += int(gap[2] - gap[0]) / divisor
return total
def get_last_time(gaps, endtime):
"""Return the last time that a channel has in it.
Parameters
----------
gaps: array
Array of gaps
endtime: UTCDateTime
The endtime specified in the arguments
"""
length = len(gaps) - 1
if length > -1 and gaps[length][2] >= endtime:
return gaps[length][0]
else:
return endtime
def get_table_header():
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
return (
'<table style="border-collapse: collapse;">\n'
+ "<thead>\n"
+ "<tr>\n"
+ '<th style="border:1px solid black; padding: 2px;">'
+ "</th>\n"
+ '<th style="border:1px solid black; padding: 2px;">'
+ "</th>\n"
+ "<th colspan=3 "
+ 'style="border:1px solid black; padding: 2px;">'
+ "Gap</th>\n"
+ '<th style="border:1px solid black; padding: 2px;">'
+ "</th>\n"
+ "</tr>\n"
+ "<tr>\n"
+ '<th style="border:1px solid black; padding: 2px;">'
+ "Channel</th>\n"
+ '<th style="border:1px solid black; padding: 2px;">'
+ "Last Time Value</th>\n"
+ '<th style="border:1px solid black; padding: 2px;">'
+ "Count</th>\n"
+ '<th style="border:1px solid black; padding: 2px;">'
+ "Total Time</th>\n"
+ '<th style="border:1px solid black; padding: 2px;">'
+ "Percentage</th>\n"
+ '<th style="border:1px solid black; padding: 2px;">'
+ "Total Values</th>\n"
+ "</tr>\n"
+ "</thead>\n"
+ "<tbody>\n"
)
def has_gaps(gaps):
"""Returns True if gaps dictionary has gaps in it.
Parameters
----------
gaps: dictionary
Dictionary of Channel:gaps arrays
"""
for channel in gaps:
if len(gaps[channel]):
return True
return False
def print_html_header(starttime, endtime, title):
"""Prints the html header, and title
Parameters
----------
starttime: UTCDateTime
The starttime of the data we are analyzing
endtime: UTCDateTime
The endtime of the data we are analyzing
title: string
The title passed in by the user
"""
print(
"<!DOCTYPE html>\n"
+ "<html>\n"
+ "<head>\n"
+ "<title> %s \n to %s \n</title>"
% (format_time(starttime), format_time(endtime))
+ "</head>\n"
+ "<body>\n"
+ '<style type="text/css">\n'
+ "table {border-collapse: collapse;}\n"
+ "th {border:1px solid black; padding: 2px;}\n"
+ "td {text-align:center;}\n"
+ "</style>\n"
+ title
+ "<br>\n"
"%s to %s " % (format_time(starttime), format_time(endtime))
)
def print_observatories(
starttime: UTCDateTime,
endtime: UTCDateTime,
observatories: List[str],
edge_host: str,
channels: List[str],
data_type: str,
gaps_only: bool,
intervals: List[str],
location_code: str,
warning_threshold: int,
):
"""Print all the observatories
Returns
-------
Boolean: if a warning was issued.
"""
intervals = intervals
channels = channels
starttime = starttime
endtime = endtime
host = edge_host
table_header = get_table_header()
warning_issued = False
table_end = "</tbody>\n" + "</table>\n"
for observatory in observatories:
summary_table = ""
gap_details = ""
print_it = False
summary_header = "<p>Observatory: %s </p>\n" % observatory
summary_table += table_header
for interval in intervals:
factory = edge.EdgeFactory(
host=host,
port=2060,
observatory=observatory,
type=data_type,
locationCode=location_code,
interval=interval,
)
timeseries = factory.get_timeseries(starttime=starttime, endtime=endtime)
gaps = TimeseriesUtility.get_stream_gaps(timeseries)
if gaps_only and not has_gaps(gaps):
continue
else:
print_it = True
warning_threshold = calculate_warning_threshold(warning_threshold, interval)
summary_table += '<td style="text-align:center;">'
summary_table += " %sS \n </td></tr>\n" % interval.upper()
gap_details += " %sS <br>\n" % interval.upper()
for channel in channels:
gap = gaps[channel]
trace = timeseries.select(channel=channel)[0]
total = get_gap_total(gap, interval)
percentage, count = calculate_gap_percentage(total, trace)
last = get_last_time(gap, endtime)
summary_table += "<tr>\n"
summary_table += '<td style="text-align:center;">%s</td>' % channel
summary_table += '<td style="text-align:center;">%s</td>' % format_time(
last
)
summary_table += '<td style="text-align:center;">%d</td>' % len(gap)
summary_table += '<td style="text-align:center;">%d %s</td>' % (
total,
interval,
)
summary_table += (
'<td style="text-align:center;">%0.2f%%</td>' % percentage
)
summary_table += '<td style="text-align:center;">%d</td>' % count
summary_table += "</tr>\n"
if endtime - last > warning_threshold:
warning_issued = True
# Gap Detail
gap_details += " Channel: %s <br>\n" % channel
gap_details += get_gaps(gap) + "\n"
if len(warning):
summary_header += (
"Warning: Channels older then "
+ "warning-threshold "
+ "%s %ss<br>\n" % (warning, interval)
)
summary_table += table_end
if print_it:
print(summary_header)
print(summary_table)
print(gap_details)
return warning_issued
def generate_report(
starttime: UTCDateTime,
endtime: UTCDateTime,
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
observatories: List[str],
edge_host: str = "127.0.0.1",
channels: List[str] = ["H", "E", "Z", "F"],
data_type: str = "variation",
gaps_only: bool = True,
intervals: List[str] = ("minute",),
location_code: str = "R0",
title: str = "",
warning_threshold: int = 60,
):
print_html_header(starttime=starttime, endtime=endtime, title=title)
warning_issued = print_observatories(
starttime=starttime,
endtime=endtime,
observatories=observatories,
edge_host=edge_host,
channels=channels,
data_type=data_type,
gaps_only=gaps_only,
intervals=intervals,
location_code=location_code,
warning_threshold=warning_threshold,
)
print("</body>\n" + "</html>\n")
sys.exit(warning_issued)
"""command line tool for building geomag monitoring reports
Inputs
------
use monitor.py --help to see inputs, or see parse_args.
Notes
-----
parses command line options using argparse
Output is in HTML.
"""
if args is None:
args = parse_args(sys.argv[1:])
generate_report(
starttime=args.starttime,
endtime=args.endtime,
observatories=args.observatories,
edge_host=args.edge_host,
channels=args.channels,
data_type=args.type,
gaps_only=args.gaps_only,
intervals=args.intervals,
location_code=args.locationcode,
title=args.title,
warning_threshold=args.warning_threshold,
)
def parse_args(args):
"""parse input arguments
Parameters
----------
args : list of strings
Returns
-------
argparse.Namespace
dictionary like object containing arguments.
"""
parser = argparse.ArgumentParser(
description="Use @ to read commands from a file.", fromfile_prefix_chars="@"
)
parser.add_argument(
"--starttime",
required=True,
default=None,
help="UTC date YYYY-MM-DD HH:MM:SS",
)
parser.add_argument(
"--endtime",
required=True,
default=None,
help="UTC date YYYY-MM-DD HH:MM:SS",
)
parser.add_argument(
"--edge-host",
required=False,
default="127.0.0.1",
help="IP/URL for edge connection",
)
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
parser.add_argument(
"--observatories",
required=True,
nargs="*",
help="Observatory code ie BOU, CMO, etc",
)
parser.add_argument(
"--channels",
nargs="*",
default=["H", "E", "Z", "F"],
help="Channels H, E, Z, etc",
)
parser.add_argument(
"--intervals",
nargs="*",
default=["minute"],
choices=["hourly", "minute", "second"],
)
parser.add_argument(
"--locationcode", default="R0", choices=["R0", "R1", "RM", "Q0", "D0", "C0"]
)
parser.add_argument(
"--type",
default="variation",
choices=["variation", "quasi-definitive", "definitive"],
)
parser.add_argument(
"--warning-threshold",
type=int,
default=60,
help="How many time slices should pass before a warning is issued",
)
parser.add_argument(
"--gaps-only",
action="store_true",
default=True,
help="Only print Observatories with gaps.",
)
parser.add_argument("--title", default="", help="Title for the top of the report")
return parser.parse_args(args)