From d6e55009be6949d8ccf9bec9e072317b78cf9b77 Mon Sep 17 00:00:00 2001
From: David Blodgett <dblodgett@usgs.gov>
Date: Wed, 27 Nov 2024 11:40:07 -0600
Subject: [PATCH] first bit of NHD prep for #153 prepare step

---
 workspace/01_NHD_prep.Rmd   |   3 +-
 workspace/R/2_prep.R        |  55 ++++++++++++++++-
 workspace/R/NHD_navigate.R  |   1 +
 workspace/R/config.R        | 119 +++++++++++++++++++-----------------
 workspace/_targets.R        |  32 ++++++++--
 workspace/workflow_runner.R |  35 ++++++++++-
 6 files changed, 181 insertions(+), 64 deletions(-)

diff --git a/workspace/01_NHD_prep.Rmd b/workspace/01_NHD_prep.Rmd
index c21d510..278da78 100644
--- a/workspace/01_NHD_prep.Rmd
+++ b/workspace/01_NHD_prep.Rmd
@@ -16,6 +16,8 @@ knitr::opts_chunk$set(
 ```
 
 ```{r setup}
+library(dplyr)
+library(hyfabric)
 # Load custom functions
 source("R/utils.R")
 source("R/NHD_navigate.R")
@@ -33,7 +35,6 @@ if(!exists("vpu_codes")) {
 }
 
 all_rpu_codes <- unique(rpu_vpu$rpuid)
-full_cat_table <- readRDS(data_paths$fullcats_table)
 
 out_vpu <- rep(list(list()), length(vpu_codes))
 names(out_vpu) <- vpu_codes
diff --git a/workspace/R/2_prep.R b/workspace/R/2_prep.R
index 78243f1..ab879b2 100644
--- a/workspace/R/2_prep.R
+++ b/workspace/R/2_prep.R
@@ -387,4 +387,57 @@ finalize_gages <- function(SWIM, NHDPlusV2_gages, gfv11_gages, gages_ii_ref,
               col.names = FALSE, row.names = F)
   
   return(list(final = gage_final_ranked))
-}
\ No newline at end of file
+}
+
+#' prepare vpu base layers
+#' @param reg_gpkg character path to geopackage containing reference fabric for this vpu
+#' @param nav_gpkg character path to geopackage where vpu base layers will be written
+#' @param vpu vpu that will be written into output list
+#' @param full_cat_table table of all catchments for each vpu
+#' @return a list with the vpu and three tables for flowlines, waterbodies, and catchments
+prepare_vpu_base_layers <- function(ref_gpkg, nav_gpkg, vpu, full_cat_table) {
+
+  fline <- read_sf(ref_gpkg, "flowlines") 
+  catchment <- read_sf(ref_gpkg, "catchments") 
+  waterbodies <- read_sf(ref_gpkg, "waterbodies")
+  
+  # we can remove truely degenerate COMIDs 
+  # for 0 upstream area and no catchment area
+  degen_comid <- fline[fline$TotDASqKM == 0 & 
+                         !fline$COMID %in% catchment$featureid, ]$COMID
+  
+  # need to make sure we don't disconnect anything.
+  keep_tocomid <- fline$toCOMID[!fline$COMID %in% degen_comid]
+  
+  if(length(degen_comid[degen_comid %in% keep_tocomid]) > 0) stop("this will break the network")
+  
+  fline <- fline[!fline$COMID %in% degen_comid, ]
+  
+  # there are some duplicates with enhd
+  nhd <- fline %>% 
+    group_by(COMID) %>%
+    filter(row_number() == 1) %>%
+    ungroup() %>%
+    make_standalone()
+  
+  # Filter and write dendritic/non-coastal subset to gpkg
+  # This will be iterated over to produce the final network after POIs identified
+  zero_order <- filter(nhd, TerminalFl == 1 & TotDASqKM < min_da_km_gages)
+  non_dend <- unique(unlist(lapply(zero_order$COMID, \(x, nhdDF) get_UM(nhdDF, x), nhdDF = st_drop_geometry(nhd))))
+  nhd <- nhd %>%
+    mutate(dend = ifelse(!COMID %in% non_dend, 1, 0),
+           poi = ifelse(!COMID %in% non_dend & TotDASqKM >= min_da_km_gages, 1, 0)) 
+  
+  cat_network <- st_drop_geometry(nhd)
+  names(cat_network) <- tolower(names(cat_network))
+  cat_network <- select(cat_network, comid, tocomid, lengthkm, areasqkm, totdasqkm, 
+                        hydroseq, levelpathi, terminalpa, dnlevelpat, dnhydroseq,
+                        reachcode, frommeas, tomeas, pathlength, arbolatesu,
+                        ftype, fcode, vpuid, rpuid, wbareacomi)
+  
+  write_sf(nhd, nav_gpkg, nhd_flowline)
+  write_sf(cat_network, nav_gpkg, nhd_network)
+  write_sf(waterbodies, nav_gpkg, nhd_waterbody)
+  
+  return(list(vpu = vpu, flowline = nhd, catchment = cat_network, waterbodies = waterbodies))
+}
diff --git a/workspace/R/NHD_navigate.R b/workspace/R/NHD_navigate.R
index 3aaacd5..9b61582 100644
--- a/workspace/R/NHD_navigate.R
+++ b/workspace/R/NHD_navigate.R
@@ -1,3 +1,4 @@
+# TODO: remove use of this in favor of \(x, nhdDF) get_UM(nhdDF, x) or \(x, nhdDF) get_UT(nhdDF, x)
 #' Network navigation for upstream/downstream from a COMID of interest
 #' @param inCOM  (list) list of input COMIDs 
 #' @param nhdDF (sf data.frame) (data frame) valid data frame of NHD flowlines
diff --git a/workspace/R/config.R b/workspace/R/config.R
index b523be6..c2dcfae 100644
--- a/workspace/R/config.R
+++ b/workspace/R/config.R
@@ -1,43 +1,50 @@
 
-rpu_vpu <- readr::read_csv(system.file("extdata/rpu_vpu.csv", package = "hyfabric"), 
-                           show_col_types = FALSE)
-rpu_vpu_out <- readr::read_csv("cache/rpu_vpu_out.csv", 
-                               col_types = c("c", "c", "i" , "i"), show_col_types = FALSE)
-
-if(nchar(Sys.getenv("HYDREG")) > 1) {
-  vpu_codes <- Sys.getenv("HYDREG")
-}
-
-if(!exists("rpu_codes")) {
-  if(exists("vpu_codes")) {
-    rpu_codes <- rpu_vpu[rpu_vpu$vpuid %in% vpu_codes, ]
-    rpu_code <- rpu_codes$rpuid[1]
-  } else {
-    vpu_codes <- get_rpu_dependent_vars()
-    rpu_code <- vpu_codes$r
-    vpu_codes <- vpu_codes$v
-    rpu_codes <- rpu_vpu[rpu_vpu$vpuid %in% vpu_codes, ]
-  }
-}
-
-if(!exists("vpu_codes")) {
-  vpu_codes <- get_rpu_dependent_vars(rpu_code)$v
-}
-
-if(!nchar(out_agg_gpkg <- Sys.getenv("OUT_GPKG")) > 5) 
-  out_agg_gpkg <- file.path("cache", paste0(rpu_code, "_aggregate.gpkg"))
-
-if(!nchar(out_refac_gpkg <- Sys.getenv("OUT_REFACTOR_GPKG")) > 5)
-  out_refac_gpkg <- file.path("cache", paste0(rpu_code, "_refactor.gpkg"))
-
-if(!nchar(cache_split <- Sys.getenv("CACHE_SPLIT")) > 5) 
-  cache_split <- file.path("temp", paste0(rpu_code, ".rda"))
-
-if(!nchar(lookup_table_file <- Sys.getenv("LOOKUP_TABLE")) > 5) 
-  lookup_table_file <- file.path("cache", paste0(rpu_code, "_lookup.csv"))
-
-if(!nchar(vpu_lookup_table_file <- Sys.getenv("VPU_LOOKUP_TABLE")) > 5) 
-  vpu_lookup_table_file <- file.path("cache", paste0("lookup_", vpu_codes, ".csv"))
+# rpu_vpu <- readr::read_csv(system.file("extdata/rpu_vpu.csv", package = "hyfabric"), 
+#                            show_col_types = FALSE)
+# rpu_vpu_out <- readr::read_csv("cache/rpu_vpu_out.csv", 
+#                                col_types = c("c", "c", "i" , "i"), show_col_types = FALSE)
+
+# # The below deals with various ways to get / set rpu and vpu codes
+# # based on the rpu_vpu table.
+# # if a HYDREG environment variable is set, use it
+# if(nchar(Sys.getenv("HYDREG")) > 1) {
+#   vpu_codes <- Sys.getenv("HYDREG")
+# }
+# 
+# # if rpu_codes hasn't been set globally, try to set it.
+# if(!exists("rpu_codes")) {
+#   # if vpu_codes has been set we can get rpu based on that.
+#   if(exists("vpu_codes")) {
+#     rpu_codes <- rpu_vpu[rpu_vpu$vpuid %in% vpu_codes, ]
+#     rpu_code <- rpu_codes$rpuid[1]
+#   } else {
+#     # then get the default
+#     vpu_codes <- get_rpu_dependent_vars()
+#     rpu_code <- vpu_codes$r
+#     vpu_codes <- vpu_codes$v
+#     rpu_codes <- rpu_vpu[rpu_vpu$vpuid %in% vpu_codes, ]
+#   }
+# }
+# 
+# # if vpu_codes still doesn't exist, set it based on the rpu_code.
+# if(!exists("vpu_codes")) {
+#   vpu_codes <- get_rpu_dependent_vars(rpu_code)$v
+# }
+# 
+# if(!nchar(out_agg_gpkg <- Sys.getenv("OUT_GPKG")) > 5) 
+#   out_agg_gpkg <- file.path("cache", paste0(rpu_code, "_aggregate.gpkg"))
+# 
+# if(!nchar(out_refac_gpkg <- Sys.getenv("OUT_REFACTOR_GPKG")) > 5)
+#   out_refac_gpkg <- file.path("cache", paste0(rpu_code, "_refactor.gpkg"))
+# 
+# if(!nchar(cache_split <- Sys.getenv("CACHE_SPLIT")) > 5) 
+#   cache_split <- file.path("temp", paste0(rpu_code, ".rda"))
+# 
+# if(!nchar(lookup_table_file <- Sys.getenv("LOOKUP_TABLE")) > 5) 
+#   lookup_table_file <- file.path("cache", paste0(rpu_code, "_lookup.csv"))
+# 
+# if(!nchar(vpu_lookup_table_file <- Sys.getenv("VPU_LOOKUP_TABLE")) > 5) 
+#   vpu_lookup_table_file <- file.path("cache", paste0("lookup_", vpu_codes, ".csv"))
 
 options(scipen = 9999)
 
