Skip to content
Snippets Groups Projects
Commit d6e55009 authored by Blodgett, David L.'s avatar Blodgett, David L.
Browse files

first bit of NHD prep for #153 prepare step

parent 8b13f3ea
No related branches found
No related tags found
1 merge request!183Refactor - progress
......@@ -16,6 +16,8 @@ knitr::opts_chunk$set(
```
```{r setup}
library(dplyr)
library(hyfabric)
# Load custom functions
source("R/utils.R")
source("R/NHD_navigate.R")
......@@ -33,7 +35,6 @@ if(!exists("vpu_codes")) {
}
all_rpu_codes <- unique(rpu_vpu$rpuid)
full_cat_table <- readRDS(data_paths$fullcats_table)
out_vpu <- rep(list(list()), length(vpu_codes))
names(out_vpu) <- vpu_codes
......
......@@ -387,4 +387,57 @@ finalize_gages <- function(SWIM, NHDPlusV2_gages, gfv11_gages, gages_ii_ref,
col.names = FALSE, row.names = F)
return(list(final = gage_final_ranked))
}
\ No newline at end of file
}
#' prepare vpu base layers
#' @param reg_gpkg character path to geopackage containing reference fabric for this vpu
#' @param nav_gpkg character path to geopackage where vpu base layers will be written
#' @param vpu vpu that will be written into output list
#' @param full_cat_table table of all catchments for each vpu
#' @return a list with the vpu and three tables for flowlines, waterbodies, and catchments
prepare_vpu_base_layers <- function(ref_gpkg, nav_gpkg, vpu, full_cat_table) {
fline <- read_sf(ref_gpkg, "flowlines")
catchment <- read_sf(ref_gpkg, "catchments")
waterbodies <- read_sf(ref_gpkg, "waterbodies")
# we can remove truely degenerate COMIDs
# for 0 upstream area and no catchment area
degen_comid <- fline[fline$TotDASqKM == 0 &
!fline$COMID %in% catchment$featureid, ]$COMID
# need to make sure we don't disconnect anything.
keep_tocomid <- fline$toCOMID[!fline$COMID %in% degen_comid]
if(length(degen_comid[degen_comid %in% keep_tocomid]) > 0) stop("this will break the network")
fline <- fline[!fline$COMID %in% degen_comid, ]
# there are some duplicates with enhd
nhd <- fline %>%
group_by(COMID) %>%
filter(row_number() == 1) %>%
ungroup() %>%
make_standalone()
# Filter and write dendritic/non-coastal subset to gpkg
# This will be iterated over to produce the final network after POIs identified
zero_order <- filter(nhd, TerminalFl == 1 & TotDASqKM < min_da_km_gages)
non_dend <- unique(unlist(lapply(zero_order$COMID, \(x, nhdDF) get_UM(nhdDF, x), nhdDF = st_drop_geometry(nhd))))
nhd <- nhd %>%
mutate(dend = ifelse(!COMID %in% non_dend, 1, 0),
poi = ifelse(!COMID %in% non_dend & TotDASqKM >= min_da_km_gages, 1, 0))
cat_network <- st_drop_geometry(nhd)
names(cat_network) <- tolower(names(cat_network))
cat_network <- select(cat_network, comid, tocomid, lengthkm, areasqkm, totdasqkm,
hydroseq, levelpathi, terminalpa, dnlevelpat, dnhydroseq,
reachcode, frommeas, tomeas, pathlength, arbolatesu,
ftype, fcode, vpuid, rpuid, wbareacomi)
write_sf(nhd, nav_gpkg, nhd_flowline)
write_sf(cat_network, nav_gpkg, nhd_network)
write_sf(waterbodies, nav_gpkg, nhd_waterbody)
return(list(vpu = vpu, flowline = nhd, catchment = cat_network, waterbodies = waterbodies))
}
# TODO: remove use of this in favor of \(x, nhdDF) get_UM(nhdDF, x) or \(x, nhdDF) get_UT(nhdDF, x)
#' Network navigation for upstream/downstream from a COMID of interest
#' @param inCOM (list) list of input COMIDs
#' @param nhdDF (sf data.frame) (data frame) valid data frame of NHD flowlines
......
rpu_vpu <- readr::read_csv(system.file("extdata/rpu_vpu.csv", package = "hyfabric"),
show_col_types = FALSE)
rpu_vpu_out <- readr::read_csv("cache/rpu_vpu_out.csv",
col_types = c("c", "c", "i" , "i"), show_col_types = FALSE)
if(nchar(Sys.getenv("HYDREG")) > 1) {
vpu_codes <- Sys.getenv("HYDREG")
}
if(!exists("rpu_codes")) {
if(exists("vpu_codes")) {
rpu_codes <- rpu_vpu[rpu_vpu$vpuid %in% vpu_codes, ]
rpu_code <- rpu_codes$rpuid[1]
} else {
vpu_codes <- get_rpu_dependent_vars()
rpu_code <- vpu_codes$r
vpu_codes <- vpu_codes$v
rpu_codes <- rpu_vpu[rpu_vpu$vpuid %in% vpu_codes, ]
}
}
if(!exists("vpu_codes")) {
vpu_codes <- get_rpu_dependent_vars(rpu_code)$v
}
if(!nchar(out_agg_gpkg <- Sys.getenv("OUT_GPKG")) > 5)
out_agg_gpkg <- file.path("cache", paste0(rpu_code, "_aggregate.gpkg"))
if(!nchar(out_refac_gpkg <- Sys.getenv("OUT_REFACTOR_GPKG")) > 5)
out_refac_gpkg <- file.path("cache", paste0(rpu_code, "_refactor.gpkg"))
if(!nchar(cache_split <- Sys.getenv("CACHE_SPLIT")) > 5)
cache_split <- file.path("temp", paste0(rpu_code, ".rda"))
if(!nchar(lookup_table_file <- Sys.getenv("LOOKUP_TABLE")) > 5)
lookup_table_file <- file.path("cache", paste0(rpu_code, "_lookup.csv"))
if(!nchar(vpu_lookup_table_file <- Sys.getenv("VPU_LOOKUP_TABLE")) > 5)
vpu_lookup_table_file <- file.path("cache", paste0("lookup_", vpu_codes, ".csv"))
# rpu_vpu <- readr::read_csv(system.file("extdata/rpu_vpu.csv", package = "hyfabric"),
# show_col_types = FALSE)
# rpu_vpu_out <- readr::read_csv("cache/rpu_vpu_out.csv",
# col_types = c("c", "c", "i" , "i"), show_col_types = FALSE)
# # The below deals with various ways to get / set rpu and vpu codes
# # based on the rpu_vpu table.
# # if a HYDREG environment variable is set, use it
# if(nchar(Sys.getenv("HYDREG")) > 1) {
# vpu_codes <- Sys.getenv("HYDREG")
# }
#
# # if rpu_codes hasn't been set globally, try to set it.
# if(!exists("rpu_codes")) {
# # if vpu_codes has been set we can get rpu based on that.
# if(exists("vpu_codes")) {
# rpu_codes <- rpu_vpu[rpu_vpu$vpuid %in% vpu_codes, ]
# rpu_code <- rpu_codes$rpuid[1]
# } else {
# # then get the default
# vpu_codes <- get_rpu_dependent_vars()
# rpu_code <- vpu_codes$r
# vpu_codes <- vpu_codes$v
# rpu_codes <- rpu_vpu[rpu_vpu$vpuid %in% vpu_codes, ]
# }
# }
#
# # if vpu_codes still doesn't exist, set it based on the rpu_code.
# if(!exists("vpu_codes")) {
# vpu_codes <- get_rpu_dependent_vars(rpu_code)$v
# }
#
# if(!nchar(out_agg_gpkg <- Sys.getenv("OUT_GPKG")) > 5)
# out_agg_gpkg <- file.path("cache", paste0(rpu_code, "_aggregate.gpkg"))
#
# if(!nchar(out_refac_gpkg <- Sys.getenv("OUT_REFACTOR_GPKG")) > 5)
# out_refac_gpkg <- file.path("cache", paste0(rpu_code, "_refactor.gpkg"))
#
# if(!nchar(cache_split <- Sys.getenv("CACHE_SPLIT")) > 5)
# cache_split <- file.path("temp", paste0(rpu_code, ".rda"))
#
# if(!nchar(lookup_table_file <- Sys.getenv("LOOKUP_TABLE")) > 5)
# lookup_table_file <- file.path("cache", paste0(rpu_code, "_lookup.csv"))
#
# if(!nchar(vpu_lookup_table_file <- Sys.getenv("VPU_LOOKUP_TABLE")) > 5)
# vpu_lookup_table_file <- file.path("cache", paste0("lookup_", vpu_codes, ".csv"))
options(scipen = 9999)
......@@ -55,13 +62,13 @@ s_per_hr <- 3600 # seconds per hour
# Intermediate layers created during 02_Navigate
xwalk_layer <- paste0("HUC12_nhd") # HUC12 - nhdcat crosswalk, built in Nav for VPU 20
nav_poi_layer <- paste0("POIs_tmp_", vpu_codes) # Rolling Nav POI layer added to/modified througout nav workflow
WBs_layer <- paste0("WB_", vpu_codes) # Waterbodies within VPU
WB_events_layer <- paste0("WB_events_", vpu_codes) # inlet and outlet events for NHDArea and HR waterbodies
poi_moved_layer <- paste0("POIs_mv_", vpu_codes) # POIs moved from original COMID assignment
nsegments_layer <- paste0("nsegment_", vpu_codes) # Minimally-sufficient network dissolved by POI_ID
pois_all_layer <- paste0("POIs_", vpu_codes) # All POIs binded together
poi_xwalk_layer <- paste0("poi_xwalk_layer_", vpu_codes) # POIs that changed COMIDS during the navigate part of the workflow
# nav_poi_layer <- paste0("POIs_tmp_", vpu_codes) # Rolling Nav POI layer added to/modified througout nav workflow
# WBs_layer <- paste0("WB_", vpu_codes) # Waterbodies within VPU
# WB_events_layer <- paste0("WB_events_", vpu_codes) # inlet and outlet events for NHDArea and HR waterbodies
# poi_moved_layer <- paste0("POIs_mv_", vpu_codes) # POIs moved from original COMID assignment
# nsegments_layer <- paste0("nsegment_", vpu_codes) # Minimally-sufficient network dissolved by POI_ID
# pois_all_layer <- paste0("POIs_", vpu_codes) # All POIs binded together
# poi_xwalk_layer <- paste0("poi_xwalk_layer_", vpu_codes) # POIs that changed COMIDS during the navigate part of the workflow
final_poi_layer <- "POIs"
dup_pois <- "dup_POIs"
......@@ -84,15 +91,15 @@ mapped_outlets_layer <- "mapped_POIs"
lookup_table_refactor <- "lookup_table"
catchment_network_table <- "catchment_network"
# output geopackage file names
ref_gpkg <- file.path(jsonlite::read_json("cache/data_paths.json")$ref_fab_path,
paste0(vpu_codes, "_reference_features.gpkg"))
nav_gpkg <- file.path("cache", paste0("reference_", vpu_codes,".gpkg"))
temp_gpkg <- file.path("cache", paste0("nav_", vpu_codes,".gpkg"))
rfc_gpkg <- file.path("cache", paste0("refactor_", vpu_codes, ".gpkg"))
gf_gpkg <- file.path("cache", paste0("GF_", vpu_codes, ".gpkg"))
# # output geopackage file names
# ref_gpkg <- file.path(jsonlite::read_json("cache/data_paths.json")$ref_fab_path,
# paste0(vpu_codes, "_reference_features.gpkg"))
# nav_gpkg <- file.path("cache", paste0("reference_", vpu_codes,".gpkg"))
# temp_gpkg <- file.path("cache", paste0("nav_", vpu_codes,".gpkg"))
#
# rfc_gpkg <- file.path("cache", paste0("refactor_", vpu_codes, ".gpkg"))
#
# gf_gpkg <- file.path("cache", paste0("GF_", vpu_codes, ".gpkg"))
gf_gpkg_conus <- "cache/reference_CONUS.gpkg"
gage_info_gpkg <- "cache/Gages_info.gpkg"
......@@ -100,7 +107,7 @@ gage_info_csv <- "cache/Gages_info.csv"
gage_info_csvt <- "cache/Gages_info.csvt"
# Defined during NonDend.Rmd
ND_gpkg <- file.path("temp", paste0("ND_", vpu_codes,".gpkg"))
# ND_gpkg <- file.path("temp", paste0("ND_", vpu_codes,".gpkg"))
divides_xwalk <- "divides_nhd"
HRU_layer <- "all_cats"
......
library(targets)
library(tarchetypes)
tar_option_set(packages = c("dplyr", "sf", "foreign", "hydroloom"),
memory = "transient", garbage_collection = TRUE)
tar_option_set(packages = c("dplyr", "sf", "foreign", "hydroloom", "nhdplusTools"),
memory = "transient", garbage_collection = TRUE, error = "null",
storage = "worker", retrieval = "worker", deployment = "main")
library(future)
library(future.callr)
plan(multisession)
# functions for this workflow
source("R/2_prep.R")
......@@ -52,12 +57,29 @@ list(
gage_info_gpkg, gage_info_csv, gage_info_csvt,
geo_crs)),
###
tar_render(map_report, "01_Gage_Selection.Rmd",
output_file = "temp/01_Gage_Selection.html",
tar_render(gage_selection_map_report, "templates/01_Gage_Selection.Rmd",
output_file = "../temp/01_Gage_Selection.html",
params = list(SWIM = SWIM,
NHDPlusV2_gages = NHDPlusV2_gages,
gfv11_gages = gfv11_gages,
final_gages = final_gages,
min_da_km_gages = min_da_km_gages))
min_da_km_gages = min_da_km_gages)),
tar_target(rpu_vpu, read.csv(system.file("extdata/rpu_vpu.csv", package = "hyfabric"),
colClasses = "character")),
tar_target(vpu_codes, unique(rpu_vpu$vpuid)),
tar_target(all_rpu_codes, unique(rpu_vpu$rpuid)),
tar_target(full_cat_file, data_paths$fullcats_table, format = "file"),
tar_target(full_cat_table, readRDS(full_cat_file)),
tar_target(ref_gpkg, file.path(data_paths$ref_fab_path,
paste0(vpu_codes, "_reference_features.gpkg")),
pattern = map(vpu_codes)),
tar_target(nav_gpkg, file.path("cache", paste0("reference_", vpu_codes,".gpkg")),
pattern = map(vpu_codes)),
# ~1GB of memory
tar_target(vpu_base, prepare_vpu_base_layers(ref_gpkg, nav_gpkg, vpu_codes, full_cat_table),
pattern = map(ref_gpkg, nav_gpkg, vpu_codes), deployment = "worker")
)
\ No newline at end of file
......@@ -9,4 +9,37 @@ file.remove("temp/GagestoCheck_GAGESIII.csv")
# checks from nhdplusv2 gageloc prep
file.remove("temp/GagestoCheck_GageLoc.csv")
tar_make()
\ No newline at end of file
# step 1: gage selection
tar_make(gage_selection_map_report)
# step 2: vpu prep
# iterates over vpus
tar_load(vpu_codes)
if(FALSE) { # this won't run if you just bang through this file
# To debug a given vpu, first run all to get the dynamic branches established:
tar_make(vpu_base)
# if any error, you can rerun them as shown below.
# Then find the branch in question and add it to tar_options_set in the workflow file:
(branch <- tar_branch_names(vpu_base, which(vpu_codes == "01")))
vpu_codes[tar_branch_index(branch)]
cat('debug = "', branch, '", cue = tar_cue(mode = "never")', sep = "")
# now we run with callr_function = NULL to get the debug to hit on the desired branch.
# note that any other failed branches may try to run first -- it's best to debug the firt
# errored branch then work down the list.
tar_make(callr_function = NULL)
}
# run branches for a given target in parallel if you have enough memory
# note this will only work for targets with 'deployment = "worker"'
tar_make_future(vpu_base, workers = 8)
# to run all, just do:
tar_make()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment