diff --git a/workspace/R/02_POI_creation_functions.R b/workspace/R/02_POI_creation_functions.R index aa89549ac23c9dd7b847b5e647b88551196a97de..495edc6d7559f428cf6c1aec747cb85eb3033755 100644 --- a/workspace/R/02_POI_creation_functions.R +++ b/workspace/R/02_POI_creation_functions.R @@ -25,8 +25,8 @@ create_hu12_pois <- function(data, flowline, poi_name, vpu) { ungroup() # Create POIs - some r05 HUC12 POIs not in R05 NHD - list(tmp_POIs = POI_creation(st_drop_geometry(HUC12_COMIDs), filter(flowline, poi == 1), poi_name), - HUC12_COMIDs = HUC12_COMIDs) + list(POIs = POI_creation(st_drop_geometry(HUC12_COMIDs), filter(flowline, poi == 1), poi_name), + HUC12_COMIDs = HUC12_COMIDs, vpu = vpu) } @@ -34,16 +34,18 @@ create_hu12_pois <- function(data, flowline, poi_name, vpu) { #' @param gages as output from 01_prep finalize gages #' @param flowline flowlines to attach pois to #' @param poi_name name for poi set -#' @param vpu vpu that is being executed -#' @param tmp_POIs pois from previous target +#' @param POIs pois from previous target #' @param min_da_km_gages minimum drainage area for gages to be included #' @param combine_meters length to be used later in the workflow to combine flowlines #' @param reach_meas_thresh a threshold used to determine if a gage will split a flowline or not #' -create_gage_pois <- function(gages, flowline, poi_name, vpu, tmp_POIs, +create_gage_pois <- function(gages, flowline, poi_name, POIs, min_da_km_gages = 5, combine_meters, reach_meas_thresh) { + vpu <- POIs$vpu + POIs <- POIs$POIs + # Previously identified streamgages within Gage_Selection.Rmd streamgages_VPU <- gages %>% rename(COMID = comid) %>% @@ -59,14 +61,14 @@ create_gage_pois <- function(gages, flowline, poi_name, vpu, tmp_POIs, ungroup() # Derive GAGE POIs; use NHD as we've already filtered by NWIS DA in the Gage selection step - gages_POIs <- gage_POI_creation(tmp_POIs, streamgages, filter(flowline, poi == 1), + gages_POIs <- gage_POI_creation(POIs, streamgages, filter(flowline, poi == 1), combine_meters, reach_meas_thresh, poi_name) - tmp_POIs <- gages_POIs$tmp_POIs + POIs <- gages_POIs$POIs events <- rename(gages_POIs$events, POI_identifier = Type_Gages) - unassigned_gages <- filter(streamgages_VPU, !site_no %in% tmp_POIs$Type_Gages) %>% + unassigned_gages <- filter(streamgages_VPU, !site_no %in% POIs$Type_Gages) %>% rename(nhd_reachcode = REACHCODE) if(nrow(unassigned_gages)> 0) { @@ -75,12 +77,12 @@ create_gage_pois <- function(gages, flowline, poi_name, vpu, tmp_POIs, "Gages_info", paste0("VPU ", vpu, "; low gage score")) } - list(tmp_POIs = tmp_POIs, events = events, unassigned_gages = unassigned_gages) + list(POIs = POIs, events = events, unassigned_gages = unassigned_gages, vpu = vpu) } #' Creates POIs for gages using refactor criteria -#' @param temp_POIs (sf data.frame) current data frame of POIs +#' @param POIs (sf data.frame) current data frame of POIs #' @param gages_info (sf data.frame) VPU specific streamgages from 01_gage_selection #' @param nhd (sf data.frame) flowline data.frame #' @param combine_meters (integer) refactor threshold (m) for if if two adjacent fl should be combined @@ -88,10 +90,11 @@ create_gage_pois <- function(gages, flowline, poi_name, vpu, tmp_POIs, #' #' @return (sf data.frame) dataframe of gage POIs #' -gage_POI_creation <- function(tmp_POIs, gages_info, nhd, combine_meters, reach_meas_thresh, poi_name){ +gage_POI_creation <- function(POIs, gages_info, nhd, combine_meters, reach_meas_thresh, poi_name){ + # Create streamgage POIs gage_POIs <- POI_creation(select(st_drop_geometry(gages_info), COMID, site_no), nhd, poi_name) %>% - st_compatibalize(., tmp_POIs) + st_compatibalize(., POIs) # Avoid these for refactoring avoid <- dplyr::filter(nhd, (sqrt(AreaSqKM) / LENGTHKM) > 3 & AreaSqKM > 1) @@ -109,38 +112,40 @@ gage_POI_creation <- function(tmp_POIs, gages_info, nhd, combine_meters, reach_m select(COMID, REACHCODE = reachcode, REACH_meas = reach_meas, Type_Gages = site_no) %>% filter(!COMID %in% avoid$COMID) %>% mutate(event_type = "streamgage") %>% - st_compatibalize(., tmp_POIs) %>% + st_compatibalize(., POIs) %>% mutate(nexus = TRUE) # Reset gage flag if even is created gage_POIs_nonevent <- filter(gage_POIs, !Type_Gages %in% events$Type_Gages) %>% - addType(., tmp_POIs, poi_name, nexus = FALSE, bind = TRUE) + addType(., POIs, poi_name, nexus = FALSE, bind = TRUE) if(nrow(events) > 0){ - tmp_POIs <- data.table::rbindlist(list(gage_POIs_nonevent, + POIs <- data.table::rbindlist(list(gage_POIs_nonevent, select(events, COMID, Type_Gages, nexus)), fill = TRUE) %>% mutate(nexus = ifelse(is.na(nexus), FALSE, nexus)) %>% st_as_sf() } else { events <- NA - tmp_POIs <- mutate(tmp_POIs, nexus = FALSE) + POIs <- mutate(POIs, nexus = FALSE) } - return(list(events = events, tmp_POIs = tmp_POIs)) + return(list(events = events, POIs = POIs)) } #' create thermo electric POIs #' @param te_points point locations of thermo electric plants #' @param te_attributes attributes for te_points -#' @param tmp_POIs pois from previous target +#' @param POIs pois from previous target #' @param HUC12_COMIDs HUC12_COMIDs from hu12 poi step #' @param flowline flowlines to attach pois to #' @param poi_name name for poi set -#' @param vpu vpu that is being executed #' -create_thermo_electric_pois <- function(te_points, te_attributes, tmp_POIs, HUC12_COMIDs, - flowline, poi_name, vpu) { +create_thermo_electric_pois <- function(te_points, te_attributes, POIs, HUC12_COMIDs, + flowline, poi_name) { + + vpu <- POIs$vpu + POIs <- POIs$POIs if(vpu == "08"){ flowline$VPUID <- "08" @@ -173,16 +178,16 @@ create_thermo_electric_pois <- function(te_points, te_attributes, tmp_POIs, HUC1 summarize(eia_id = paste0(unique(Plant.Code), collapse = " "), count = n()) %>% ungroup() - unassigned_plants <- filter(TE_COMIDs, !COMID %in% tmp_POIs$COMID) + unassigned_plants <- filter(TE_COMIDs, !COMID %in% POIs$COMID) if(nrow(TE_COMIDs) > 0){ # Derive TE POIs - tmp_POIs <- POI_creation(st_drop_geometry(TE_COMIDs), filter(flowline, poi == 1), poi_name) %>% - addType(., tmp_POIs, "TE", nexus = TRUE) + POIs <- POI_creation(st_drop_geometry(TE_COMIDs), filter(flowline, poi == 1), poi_name) %>% + addType(., POIs, "TE", nexus = TRUE) } - list(tmp_POIs = tmp_POIs, unassigned_plants = unassigned_plants) + list(POIs = POIs, unassigned_plants = unassigned_plants, vpu = vpu) } #' make waterbodies layer @@ -218,12 +223,15 @@ make_waterbodies_layer <- function(combined_waterbodies, flowline, geo_crs = 432 } #' create resops POIs -#' @param tmp_POIs tmp_POIs as created by previous step +#' @param POIs POIs as created by previous step #' @param wbs waterbodies table #' @param flowline flowlines to attach pois to #' @param poi_name name for poi set #' -create_resops_pois <- function(tmp_POIs, wbs, istarf_xwalk, flowline, poi_name = "resops") { +create_resops_pois <- function(POIs, wbs, istarf_xwalk, flowline, poi_name = "resops") { + + vpu <- POIs$vpu + POIs <- POIs$POIs # Unnest wb_poi_lst wb_table <- st_drop_geometry(wbs$WBs_layer_orig) %>% @@ -287,7 +295,7 @@ create_resops_pois <- function(tmp_POIs, wbs, istarf_xwalk, flowline, poi_name = select(COMID = flowlcomid, resops = grand_id, Type_WBOut) # Existing reservoir-waterbody outlet POIs - exist_POIs_WB <- filter(resops_wb_df, flowlcomid %in% tmp_POIs$COMID) %>% + exist_POIs_WB <- filter(resops_wb_df, flowlcomid %in% POIs$COMID) %>% mutate(wb_id = ifelse(source == "NHDAREA", wbareacomi, wb_id)) %>% filter(!is.na(wb_id)) %>% select(COMID = flowlcomid, resops = grand_id, Type_WBOut = wb_id) @@ -298,12 +306,12 @@ create_resops_pois <- function(tmp_POIs, wbs, istarf_xwalk, flowline, poi_name = # Resops defined by reach if(nrow(resops_pois) > 0){ # Resops POIs with no reservoirs, defined by reach - tmp_POIs <- POI_creation(resops_pois, filter(flowline, poi == 1), poi_name) %>% - addType(., tmp_POIs, poi_name, nexus = TRUE) + POIs <- POI_creation(resops_pois, filter(flowline, poi == 1), poi_name) %>% + addType(., POIs, poi_name, nexus = TRUE) # TODO: investigate many to many join # Add waterbody attribute - tmp_POIs <- tmp_POIs %>% + POIs <- POIs %>% left_join(select(resops_pois, COMID, Type_WBOut), by = "COMID") %>% mutate(Type_WBOut = ifelse(!nexus, @@ -311,19 +319,23 @@ create_resops_pois <- function(tmp_POIs, wbs, istarf_xwalk, flowline, poi_name = # Resops with reservoirs resops_wb_df <- resops_wb_df %>% - mutate(accounted = ifelse(grand_id %in% tmp_POIs$Type_resops, 1, 0), + mutate(accounted = ifelse(grand_id %in% POIs$Type_resops, 1, 0), source = ifelse(grand_id %in% reach_resops$resops, "REACH", source)) } - list(wb_pois = wb_poi_lst, wb_resops = resops_wb_df, tmp_POIs = tmp_POIs) + list(wb_pois = wb_poi_lst, wb_resops = resops_wb_df, POIs = POIs, vpu = vpu) } #' create hilarri pois -#' @param tmp_POIs tmp_POIs as created by previous step +#' @param POIs POIs as created by previous step #' @param wbs waterbodies table #' @param flowline flowlines to attach pois to #' @param poi_name name for poi set -create_hilarri_pois <- function(tmp_POIs, hilarri_data, wb_poi_lst, flowline, poi_name = "hilarri") { +create_hilarri_pois <- function(POIs, hilarri_data, wb_poi_lst, flowline, poi_name = "hilarri") { + + vpu <- POIs$vpu + POIs <- POIs$POIs + old_sf_use_s2 <- sf::sf_use_s2(FALSE) on.exit(sf::sf_use_s2(old_sf_use_s2)) # 1: Many based on original GNIS_ID @@ -374,39 +386,39 @@ create_hilarri_pois <- function(tmp_POIs, hilarri_data, wb_poi_lst, flowline, po st_drop_geometry() # Make POIs for reach POIs - tmp_POIs <- POI_creation(reach_pois, filter(flowline, poi == 1), poi_name) %>% - addType(., tmp_POIs, poi_name, nexus = TRUE) + POIs <- POI_creation(reach_pois, filter(flowline, poi == 1), poi_name) %>% + addType(., POIs, poi_name, nexus = TRUE) - list(tmp_POIs = tmp_POIs, wb_pois = wb_poi_lst) + list(POIs = POIs, wb_pois = wb_poi_lst, vpu = vpu) } #' create waterbody outlet pois -#' @param tmp_POIs POIs from previous step +#' @param POIs POIs from previous step #' @param NHDPlus_NHDArea NHDArea features from NHDPlusV2 #' @param wb_poi_lst wb_pois from hilarri output list #' @param resops_wb_df wb_resops from resops output list #' @param flowline flowlines to attach pois to #' @param events events from gage output list -#' @param nav_gpkg name of gpkg for current vpu -#' @param data_paths list of data sources #' @param proj_crs projected crs for overall project #' @param gages gages from gage_info_gpkg #' @param poi_name name for poi set -create_wb_outlet_pois <- function(tmp_POIs, +create_wb_outlet_pois <- function(POIs, NHDPlus_NHDArea, wb_poi_lst, resops_wb_df, flowline, events, - nav_gpkg, - data_paths, proj_crs, gages, - poi_name) { + poi_name, + nhdplushr_dir) { + + vpu <- POIs$vpu + POIs <- POIs$POIs # Hillari IDs - hilarri_pois <- filter(tmp_POIs, !is.na(Type_hilarri)) + hilarri_pois <- filter(POIs, !is.na(Type_hilarri)) # Get NHDARea polygons already define nhd_area_vpu <- NHDPlus_NHDArea %>% @@ -454,7 +466,7 @@ create_wb_outlet_pois <- function(tmp_POIs, # fix something here wb_layers <- wbout_POI_creaton(filter(flowline, poi == 1), final_wb_list, - data_paths, tmp_POIs, proj_crs, wb_poi_lst, poi_name) + NHDPlus_NHDArea, POIs, proj_crs, wb_poi_lst, poi_name, nhdplushr_dir) wb_events <- poi_missing_wb <- wb_missing_poi <- double_wb <- double_wb_pois <- NULL @@ -487,40 +499,42 @@ create_wb_outlet_pois <- function(tmp_POIs, events <- rbind(events, wb_events) } - tmp_POIs <- wb_layers$POIs + POIs <- wb_layers$POIs # Write out waterbodies WB_VPU_pois <- wb_layers$WBs } else { - tmp_POIs <- wb_layers + POIs <- wb_layers WB_VPU_pois <- final_wb_list } #12047928 - wb_out_col <- wb_poi_collapse(tmp_POIs, flowline, events, gages) - tmp_POIs <- wb_out_col$POIs + wb_out_col <- wb_poi_collapse(POIs, flowline, events, gages) + POIs <- wb_out_col$POIs if(all(!is.na(wb_out_col$events_ret))){ events <- wb_out_col$events_ret } - list(tmp_POIs = tmp_POIs, wb_out_col = wb_out_col, + list(POIs = POIs, wb_out_col = wb_out_col, all_events = events, wb_events = wb_events, double_wb = double_wb, double_wb_pois = double_wb_pois, wb_missing_poi = wb_missing_poi, poi_missing_wb = poi_missing_wb, - updated_flowline = wb_layers$nhd_wb_table, WB_VPU_pois = WB_VPU_pois) + updated_flowline = wb_layers$nhd_wb_table, WB_VPU_pois = WB_VPU_pois, vpu = vpu) } #' Creates Waterbody POIs, calls a few other functions #' @param nhd (sf data.frame) flowline data.frame #' @param wb_table (sf data.frame) -#' @param data_paths (list) data paths to data layers +#' @param nhdplus_area (sf data.frame) NHDPlus Area features #' @param crs (integer) CRS to use (epsg code) #' @param wb_poi_lst TODO #' @param poi_name name for pois to be created +#' @param nhdplushr_dir directory containing NHDPlusHR data #' #' @return (sf data.frame) dataframe of waterbody outlet POIs #' -wbout_POI_creaton <- function(nhd, wb_table, data_paths, tmp_POIs, crs, wb_poi_lst, poi_name){ +wbout_POI_creaton <- function(nhd, wb_table, nhdplus_area, POIs, crs, + wb_poi_lst, poi_name, nhdplushr_dir){ wb_components_table <- st_drop_geometry(wb_table) %>% dplyr::mutate(member_comid = strsplit(member_comid, ",")) %>% @@ -559,7 +573,7 @@ wbout_POI_creaton <- function(nhd, wb_table, data_paths, tmp_POIs, crs, wb_poi_l # Remaining WB that don't have outlet COMID defined and aren't in ResOPsUS WB_outlet <- filter(wb_table, is.na(resops_flowlcomid), - !wb_id %in% tmp_POIs$Type_WBOut) + !wb_id %in% POIs$Type_WBOut) # Size-based waterbody outlet POIs ot in NHDArea, HR, or # manually defined with ResOPS @@ -583,13 +597,13 @@ wbout_POI_creaton <- function(nhd, wb_table, data_paths, tmp_POIs, crs, wb_poi_l wbout_coms <- distinct(rbind(wbout_COMIDs, resops_main_outlets)) # Get NHDArea and HI-Res waterbodies for ResOpsUS locaitons - WBs_VPU_areaHR <- HR_Area_coms(nhd, resops_wb_other_sources, data_paths, crs) + WBs_VPU_areaHR <- HR_Area_coms(nhd, resops_wb_other_sources, nhdplus_area, crs, nhdplushr_dir) # if there are no HR/Area waterbodies generated if(all(is.na(WBs_VPU_areaHR))){ - tmp_POIs <- POI_creation(wbout_coms, + POIs <- POI_creation(wbout_coms, nhd, "wb_id") %>% - addType(., tmp_POIs, "wb_id", nexus = TRUE) %>% + addType(., POIs, "wb_id", nexus = TRUE) %>% mutate(Type_WBOut = ifelse(is.na(Type_WBOut), Type_wb_id, Type_WBOut)) %>% select(-Type_wb_id) %>% left_join(select(st_drop_geometry(resops_main), grand_id, resops_flowlcomid), @@ -613,9 +627,9 @@ wbout_POI_creaton <- function(nhd, wb_table, data_paths, tmp_POIs, crs, wb_poi_l wbout_coms <- rbind(wbout_coms, wbareaout_coms) } - tmp_POIs <- POI_creation(select(wbout_coms, COMID, wb_id), + POIs <- POI_creation(select(wbout_coms, COMID, wb_id), nhd, poi_name) %>% - addType(., tmp_POIs, poi_name, nexus = TRUE) %>% + addType(., POIs, poi_name, nexus = TRUE) %>% mutate(Type_WBOut = ifelse(is.na(Type_WBOut) & (!nexus | is.na(nexus)), Type_wb_id, Type_WBOut)) %>% select(-Type_wb_id) %>% @@ -655,11 +669,11 @@ wbout_POI_creaton <- function(nhd, wb_table, data_paths, tmp_POIs, crs, wb_poi_l if(nrow(nhd_WBs) > 0){ # Create outlet events for splitting HR and NHDArea waterbodies wb_outlet_events <- WB_event(WBs_VPU_areaHR, nhd_WBs, "outlet") %>% - st_compatibalize(., tmp_POIs) %>% + st_compatibalize(., POIs) %>% mutate(nexus = TRUE, Type_resops = 2) - tmp_POIs <- data.table::rbindlist(list(tmp_POIs, - select(wb_outlet_events, COMID, Type_WBOut = wb_id, Type_resops, nexus)), fill = TRUE) %>% + POIs <- data.table::rbindlist(list(POIs, + select(wb_outlet_events, COMID, Type_WBOut = wb_id, Type_resops, nexus)), fill = TRUE) %>% st_as_sf() } else { wb_outlet_events <- NA @@ -682,11 +696,11 @@ wbout_POI_creaton <- function(nhd, wb_table, data_paths, tmp_POIs, crs, wb_poi_l st_geometry(WBs_VPU) <- st_make_valid(st_geometry(WBs_VPU)) - return(list(POIs = tmp_POIs, events = wb_outlet_events, WBs = WBs_VPU, + return(list(POIs = POIs, events = wb_outlet_events, WBs = WBs_VPU, nhd_wb_table = nhd)) } else { - return(list(POIs = tmp_POIs, events = NA, WBs = wb_table, + return(list(POIs = POIs, events = NA, WBs = wb_table, nhd_wb_table = nhd)) } } @@ -695,16 +709,17 @@ wbout_POI_creaton <- function(nhd, wb_table, data_paths, tmp_POIs, crs, wb_poi_l #' @param nhd (sf data.frame) flowline data.frame #' @param wb_lyr (sf data.frame) waterbodies to create inlet pois from #' @param events (sf data.frame) of events that waterbody inlets can be pulled from -#' @param data_paths (list) list of data paths +#' @param nhdplus_area (sf data.frame) NHDPlus Area features #' @param crs (integer) CRS to use (epsg code) -#' @param tmp_POIs (sf data.frame) rolling POI data frame +#' @param POIs (sf data.frame) rolling POI data frame +#' @param nhdplushr_dir directory containing NHDPlusHR data #' #' @return (sf data.frame) dataframe of WB inlet POIs #' -wbin_POIcreation <- function(flowline, wb_lyr, events, data_paths, crs, - tmp_POIs, poi_name = "WBIn"){ +wbin_POIcreation <- function(flowline, wb_lyr, events, nhdplus_area, crs, + POIs, poi_name = "WBIn", nhdplushr_dir){ - wbout_COMIDs <- filter(tmp_POIs, !is.na(Type_WBOut)) + wbout_COMIDs <- filter(POIs, !is.na(Type_WBOut)) WBs_in_POIs <- filter(wb_lyr, wb_id %in% wbout_COMIDs$Type_WBOut) # Unnest to get list of waterbody COMIDS (member_comid) @@ -774,7 +789,7 @@ wbin_POIcreation <- function(flowline, wb_lyr, events, data_paths, crs, # Get NHDArea and HR waterbodies WBs_VPU_areaHR <- HR_Area_coms(flowline, filter(wb_lyr, source == "HR ID AVAILABLE"), - data_paths, crs) + nhdplus_area, crs, nhdplushr_dir) if(is.list(WBs_VPU_areaHR)) { nhd_WBs <- filter(flowline, minNet == 1, @@ -825,7 +840,7 @@ wbin_POIcreation <- function(flowline, wb_lyr, events, data_paths, crs, } } - wbin_POIs <- addType(wbin_POIs, tmp_POIs, "WBIn", nexus = TRUE) + wbin_POIs <- addType(wbin_POIs, POIs, "WBIn", nexus = TRUE) if(nrow(wb_inlet_events) > 0){ @@ -840,11 +855,11 @@ wbin_POIcreation <- function(flowline, wb_lyr, events, data_paths, crs, return(list(POIs = wbin_POIs, events = wb_inlet_events)) } else { print("no waterbody inlets events") - wbin_POIs <- addType(wbin_POIs, tmp_POIs, "WBIn") + wbin_POIs <- addType(wbin_POIs, POIs, "WBIn") return(list(POIs = wbin_POIs, events = NA)) } } else { - wbin_POIs <- addType(wbin_POIs, tmp_POIs, "WBIn") + wbin_POIs <- addType(wbin_POIs, POIs, "WBIn") return(list(POIs = wbin_POIs, events = NA)) } @@ -855,12 +870,14 @@ wbin_POIcreation <- function(flowline, wb_lyr, events, data_paths, crs, #' locations if absent from NHD waterbody #' @param nhd (sf data.frame) Fowlines for given VPU #' @param WBs (sf data.frame) waterbodiess for discretization within VPU -#' @param data_paths (list) data paths to data layers +#' @param nhdplus_area (sf data.frame) NHDPlus Area features +#' @param crs coordinate reference system to use for analysis +#' @param nhdplushr_dir directory containing NHDPlusHR data #' #' @return (list) wbs - sf data frame for NHDArea and HR waterbodies #' wb_table - table of flowlines and outlet info for each #' feature in wb -HR_Area_coms <- function(nhd, WBs, data_paths, crs){ +HR_Area_coms <- function(nhd, WBs, nhdplus_area, crs, nhdplushr_dir){ # Pull out rows for VPU that are NHDArea nhd_area_resops <- WBs %>% @@ -883,7 +900,7 @@ HR_Area_coms <- function(nhd, WBs, data_paths, crs){ # If NHDArea feature needed retrieve from National GDB if (nrow(nhd_area_resops) > 0){ - nhd_area_vpu <- read_sf(data_paths$nhdplus_gdb, "NHDArea") %>% + nhd_area_vpu <- nhdplus_area %>% filter(COMID %in% nhd_area_resops$member_comid) %>% mutate(source = "NHDv2Area") @@ -900,14 +917,14 @@ HR_Area_coms <- function(nhd, WBs, data_paths, crs){ hr_wb <- do.call("rbind", lapply(unique(huc04), function(x){ print(x) vpu <- unique(substr(x, 1, 2)) - nhd_file <- list.files(data_paths$nhdplushr_dir, pattern = paste0(".*",x,".*\\.gdb$"), + nhd_file <- list.files(nhdplushr_dir, pattern = paste0(".*",x,".*\\.gdb$"), recursive = TRUE, full.names = TRUE,include.dirs = TRUE) if(length(nhd_file) < 1) { - download_nhdplushr(data_paths$nhdplus_dir, unique(huc04)) + download_nhdplushr(nhdplushr_dir, unique(huc04)) } - nhd_file <- list.files(data_paths$nhdplushr_dir, pattern = paste0(".*",x,".*\\.gdb$"), + nhd_file <- list.files(nhdplushr_dir, pattern = paste0(".*",x,".*\\.gdb$"), recursive = TRUE, full.names = TRUE,include.dirs = TRUE) if(length(nhd_file) != 1) { @@ -1150,13 +1167,13 @@ WB_event <- function(WBs, nhd_wb, type){ ### used as part of waterbody poi vreation in the workflow #' Collapses POIs us/ds of waterbody POIs -#' @param (sf data.frame) rolling POI data frame +#' @param POIs (sf data.frame) rolling POI data frame #' @param nhd (sf data.frame) nhd flowlines #' @param events (sf data.frame) waterbody events #' @param gages gages from gage selection #' #' @return (sf data.frame) dataframe of wb inlet POIs collapsed -wb_poi_collapse <- function(tmp_POIs, nhd, events, gages){ +wb_poi_collapse <- function(POIs, nhd, events, gages){ gage_dist_node <- function(x, wb_ds_ds, gage_add, events){ print (x) wb_out_fl <- distinct(filter(wb_ds_ds, COMID == x)) @@ -1221,7 +1238,7 @@ wb_poi_collapse <- function(tmp_POIs, nhd, events, gages){ switchDiv(., nhd) # get waterbody outlets - wb_out <- filter(tmp_POIs, !is.na(Type_WBOut), is.na(Type_Gages)) + wb_out <- filter(POIs, !is.na(Type_WBOut), is.na(Type_Gages)) wb_out_nexus <- filter(wb_out, nexus) wb_out_nhd <- filter(nhd, COMID %in% wb_out$COMID) @@ -1229,7 +1246,7 @@ wb_poi_collapse <- function(tmp_POIs, nhd, events, gages){ rbind(filter(nhd, LevelPathI %in% .$LevelPathI & Hydroseq %in% .$DnHydroseq)) # get gages - gage_wb <- filter(tmp_POIs, !is.na(Type_Gages) & is.na(Type_WBOut)) #%>% + gage_wb <- filter(POIs, !is.na(Type_Gages) & is.na(Type_WBOut)) #%>% # filter(COMID %in% wb_ds_ds$COMID) #gage_nhd <- filter(nhd, COMID %in% gage_wb$COMID) #gage_node <- filter(gage_wb, nexus == FALSE) @@ -1247,7 +1264,7 @@ wb_poi_collapse <- function(tmp_POIs, nhd, events, gages){ output_length <- output[lengths(output) > 1] if(length(output_length) == 0){ - return(list(POIs = tmp_POIs, events_ret = NA)) + return(list(POIs = POIs, events_ret = NA)) } output_full <- do.call("rbind", output[lengths(output) > 1]) %>% @@ -1260,7 +1277,7 @@ wb_poi_collapse <- function(tmp_POIs, nhd, events, gages){ filter(gage_dist < 1, DAR > .975) %>% st_drop_geometry() - gage_POI <- filter(tmp_POIs, COMID %in% output_full$gage_comid) %>% + gage_POI <- filter(POIs, COMID %in% output_full$gage_comid) %>% select(COMID, Type_HUC12_ds = Type_HUC12, Type_Gages_ds = Type_Gages, Type_TE_ds = Type_TE, nexus) %>% st_drop_geometry() %>% @@ -1270,7 +1287,7 @@ wb_poi_collapse <- function(tmp_POIs, nhd, events, gages){ Type_TE_ds = last(na.omit(Type_TE_ds)), nexus = last(na.omit(nexus))) - WB_POI <- filter(tmp_POIs, COMID %in% output_full$wbout_comid, !is.na(Type_WBOut)) %>% + WB_POI <- filter(POIs, COMID %in% output_full$wbout_comid, !is.na(Type_WBOut)) %>% inner_join(select(output_full, gage_comid, wbout_comid), by = c("COMID" = "wbout_comid")) %>% inner_join(select(gage_POI, -nexus), by = c("gage_comid" = "COMID")) %>% mutate(Type_HUC12 = ifelse(!is.na(Type_HUC12_ds), Type_HUC12_ds, Type_HUC12), @@ -1278,7 +1295,7 @@ wb_poi_collapse <- function(tmp_POIs, nhd, events, gages){ Type_TE = ifelse(!is.na(Type_TE_ds), Type_TE_ds, Type_TE)) %>% select(-c(Type_HUC12_ds, Type_Gages_ds, Type_TE_ds)) - tmp_POIs_fin <- filter(tmp_POIs, !COMID %in% c(WB_POI$COMID, WB_POI$gage_comid)) %>% + POIs_fin <- filter(POIs, !COMID %in% c(WB_POI$COMID, WB_POI$gage_comid)) %>% rbind(select(WB_POI, -gage_comid)) if(any(gage_POI$nexus == TRUE)){ @@ -1287,14 +1304,17 @@ wb_poi_collapse <- function(tmp_POIs, nhd, events, gages){ } - return(list(POIs = tmp_POIs_fin, events_ret = events)) + return(list(POIs = POIs_fin, events_ret = events)) } #' create abstraction(?) and removal event points of interest #' @param ar_data abstraction and removal data from USGS data release #' @param flowline flowlines to attach pois to #' @param poi_name name for poi set -create_ar_event_pois <- function(tmp_POIs, ar_data, flowline, poi_name = "AR") { +create_ar_event_pois <- function(POIs, ar_data, flowline, poi_name = "AR") { + + vpu <- POIs$vpu + POIs <- POIs$POIs ar_events <- ar_data %>% filter(fromComid > 0 | ToComid > 0) %>% @@ -1303,23 +1323,26 @@ create_ar_event_pois <- function(tmp_POIs, ar_data, flowline, poi_name = "AR") { if(nrow(ar_events) > 0){ # AR POIs - tmp_POIs <- POI_creation(ar_events, flowline, poi_name) %>% - addType(., tmp_POIs, "AR", nexus = TRUE) + POIs <- POI_creation(ar_events, flowline, poi_name) %>% + addType(., POIs, "AR", nexus = TRUE) } - list(tmp_POIs = tmp_POIs) + list(POIs = POIs, vpu = vpu) } #' create terminal pois -#' @param tmp_POIs POIs from previous step +#' @param POIs POIs from previous step #' @param flowline flowlines to attach pois to #' @param min_da_km_terminal minimum drainage area to include a network #' @param poi_name name for poi set -create_terminal_pois <- function(tmp_POIs, flowline, min_da_km_terminal, poi_name = "Term") { +create_terminal_pois <- function(POIs, flowline, min_da_km_terminal, poi_name = "Term") { + + vpu <- POIs$vpu + POIs <- POIs$POIs term_paths <- filter(st_drop_geometry(flowline), - Hydroseq %in% filter(flowline, COMID %in% tmp_POIs$COMID)$TerminalPa) + Hydroseq %in% filter(flowline, COMID %in% POIs$COMID)$TerminalPa) # Non-POI levelpath terminal pois, but meet size criteria terminal_POIs <- st_drop_geometry(flowline) %>% @@ -1329,19 +1352,23 @@ create_terminal_pois <- function(tmp_POIs, flowline, min_da_km_terminal, poi_nam # Use level path identifier as Type ID select(COMID, LevelPathI) - tmp_POIs <- POI_creation(terminal_POIs, flowline, poi_name) %>% - addType(., tmp_POIs, poi_name, nexus = TRUE) + POIs <- POI_creation(terminal_POIs, flowline, poi_name) %>% + addType(., POIs, poi_name, nexus = TRUE) - list(tmp_POIs = tmp_POIs) + list(POIs = POIs, vpu = vpu) } #' create confluence pois -#' @param tmp_POIs POIs from previous step +#' @param POIs POIs from previous step #' @param flowline flowlines to attach pois to #' @param poi_name name for poi set -create_confluence_pois <- function(tmp_POIs, flowline, poi_name = "Conf") { +create_confluence_pois <- function(POIs, flowline, poi_name = "Conf") { + + vpu <- POIs$vpu + POIs <- POIs$POIs + # Navigate upstream from each POI and determine minimally-sufficient network between current POI sets - up_net <- unique(unlist(lapply(unique(tmp_POIs$COMID), + up_net <- unique(unlist(lapply(unique(POIs$COMID), \(x, nhdDF) get_UM(nhdDF, x), nhdDF = flowline))) @@ -1363,28 +1390,31 @@ create_confluence_pois <- function(tmp_POIs, flowline, poi_name = "Conf") { ungroup() %>% select(COMID, Type_Conf) - tmp_POIs <- POI_creation(conf_COMIDs, filter(flowline, minNet == 1), poi_name) %>% - addType(., tmp_POIs, poi_name, nexus = TRUE) + POIs <- POI_creation(conf_COMIDs, filter(flowline, minNet == 1), poi_name) %>% + addType(., POIs, poi_name, nexus = TRUE) - list(tmp_POIs = tmp_POIs, updated_flowline = flowline) + list(POIs = POIs, updated_flowline = flowline, vpu = vpu) } #' create waterbody inlet pois -#' @param tmp_POIs POIs from previous step +#' @param POIs POIs from previous step #' @param wb_lyr waterbodies to create inlet pois from #' @param events complete layer of events prior to creating waterbody inlets #' @param flowline flowlines to attach pois to -#' @param nav_gpkg name of gpkg for current vpu -#' @param data_paths list of data sources +#' @param nhdplus_area (sf data.frame) NHDPlus Area features #' @param proj_crs projected crs for overall project #' @param poi_name name for poi set -create_wb_inlet_pois <- function(tmp_POIs, wb_lyr, events, - flowline, gages, data_paths, - proj_crs, poi_name = "WBIn") { +#' @param nhdplushr_dir directory containing NHDPlusHR data +create_wb_inlet_pois <- function(POIs, wb_lyr, events, + flowline, gages, nhdplus_area, + proj_crs, poi_name = "WBIn", nhdplushr_dir) { + vpu <- POIs$vpu + POIs <- POIs$POIs + wb_layers <- wbin_POIcreation(filter(flowline, minNet == 1), wb_lyr, events, - data_paths, proj_crs, tmp_POIs, - poi_name) + nhdplus_area, proj_crs, POIs, + poi_name, nhdplushr_dir) wb_in_col <- wb_inlet_collapse(wb_layers$POIs, flowline, events, gages) @@ -1398,18 +1428,18 @@ create_wb_inlet_pois <- function(tmp_POIs, wb_lyr, events, events <- rbind(events, st_compatibalize(wb_inlet_events, events)) } - list(tmp_POIs = wb_in_col$POIs, events = events) + list(POIs = wb_in_col$POIs, events = events, vpu = vpu) } ### used as part of waterbody poi creation in the workflow #' collapses close upstream gages with wb inlet events. -#' @param temp_POIs (sf data.frame) rolling POI data frame +#' @param POIs (sf data.frame) rolling POI data frame #' @param nhd (sf data.frame) nhd flowlines #' @param events (list) waterbody inlet events #' #' @return (sf data.frame) dataframe of gage POIs #' -wb_inlet_collapse <- function(tmp_POIs, nhd, events, gages){ +wb_inlet_collapse <- function(POIs, nhd, events, gages){ gage_dist_node <- function(x, wb_ds_ds, gage_add, events){ # print (x) wb_in_fl <- filter(wb_ds_ds, COMID == x) @@ -1478,7 +1508,7 @@ wb_inlet_collapse <- function(tmp_POIs, nhd, events, gages){ switchDiv(., nhd) # get waterbody outlets - wb_in <- filter(tmp_POIs, !is.na(Type_WBIn), is.na(Type_Gages)) + wb_in <- filter(POIs, !is.na(Type_WBIn), is.na(Type_Gages)) wb_in_nexus <- filter(wb_in, nexus == TRUE) wb_in_nhd <- filter(nhd, COMID %in% wb_in$COMID) @@ -1487,7 +1517,7 @@ wb_inlet_collapse <- function(tmp_POIs, nhd, events, gages){ distinct() # get gages - gage_wb <- filter(tmp_POIs, !is.na(Type_Gages) & is.na(Type_WBIn)) #%>% + gage_wb <- filter(POIs, !is.na(Type_Gages) & is.na(Type_WBIn)) #%>% # filter(COMID %in% wb_ds_ds$COMID) #gage_nhd <- filter(nhd, COMID %in% gage_wb$COMID) #gage_node <- filter(gage_wb, nexus == FALSE) @@ -1515,7 +1545,7 @@ wb_inlet_collapse <- function(tmp_POIs, nhd, events, gages){ filter(gage_dist < 1, DAR > .975) %>% st_drop_geometry() - gage_POI <- filter(tmp_POIs, COMID %in% output_full$gage_comid) %>% + gage_POI <- filter(POIs, COMID %in% output_full$gage_comid) %>% select(COMID, Type_HUC12_ds = Type_HUC12, Type_Gages_ds = Type_Gages, Type_TE_ds = Type_TE, Type_Term_ds = Type_Term, nexus) %>% st_drop_geometry() %>% @@ -1526,7 +1556,7 @@ wb_inlet_collapse <- function(tmp_POIs, nhd, events, gages){ Type_TE_ds = last(na.omit(Type_TE_ds)), nexus = last(na.omit(nexus))) - WB_POI <- filter(tmp_POIs, COMID %in% output_full$wbin_comid, !is.na(Type_WBIn)) %>% + WB_POI <- filter(POIs, COMID %in% output_full$wbin_comid, !is.na(Type_WBIn)) %>% left_join(select(output_full, gage_comid, wbin_comid), by = c("COMID" = "wbin_comid")) %>% left_join(select(gage_POI, -nexus), by = c("gage_comid" = "COMID")) %>% mutate(Type_HUC12 = ifelse(!is.na(Type_HUC12_ds), Type_HUC12_ds, Type_HUC12), @@ -1535,25 +1565,29 @@ wb_inlet_collapse <- function(tmp_POIs, nhd, events, gages){ Type_Term = ifelse(!is.na(Type_Term_ds), Type_Term_ds, Type_Term)) %>% select(-c(Type_HUC12_ds, Type_Gages_ds, Type_TE_ds, Type_Term_ds)) - tmp_POIs_fin <- filter(tmp_POIs, !COMID %in% c(WB_POI$COMID, WB_POI$gage_comid)) %>% + POIs_fin <- filter(POIs, !COMID %in% c(WB_POI$COMID, WB_POI$gage_comid)) %>% rbind(select(WB_POI, -gage_comid)) if(any(gage_POI$nexus == TRUE)){ gage_events <- filter(gage_POI, nexus == TRUE) events <- filter(events, !POI_identifier %in% gage_events$Type_Gages_ds) } - return(list(POIs = tmp_POIs_fin, events_ret = events)) + return(list(POIs = POIs_fin, events_ret = events)) } else { print ("no points collapse") - return(list(POIs = tmp_POIs, events_ret = NA)) + return(list(POIs = POIs, events_ret = NA)) } } -#' @param tmp_POIs POIs from previous step +#' @param POIs POIs from previous step #' @param nid_points national inventory of dams data #' @param flowline flowlines to attach pois to #' @param poi_name name for poi set -create_nid_pois <- function(tmp_POIs, nid_points, flowline, poi_name) { +create_nid_pois <- function(POIs, nid_points, flowline, poi_name) { + + vpu <- POIs$vpu + POIs <- POIs$POIs + # Read in NID shapefile NID_COMIDs <- nid_points %>% st_drop_geometry() %>% @@ -1564,25 +1598,28 @@ create_nid_pois <- function(tmp_POIs, nid_points, flowline, poi_name) { summarize(Type_NID = paste0(unique(NIDID), collapse = " ")) # Derive other NID POIs - tmp_POIs <- POI_creation(NID_COMIDs, flowline, poi_name) %>% - addType(., tmp_POIs, poi_name, nexus = FALSE, bind = FALSE) + POIs <- POI_creation(NID_COMIDs, flowline, poi_name) %>% + addType(., POIs, poi_name, nexus = FALSE, bind = FALSE) - list(tmp_POIs = tmp_POIs) + list(POIs = POIs, vpu = vpu) } -#' @param tmp_POIs POIs from previous step +#' @param POIs POIs from previous step #' @param flowline flowlines to attach pois to #' @param all_nhdplus_attributes original attributes from nhdplusv2 #' @param min_da_km_hw minimum drainage area for creation of a headwater poi #' @param poi_name name for poi set -create_headwater_pois <- function(tmp_POIs, flowline, +create_headwater_pois <- function(POIs, flowline, all_nhdplus_attributes, min_da_km_hw, poi_name = "DA") { + vpu <- POIs$vpu + POIs <- POIs$POIs + # derive incremental segments from POIs inc_segs <- make_incremental_segments(flowline, filter(st_drop_geometry(flowline), - COMID %in% tmp_POIs$COMID)) %>% + COMID %in% POIs$COMID)) %>% # bring over VAA data inner_join(select(all_nhdplus_attributes, COMID, DnHydroseq, VA_MA, TOTMA, LENGTHKM, MAXELEVSMO, @@ -1629,27 +1666,31 @@ create_headwater_pois <- function(tmp_POIs, flowline, mutate(da_class = paste(POI_ID, ind, sep = "_")) %>% ungroup() %>% select(-ind) %>% - filter(!COMID %in% tmp_POIs$COMID) + filter(!COMID %in% POIs$COMID) if(nrow(hw_DA_splits) > 0) { - tmp_POIs <- POI_creation(select(hw_DA_splits, COMID, da_class), + POIs <- POI_creation(select(hw_DA_splits, COMID, da_class), filter(flowline, poi == 1), poi_name) %>% - addType(., tmp_POIs, poi_name, nexus = TRUE) + addType(., POIs, poi_name, nexus = TRUE) } - list(tmp_POIs = tmp_POIs) + list(POIs = POIs, vpu = vpu) } -#' @param tmp_POIs POIs from previous step +#' @param POIs POIs from previous step #' @param flowline flowlines to attach pois to #' @param all_nhdplus_attributes original attributes from nhdplusv2 #' @param poi_name name for poi set -create_elevation_break_pois <- function(tmp_POIs, flowline, all_nhdplus_attributes, +create_elevation_break_pois <- function(POIs, flowline, all_nhdplus_attributes, poi_name = "elev") { + + vpu <- POIs$vpu + POIs <- POIs$POIs + inc_segs <- make_incremental_segments(filter(flowline, minNet == 1), filter(st_drop_geometry(flowline), - COMID %in% tmp_POIs$COMID)) %>% + COMID %in% POIs$COMID)) %>% # bring over VAA data inner_join(select(all_nhdplus_attributes, COMID, DnHydroseq, VA_MA, TOTMA, LENGTHKM, MAXELEVSMO, @@ -1682,17 +1723,17 @@ create_elevation_break_pois <- function(tmp_POIs, flowline, all_nhdplus_attribut mutate(elev_class = paste(POI_ID, ind, sep = "_")) %>% ungroup() %>% select(-ind) %>% - filter(!COMID %in% tmp_POIs$COMID) + filter(!COMID %in% POIs$COMID) if(nrow(hw_elev_splits) > 0) { - tmp_POIs <- POI_creation(select(hw_elev_splits, COMID, elev_class), + POIs <- POI_creation(select(hw_elev_splits, COMID, elev_class), filter(flowline, poi == 1), poi_name) %>% - addType(., tmp_POIs, poi_name, nexus = TRUE) + addType(., POIs, poi_name, nexus = TRUE) } } - list(tmp_POIs = tmp_POIs) + list(POIs = POIs, vpu = vpu) } cs_group <- function(a, athres) { @@ -1710,16 +1751,19 @@ cs_group <- function(a, athres) { return (result) } -#' @param tmp_POIs POIs from previous step +#' @param POIs POIs from previous step #' @param flowline flowlines to attach pois to #' @param all_nhdplus_attributes original attributes from nhdplusv2 #' @param poi_name name for poi set -create_time_of_travel_pois <- function(tmp_POIs, flowline, all_nhdplus_attributes, poi_name = "Travel") { +create_time_of_travel_pois <- function(POIs, flowline, all_nhdplus_attributes, poi_name = "Travel") { + + vpu <- POIs$vpu + POIs <- POIs$POIs # derive incremental segments from POIs inc_segs <- make_incremental_segments(flowline, filter(st_drop_geometry(flowline), - COMID %in% tmp_POIs$COMID, + COMID %in% POIs$COMID, COMID %in% flowline$COMID)) %>% # bring over VAA data inner_join(select(all_nhdplus_attributes, COMID, DnHydroseq, VA_MA, TOTMA, LENGTHKM, @@ -1749,24 +1793,27 @@ create_time_of_travel_pois <- function(tmp_POIs, flowline, all_nhdplus_attribute mutate(tt_class = paste(POI_ID, ind, sep = "_")) %>% ungroup() %>% select(-ind) %>% - filter(!COMID %in% tmp_POIs$COMID) + filter(!COMID %in% POIs$COMID) if(nrow(tt_splits) > 0) { - tmp_POIs <- POI_creation(select(tt_splits, COMID, tt_class), + POIs <- POI_creation(select(tt_splits, COMID, tt_class), filter(flowline, poi == 1), poi_name) %>% - addType(., tmp_POIs, poi_name, nexus = TRUE) + addType(., POIs, poi_name, nexus = TRUE) } - list(tmp_POIs = tmp_POIs) + list(POIs = POIs, vpu = vpu) } #' create final POIs -#' @param tmp_POIs POIs from previous step +#' @param POIs POIs from previous step #' @param flowline flowlines to attach pois to -create_final_pois <- function(tmp_POIs, flowline) { +create_final_pois <- function(POIs, flowline) { + + vpu <- POIs$vpu + POIs <- POIs$POIs - unCon_POIs <- filter(tmp_POIs, COMID %in% filter(flowline, AreaSqKM == 0)$COMID) + unCon_POIs <- filter(POIs, COMID %in% filter(flowline, AreaSqKM == 0)$COMID) xWalk <- NULL new_POIs <- NULL @@ -1774,15 +1821,15 @@ create_final_pois <- function(tmp_POIs, flowline) { # If any POIs happened to fall on flowlines w/o catchment if (nrow(unCon_POIs) > 0){ # For confluence POIs falling on Flowlines w/o catchments, derive upstream valid flowline, - poi_fix <- DS_poiFix(tmp_POIs, filter(flowline, minNet == 1)) - new_POIs <- st_compatibalize(poi_fix$new_POIs, tmp_POIs) + poi_fix <- DS_poiFix(POIs, filter(flowline, minNet == 1)) + new_POIs <- st_compatibalize(poi_fix$new_POIs, POIs) xWalk <- poi_fix$xWalk # POIs that didn't need to be moved - tmp_POIs_fixed <- filter(tmp_POIs, nexus == TRUE | + POIs_fixed <- filter(POIs, nexus == TRUE | !COMID %in% c(poi_fix$xWalk$oldPOI, poi_fix$xWalk$COMID)) # bind together - final_POIs <- bind_rows(tmp_POIs_fixed, new_POIs) %>% + final_POIs <- bind_rows(POIs_fixed, new_POIs) %>% mutate(Type_Term = ifelse(nexus == 1, NA, Type_Term)) %>% select(-dplyr::any_of(c("ID"))) @@ -1790,15 +1837,15 @@ create_final_pois <- function(tmp_POIs, flowline) { # If no fixes designate as NA poi_fix <- NA - tmp_POIs$nexus <- as.integer(tmp_POIs$nexus) + POIs$nexus <- as.integer(POIs$nexus) # if a POI will be a nexus, it can not be terminal. - final_POIs <- mutate(tmp_POIs, Type_Term = ifelse(nexus == 1, NA, Type_Term)) + final_POIs <- mutate(POIs, Type_Term = ifelse(nexus == 1, NA, Type_Term)) } - list(final_POIs = final_POIs, xwalk = xWalk, new_POIs = new_POIs, - unCon_POIs = unCon_POIs) + list(POIs = final_POIs, xwalk = xWalk, new_POIs = new_POIs, + unCon_POIs = unCon_POIs, vpu = vpu) } ### used as a cleanup step in POI workflow @@ -1949,7 +1996,11 @@ movePOI_NA_DA <- function(poi_fix, nhdDF){ #' create draft segments #' @param final_POIs POIs from previous step #' @param flowline flowlines to attach pois to -create_draft_segments <- function(final_POIs, flowline, xWalk) { +create_draft_segments <- function(POIs, flowline) { + + xWalk <- POIs$xwalk + vpu <- POIs$vpu + POIs <- POIs$POIs if("POI_ID" %in% colnames(flowline)) { flowline <- select(flowline, -POI_ID) @@ -1957,7 +2008,7 @@ create_draft_segments <- function(final_POIs, flowline, xWalk) { # Sort POIs by Levelpath and Hydrosequence in upstream to downstream order seg_POIs <- filter(st_drop_geometry(flowline), - COMID %in% final_POIs$COMID, + COMID %in% POIs$COMID, COMID %in% filter(flowline, minNet == 1)$COMID) # derive incremental segments from POIs @@ -1974,7 +2025,7 @@ create_draft_segments <- function(final_POIs, flowline, xWalk) { nsegments <- nsegments_fin$diss_segs - list(flowline_final = flowline_final, nsegments = nsegments) + list(flowline_final = flowline_final, nsegments = nsegments, vpu = vpu) } @@ -2034,14 +2085,15 @@ segment_creation <- function(nhdDF, routing_fix = NULL){ collapse_pois <- function(final_POIs, poi_dar_move, poi_distance_move, flowline) { - # number POIs - final_POIs_prec <- mutate(final_POIs, id = row_number(), moved = NA) + POIs <- final_POIs$POIs + vpu <- final_POIs$vpu - collapse <- TRUE + # number POIs + POIs <- mutate(POIs, id = row_number(), moved = NA) - moved_pois <- get_moved_pois(final_POIs_prec, poi_dar_move, poi_distance_move, flowline) + moved_pois <- get_moved_pois(POIs, poi_dar_move, poi_distance_move, flowline) - check_dups <- moved_pois$final_POIs %>% + check_dups <- moved_pois$POIs %>% group_by(COMID) %>% filter(n() > 1) @@ -2056,15 +2108,14 @@ collapse_pois <- function(final_POIs, poi_dar_move, poi_distance_move, flowline) print("All double COMIDs nexus for gage or WB splitting") } - list(final_POIs = final_POIs_prec, + list(POIs = POIs, duplicate_pois = dups, - pois_collapsed = moved_pois$pois_collapsed) + pois_collapsed = moved_pois$pois_collapsed, + vpu = vpu) } -### used when poi_move.R is sourced in POI collapse phase of workflow -### consider calling at the end of each poi creation process? #' Collapses POIs together based on criteria -#' @param (pois) sf data frame of POIs +#' @param pois sf data frame of POIs #' @param move_category (character) POI data theme to move #' @param DAR (numeric) drainage area threshold to move within #' @param dist (numeric) maximum river distance between two points to move within @@ -2314,15 +2365,19 @@ get_moved_pois <- function(final_POIs, poi_dar_move, poi_distance_move, flowline } } - list(final_POIs = final_POIs, pois_collapsed = moved_pois_table) + list(POIs = final_POIs, pois_collapsed = moved_pois_table) } -create_poi_lookup <- function(final_POIs, events, full_cats, - flowline, pois_collapsed) { +create_poi_lookup <- function(POIs, events, full_cats, + flowline) { + vpu <- POIs$vpu + + pois_collapsed <- POIs$pois_collapsed + # Final POI layer - POIs <- final_POIs %>% + POIs <- POIs$POIs %>% mutate(identifier = row_number()) # Unique POI geometry @@ -2403,8 +2458,6 @@ create_poi_lookup <- function(final_POIs, events, full_cats, # write_sf(nexi, nav_gpkg, event_geometry_table) # Load data - # full_cats <- readRDS(data_paths$fullcats_table) - lookup <- dplyr::select(sf::st_drop_geometry(flowline), NHDPlusV2_COMID = COMID, realized_catchmentID = COMID, @@ -2418,6 +2471,6 @@ create_poi_lookup <- function(final_POIs, events, full_cats, list(lookup = lookup, event_table = event_table, event_geometry_table = event_geometry_table, - pois_data = pois_data, poi_geometry = poi_geometry) + pois_data = pois_data, poi_geometry = poi_geometry, vpu = vpu) } \ No newline at end of file diff --git a/workspace/_targets/02_POI_creation_targets.R b/workspace/_targets/02_POI_creation_targets.R index b66c4956c319aa800204c92554e08417e3d2af96..1a237c94e15f714f86abe762af53e68d7b98d026 100644 --- a/workspace/_targets/02_POI_creation_targets.R +++ b/workspace/_targets/02_POI_creation_targets.R @@ -1,7 +1,7 @@ library(targets) tar_option_set(packages = c("dplyr", "sf", "hyfabric", "hydroloom", "nhdplusTools"), - memory = "transient", garbage_collection = TRUE, error = "null", + memory = "transient", garbage_collection = TRUE, error = "stop", storage = "worker", retrieval = "worker", deployment = "main") library(future) @@ -12,7 +12,6 @@ source("R/config.R") source("R/user_vars.R") source("R/utils.R") source("R/02_POI_creation_functions.R") -source("R/poi_move.R") prep_path <- ".targets_stores/01_prep/objects/" @@ -20,19 +19,22 @@ list(tar_target(data_paths_file, "cache/data_paths.json", format = "file"), tar_target(data_paths, jsonlite::read_json(data_paths_file)), tar_target(vpu_codes_file, file.path(prep_path, "vpu_codes"), format = "file"), tar_target(vpu_codes, readRDS(vpu_codes_file)), - tar_target(nav_gpkg, file.path("cache", paste0("reference_", vpu_codes,".gpkg"))), - tar_target(temp_gpkg, file.path("cache", paste0("nav_", vpu_codes, ".gpkg"))), + tar_target(vpu_gpkg, file.path("cache", paste0("reference_", vpu_codes,".gpkg")), + pattern = map(vpu_codes), deployment = "worker"), ### Base flowline -- gets updated attributes - tar_target(flowline, read_sf(nav_gpkg, nhd_flowline), - pattern = map(nav_gpkg), deployment = "worker"), + tar_target(flowline, read_sf(vpu_gpkg, nhd_flowline), + pattern = map(vpu_gpkg), deployment = "worker"), + # read_sf(data_paths$nhdplus_gdb, "NHDArea") tar_target(all_nhdplus_attributes, sf::st_drop_geometry(sf::read_sf(data_paths$nhdplus_gdb, "NHDFlowline_Network"))), + tar_target(nhdplushr_dir, data_paths$nhdplushr_dir), tar_target(full_cat_file, data_paths$fullcats_table, format = "file"), tar_target(full_cat_table, readRDS(full_cat_file)), ### huc12 pois - tar_target(huc12_poi, create_hu12_pois(read_sf(data_paths$hu12_points_path, "hu_points"), + tar_target(hu12_points_file, data_paths$hu12_points_path, format = "file"), + tar_target(huc12_poi, create_hu12_pois(read_sf(hu12_points_file, "hu_points"), flowline, poi_name = "HUC12", vpu_codes), @@ -42,9 +44,10 @@ list(tar_target(data_paths_file, "cache/data_paths.json", format = "file"), tar_target(gage_info_gpkg_file, gage_info_gpkg, format = "file"), tar_target(gages, read_sf(gage_info_gpkg_file, "Gages")), tar_target(gage_pois, create_gage_pois(gages, - flowline, "Gages", vpu_codes, huc12_poi$tmp_POIs, + flowline, "Gages", huc12_poi, min_da_km_gages, combine_meters, reach_meas_thresh), - pattern = map(flowline, huc12_poi, vpu_codes), deployment = "worker"), + pattern = map(flowline, huc12_poi), deployment = "worker"), + tar_target(gage_events, gage_pois$events, pattern = map(gage_pois), deployment = "worker"), ### Thermo electric power plant POIs tar_target(te_points_file, data_paths$TE_points_path, format = "file"), @@ -52,47 +55,52 @@ list(tar_target(data_paths_file, "cache/data_paths.json", format = "file"), tar_target(te_attributes_file, file.path(data_paths$TE_points_path, "1_model_inputs/te_plants.csv"), format = "file"), tar_target(te_attributes, read.csv(te_attributes_file, colClasses = "character")), tar_target(te_pois, create_thermo_electric_pois(te_points, te_attributes, - gage_pois$tmp_POIs, huc12_poi$HUC12_COMIDs, - flowline, "TE", vpu_codes), - pattern = map(gage_pois, huc12_poi, flowline, vpu_codes)), + gage_pois, huc12_poi$HUC12_COMIDs, + flowline, "TE"), + pattern = map(gage_pois, huc12_poi, flowline)), ### get waterbodies in order tar_target(combined_waterbodies_file, file.path(prep_path, "combined_waterbodies"), format = "file"), tar_target(combined_waterbodies, readRDS(combined_waterbodies_file)), + tar_target(nhdplus_area, filter(combined_waterbodies, layer == "NHDArea")), tar_target(waterbodies, make_waterbodies_layer(combined_waterbodies, flowline, geo_crs), pattern = map(flowline), deployment = "worker"), ### star fit cross walk and reservoir POIs tar_target(istarf_xwalk_file, data_paths$istarf_xwalk, format = "file"), tar_target(istarf_xwalk, read.csv(istarf_xwalk_file)), - tar_target(resops_pois, create_resops_pois(te_pois$tmp_POIs, waterbodies, istarf_xwalk, + tar_target(resops_pois, create_resops_pois(te_pois, waterbodies, istarf_xwalk, flowline, "resops"), pattern = map(te_pois, waterbodies, flowline), deployment = "worker"), + tar_target(resops_waterbody_attributes, resops_pois$wb_resops, + pattern = map(resops_pois), deployment = "worker"), + ### hilarri is more reservoir POIs tar_target(hilarri_file, data_paths$hilarri_sites, format = "file"), tar_target(hilarri_data, read.csv(hilarri_file, colClasses = "character")), - tar_target(hilarri_pois, create_hilarri_pois(resops_pois$tmp_POIs, + tar_target(hilarri_pois, create_hilarri_pois(resops_pois, hilarri_data, resops_pois$wb_pois, flowline, "hilarri"), pattern = map(resops_pois, flowline), deployment = "worker"), + tar_target(hilarri_waterbodies, hilarri_pois$wb_pois, + pattern = map(hilarri_pois), deployment = "worker"), ### waterbody outlet POIs - # TODO: work on cleaning up parameters of this function - # TODO: track event creation explicitly in workflow - tar_target(wb_outlet_pois, create_wb_outlet_pois(hilarri_pois$tmp_POIs, - filter(combined_waterbodies, layer == "NHDArea"), - hilarri_pois$wb_pois, - resops_pois$wb_resops, + tar_target(wb_outlet_pois, create_wb_outlet_pois(hilarri_pois, + nhdplus_area, + hilarri_waterbodies, + resops_waterbody_attributes, flowline, - gage_pois$events, - nav_gpkg, - data_paths, + gage_events, proj_crs, gages, - "wb_id"), - pattern = map(hilarri_pois, resops_pois, gage_pois,flowline, nav_gpkg), - deployment = "worker"), + "wb_id", + nhdplushr_dir), + pattern = map(hilarri_pois, hilarri_waterbodies, resops_waterbody_attributes, + gage_events,flowline), deployment = "worker"), + tar_target(gage_and_waterbody_outlet_events, wb_outlet_pois$all_events, + pattern = map(wb_outlet_pois), deployment = "worker"), ### update flowlines for clarity tar_target(updated_flowline_wb_outlet, wb_outlet_pois$updated_flowline, @@ -101,18 +109,18 @@ list(tar_target(data_paths_file, "cache/data_paths.json", format = "file"), ### Addition and Removal events from Brakebill, 2019 tar_target(ar_data, readxl::read_xlsx(file.path(data_paths$USGS_IT_path, "ARevents_final7.xlsx"))), - tar_target(ar_event_pois, create_ar_event_pois(wb_outlet_pois$tmp_POIs, ar_data, + tar_target(ar_event_pois, create_ar_event_pois(wb_outlet_pois, ar_data, updated_flowline_wb_outlet, "AR"), pattern = map(wb_outlet_pois, updated_flowline_wb_outlet), deployment = "worker"), ### terminal POIs at network ends - tar_target(terminal_pois, create_terminal_pois(ar_event_pois$tmp_POIs, + tar_target(terminal_pois, create_terminal_pois(ar_event_pois, updated_flowline_wb_outlet, min_da_km_terminal, "Term"), pattern = map(ar_event_pois, updated_flowline_wb_outlet), deployment = "worker"), ### confluence POIs - tar_target(confluence_pois, create_confluence_pois(terminal_pois$tmp_POIs, + tar_target(confluence_pois, create_confluence_pois(terminal_pois, updated_flowline_wb_outlet, "Conf"), pattern = map(terminal_pois, updated_flowline_wb_outlet), deployment = "worker"), @@ -121,53 +129,55 @@ list(tar_target(data_paths_file, "cache/data_paths.json", format = "file"), pattern = map(confluence_pois), deployment = "worker"), ## waterbody inlet POIs - tar_target(wb_inlet_pois, create_wb_inlet_pois(confluence_pois$tmp_POIs, + tar_target(wb_inlet_pois, create_wb_inlet_pois(confluence_pois, wb_outlet_pois$WB_VPU_pois, - wb_outlet_pois$all_events, + gage_and_waterbody_outlet_events, updated_flowline_confluence, gages, - data_paths, proj_crs, "WBIn"), - pattern = map(confluence_pois, wb_outlet_pois, updated_flowline_confluence), deployment = "worker"), + nhdplus_area, proj_crs, "WBIn", nhdplushr_dir), + pattern = map(confluence_pois, wb_outlet_pois, gage_and_waterbody_outlet_events, + updated_flowline_confluence), deployment = "worker"), + tar_target(gage_and_all_waterbody_events, wb_inlet_pois$events, + pattern = map(wb_inlet_pois), deployment = "worker"), tar_target(nid_points, read_sf(data_paths$NID_points_path, "Final_NID_2018")), - tar_target(nid_pois, create_nid_pois(wb_inlet_pois$tmp_POIs, + tar_target(nid_pois, create_nid_pois(wb_inlet_pois, nid_points, updated_flowline_confluence, "NID"), pattern = map(wb_inlet_pois, updated_flowline_confluence)), - tar_target(headwater_pois, create_headwater_pois(nid_pois$tmp_POIs, + tar_target(headwater_pois, create_headwater_pois(nid_pois, updated_flowline_confluence, all_nhdplus_attributes, min_da_km_hw, "DA"), pattern = map(nid_pois, updated_flowline_confluence), deployment = "worker"), - tar_target(elevation_break_pois, create_elevation_break_pois(headwater_pois$tmp_POIs, + tar_target(elevation_break_pois, create_elevation_break_pois(headwater_pois, updated_flowline_confluence, all_nhdplus_attributes, "elev"), pattern = map(headwater_pois, updated_flowline_confluence), deployment = "worker"), - tar_target(time_of_travel_pois, create_time_of_travel_pois(elevation_break_pois$tmp_POIs, + tar_target(time_of_travel_pois, create_time_of_travel_pois(elevation_break_pois, updated_flowline_confluence, all_nhdplus_attributes, "Travel"), pattern = map(elevation_break_pois, updated_flowline_confluence), deployment = "worker"), - tar_target(final_pois, create_final_pois(time_of_travel_pois$tmp_POIs, + tar_target(final_pois, create_final_pois(time_of_travel_pois, updated_flowline_confluence), pattern = map(time_of_travel_pois, updated_flowline_confluence), deployment = "worker"), - tar_target(draft_segments, create_draft_segments(final_pois$final_POIs, - updated_flowline_confluence, - final_pois$xwalk), + tar_target(draft_segments, create_draft_segments(final_pois, + updated_flowline_confluence), pattern = map(final_pois, updated_flowline_confluence), deployment = "worker"), - tar_target(collapsed_pois, collapse_pois(final_pois$final_POIs, poi_dar_move, poi_distance_move, + tar_target(collapsed_pois, collapse_pois(final_pois, poi_dar_move, poi_distance_move, updated_flowline_confluence), pattern = map(final_pois, updated_flowline_confluence), deployment = "worker"), tar_target(poi_lookup, - create_poi_lookup(collapsed_pois$final_POIs, wb_inlet_pois$events, - full_cat_table, updated_flowline_confluence, collapsed_pois$pois_collapsed), - pattern = map(collapsed_pois, wb_inlet_pois, updated_flowline_confluence)) + create_poi_lookup(collapsed_pois, gage_and_all_waterbody_events, + full_cat_table, updated_flowline_confluence), + pattern = map(collapsed_pois, gage_and_all_waterbody_events, updated_flowline_confluence)) ) \ No newline at end of file diff --git a/workspace/workflow_runner.R b/workspace/workflow_runner.R index 9fd575782d61558c8867925b7bdcc82afc7485ed..e949d11251fa9d1036fdc5b3064f899fc97cbe5c 100644 --- a/workspace/workflow_runner.R +++ b/workspace/workflow_runner.R @@ -38,7 +38,7 @@ if(FALSE) { # this won't run if you just bang through this file tar_make(callr_function = NULL) } -workers <- 8 +workers <- 12 # run branches for a given target in parallel if you have enough memory # note this will only work for targets with 'deployment = "worker"' @@ -69,3 +69,4 @@ tar_make_future(final_pois, workers = workers) tar_make_future(draft_segments, workers = workers) tar_make_future(collapsed_pois, workers = workers) tar_make_future(poi_lookup, workers = workers) +tar_make_future(draft_segments, workers = workers)