@@ -55,13 +62,13 @@ s_per_hr <- 3600 # seconds per hour
 
 # Intermediate layers created during 02_Navigate
 xwalk_layer <- paste0("HUC12_nhd") # HUC12 - nhdcat crosswalk, built in Nav for VPU 20
-nav_poi_layer <- paste0("POIs_tmp_", vpu_codes) # Rolling Nav POI layer added to/modified througout nav workflow
-WBs_layer <-  paste0("WB_", vpu_codes) # Waterbodies within VPU
-WB_events_layer <- paste0("WB_events_", vpu_codes) # inlet and outlet events for NHDArea and HR waterbodies
-poi_moved_layer <- paste0("POIs_mv_", vpu_codes) # POIs moved from original COMID assignment
-nsegments_layer <- paste0("nsegment_", vpu_codes) # Minimally-sufficient network dissolved by POI_ID
-pois_all_layer <- paste0("POIs_", vpu_codes) # All POIs binded together
-poi_xwalk_layer <- paste0("poi_xwalk_layer_", vpu_codes) # POIs that changed COMIDS during the navigate part of the workflow
+# nav_poi_layer <- paste0("POIs_tmp_", vpu_codes) # Rolling Nav POI layer added to/modified througout nav workflow
+# WBs_layer <-  paste0("WB_", vpu_codes) # Waterbodies within VPU
+# WB_events_layer <- paste0("WB_events_", vpu_codes) # inlet and outlet events for NHDArea and HR waterbodies
+# poi_moved_layer <- paste0("POIs_mv_", vpu_codes) # POIs moved from original COMID assignment
+# nsegments_layer <- paste0("nsegment_", vpu_codes) # Minimally-sufficient network dissolved by POI_ID
+# pois_all_layer <- paste0("POIs_", vpu_codes) # All POIs binded together
+# poi_xwalk_layer <- paste0("poi_xwalk_layer_", vpu_codes) # POIs that changed COMIDS during the navigate part of the workflow
 final_poi_layer <- "POIs"
 dup_pois <- "dup_POIs"
 
