Skip to content
Snippets Groups Projects
efield.py 2.2 KiB
Newer Older
  • Learn to ignore specific revisions
  • Cain, Payton David's avatar
    Cain, Payton David committed
    import typer
    
    from ..adjusted import AdjustedMatrix
    from .derived import adjusted
    from .factory import get_edge_factory
    
    from .filters import minute_filter
    
    Cain, Payton David's avatar
    Cain, Payton David committed
    
    app = typer.Typer()
    
    
    def main():
        app()
    
    
    @app.command(name="realtime")
    def efield_realtime(
        observatory: str = "BOU",
        host: str = "127.0.0.1",
        realtime_interval: int = 600,
        update_limit: int = 10,
    ):
        """
        inverts polarity of 1Hz E-E/E-N
        filters 1Hz inverted/non-inverted E-E/E-N to 1 minute
        """
        adjusted(
            observatory=observatory,
            interval="second",
            input_factory=get_edge_factory(host=host, data_type="variation"),
            input_channels=["E-E", "E-N"],
            output_factory=get_edge_factory(host=host, data_type="adjusted"),
            output_channels=["E-E", "E-N"],
            matrix=AdjustedMatrix(
                matrix=[
                    [-1, 0, 0],
                    [0, -1, 0],
                    [0, 0, 1],
                ],
            ),
            realtime_interval=realtime_interval,
            update_limit=update_limit,
        )
    
    Cain, Payton David's avatar
    Cain, Payton David committed
            observatory=observatory,
            channels=["E-E", "E-N"],
            input_factory=get_edge_factory(host=host, data_type="variation"),
            output_factory=get_edge_factory(host=host, data_type="variation"),
            realtime_interval=realtime_interval,
            update_limit=update_limit,
        )
    
    Cain, Payton David's avatar
    Cain, Payton David committed
            observatory=observatory,
            channels=["E-E", "E-N"],
            input_factory=get_edge_factory(host=host, data_type="adjusted"),
            output_factory=get_edge_factory(host=host, data_type="adjusted"),
            realtime_interval=realtime_interval,
            update_limit=update_limit,
        )
    
    
    @app.command(name="hour")
    def efield_hour(
        observatory: str = "BOU",
        host: str = "127.0.0.1",
        realtime_interval: int = 600,
        update_limit: int = 10,
    ):
        """filters 1 minute inverted/non-inverted E-E/E-N to 1 hour"""
        raise NotImplementedError("hour not implemented")
    
    
    @app.command(name="day")
    def efield_day(
        observatory: str = "BOU",
        host: str = "127.0.0.1",
        realtime_interval: int = 600,
        update_limit: int = 10,
    ):
        """filters 1 minute inverted/non-inverted E-E/E-N to 1 day"""
        raise NotImplementedError("day not implemented")