@@ -84,15 +91,15 @@ mapped_outlets_layer <- "mapped_POIs"
 lookup_table_refactor <- "lookup_table"
 catchment_network_table <- "catchment_network"
 
-# output  geopackage file names
-ref_gpkg <- file.path(jsonlite::read_json("cache/data_paths.json")$ref_fab_path, 
-                      paste0(vpu_codes, "_reference_features.gpkg"))
-nav_gpkg <- file.path("cache", paste0("reference_", vpu_codes,".gpkg"))
-temp_gpkg <- file.path("cache", paste0("nav_", vpu_codes,".gpkg"))
-
-rfc_gpkg <- file.path("cache", paste0("refactor_", vpu_codes, ".gpkg"))
-
-gf_gpkg <- file.path("cache", paste0("GF_", vpu_codes, ".gpkg"))
+# # output  geopackage file names
+# ref_gpkg <- file.path(jsonlite::read_json("cache/data_paths.json")$ref_fab_path, 
+#                       paste0(vpu_codes, "_reference_features.gpkg"))
+# nav_gpkg <- file.path("cache", paste0("reference_", vpu_codes,".gpkg"))
+# temp_gpkg <- file.path("cache", paste0("nav_", vpu_codes,".gpkg"))
+# 
+# rfc_gpkg <- file.path("cache", paste0("refactor_", vpu_codes, ".gpkg"))
+# 
+# gf_gpkg <- file.path("cache", paste0("GF_", vpu_codes, ".gpkg"))
 gf_gpkg_conus <- "cache/reference_CONUS.gpkg"
 
 gage_info_gpkg <- "cache/Gages_info.gpkg"
@@ -100,7 +107,7 @@ gage_info_csv <- "cache/Gages_info.csv"
 gage_info_csvt <- "cache/Gages_info.csvt"
 
 # Defined during NonDend.Rmd
-ND_gpkg <- file.path("temp", paste0("ND_", vpu_codes,".gpkg"))
+# ND_gpkg <- file.path("temp", paste0("ND_", vpu_codes,".gpkg"))
 
 divides_xwalk <- "divides_nhd"
 HRU_layer <- "all_cats"
diff --git a/workspace/_targets.R b/workspace/_targets.R
index fde27df..8f9ff98 100644
--- a/workspace/_targets.R
+++ b/workspace/_targets.R
@@ -1,8 +1,13 @@
 library(targets)
 library(tarchetypes)
 
-tar_option_set(packages = c("dplyr", "sf", "foreign", "hydroloom"),
-               memory = "transient", garbage_collection = TRUE)
+tar_option_set(packages = c("dplyr", "sf", "foreign", "hydroloom", "nhdplusTools"),
+               memory = "transient", garbage_collection = TRUE, error = "null", 
+               storage = "worker", retrieval = "worker", deployment = "main")
+
+library(future)
+library(future.callr)
+plan(multisession)
 
 # functions for this workflow
 source("R/2_prep.R")
@@ -52,12 +57,29 @@ list(
                                          gage_info_gpkg, gage_info_csv, gage_info_csvt, 
                                          geo_crs)),
   ###
-  tar_render(map_report, "01_Gage_Selection.Rmd", 
-             output_file = "temp/01_Gage_Selection.html",
+  tar_render(gage_selection_map_report, "templates/01_Gage_Selection.Rmd", 
+             output_file = "../temp/01_Gage_Selection.html",
              params = list(SWIM = SWIM, 
                            NHDPlusV2_gages = NHDPlusV2_gages, 
                            gfv11_gages = gfv11_gages, 
                            final_gages = final_gages,
-                           min_da_km_gages = min_da_km_gages))
+                           min_da_km_gages = min_da_km_gages)),
+  
+  tar_target(rpu_vpu, read.csv(system.file("extdata/rpu_vpu.csv", package = "hyfabric"), 
+                               colClasses = "character")),
+  
+  tar_target(vpu_codes, unique(rpu_vpu$vpuid)),
+  tar_target(all_rpu_codes, unique(rpu_vpu$rpuid)),
+  tar_target(full_cat_file, data_paths$fullcats_table, format = "file"),
+  tar_target(full_cat_table, readRDS(full_cat_file)),
+  
+  tar_target(ref_gpkg, file.path(data_paths$ref_fab_path, 
+                                  paste0(vpu_codes, "_reference_features.gpkg")),
+             pattern = map(vpu_codes)),
+  tar_target(nav_gpkg, file.path("cache", paste0("reference_", vpu_codes,".gpkg")),
+             pattern = map(vpu_codes)),
   
+  # ~1GB of memory
+  tar_target(vpu_base, prepare_vpu_base_layers(ref_gpkg, nav_gpkg, vpu_codes, full_cat_table),
+             pattern = map(ref_gpkg, nav_gpkg, vpu_codes), deployment = "worker")
 )
\ No newline at end of file
diff --git a/workspace/workflow_runner.R b/workspace/workflow_runner.R
index d19d30c..40ced23 100644
--- a/workspace/workflow_runner.R
+++ b/workspace/workflow_runner.R
@@ -9,4 +9,37 @@ file.remove("temp/GagestoCheck_GAGESIII.csv")
 # checks from nhdplusv2 gageloc prep
 file.remove("temp/GagestoCheck_GageLoc.csv")
 
-tar_make()
\ No newline at end of file
+# step 1: gage selection 
+tar_make(gage_selection_map_report)
+
+# step 2: vpu prep
+# iterates over vpus
+tar_load(vpu_codes)
+
+if(FALSE) { # this won't run if you just bang through this file
+  # To debug a given vpu, first run all to get the dynamic branches established:
+  tar_make(vpu_base)
+  
+  # if any error, you can rerun them as shown below.
+  
+  # Then find the branch in question and add it to tar_options_set in the workflow file:
+  
+  (branch <- tar_branch_names(vpu_base, which(vpu_codes == "01")))
+  vpu_codes[tar_branch_index(branch)]
+  
+  cat('debug = "', branch, '", cue = tar_cue(mode = "never")', sep = "")
+  
+  # now we run with callr_function = NULL to get the debug to hit on the desired branch.
+  # note that any other failed branches may try to run first -- it's best to debug the firt
+  # errored branch then work down the list.
+  
+  tar_make(callr_function = NULL)
+}
+
+# run branches for a given target in parallel if you have enough memory
+# note this will only work for targets with 'deployment = "worker"'
+tar_make_future(vpu_base, workers = 8)
+
+# to run all, just do:
+tar_make()
+
-- 
GitLab