From 8d98cf2755fa73e3fcde3ff6e1230f20915b9887 Mon Sep 17 00:00:00 2001 From: David Blodgett <dblodgett@usgs.gov> Date: Tue, 3 Dec 2024 10:17:51 -0600 Subject: [PATCH] AR, Terminal, Confluence, and waterbody inlet POIs --- workspace/R/02_POI_creation_functions.R | 430 ++++++++++++++++++- workspace/R/NHD_navigate.R | 308 ------------- workspace/_targets/02_POI_creation_targets.R | 58 ++- workspace/workflow_runner.R | 11 +- 4 files changed, 484 insertions(+), 323 deletions(-) diff --git a/workspace/R/02_POI_creation_functions.R b/workspace/R/02_POI_creation_functions.R index 5de2321..c143897 100644 --- a/workspace/R/02_POI_creation_functions.R +++ b/workspace/R/02_POI_creation_functions.R @@ -507,14 +507,16 @@ create_wb_outlet_pois <- function(tmp_POIs, all_events = events, wb_events = wb_events, double_wb = double_wb, double_wb_pois = double_wb_pois, wb_missing_poi = wb_missing_poi, poi_missing_wb = poi_missing_wb, - updated_flowline = wb_layers$nhd_wb_table) + updated_flowline = wb_layers$nhd_wb_table, WB_VPU_pois = WB_VPU_pois) } #' Creates Waterbody POIs, calls a few other functions #' @param nhd (sf data.frame) flowline data.frame -#' @param WBs_VPU (sf data.frame) +#' @param wb_table (sf data.frame) #' @param data_paths (list) data paths to data layers #' @param crs (integer) CRS to use (epsg code) +#' @param wb_poi_lst TODO +#' @param poi_name name for pois to be created #' #' @return (sf data.frame) dataframe of waterbody outlet POIs #' @@ -590,10 +592,10 @@ wbout_POI_creaton <- function(nhd, wb_table, data_paths, tmp_POIs, crs, wb_poi_l addType(., tmp_POIs, "wb_id", nexus = TRUE) %>% mutate(Type_WBOut = ifelse(is.na(Type_WBOut), Type_wb_id, Type_WBOut)) %>% select(-Type_wb_id) %>% - left_join(select(st_drop_geometry(resops_main), GRAND_ID, resops_FlowLcomid), - by = c("COMID" = "resops_FlowLcomid")) %>% - mutate(Type_resops = ifelse(!is.na(GRAND_ID), GRAND_ID, Type_resops)) %>% - select(-GRAND_ID) + left_join(select(st_drop_geometry(resops_main), grand_id, resops_flowlcomid), + by = c("COMID" = "resops_flowlcomid")) %>% + mutate(Type_resops = ifelse(!is.na(grand_id), grand_id, Type_resops)) %>% + select(-grand_id) } else { # WBarea out comids wbareaout_coms <- select(st_drop_geometry(nhd), COMID, Hydroseq) %>% @@ -689,6 +691,165 @@ wbout_POI_creaton <- function(nhd, wb_table, data_paths, tmp_POIs, crs, wb_poi_l } } +#' Creates Waterbody POIs, calls a few other functions +#' @param nhd (sf data.frame) flowline data.frame +#' @param wb_lyr (sf data.frame) waterbodies to create inlet pois from +#' @param events (sf data.frame) of events that waterbody inlets can be pulled from +#' @param data_paths (list) list of data paths +#' @param crs (integer) CRS to use (epsg code) +#' @param tmp_POIs (sf data.frame) rolling POI data frame +#' +#' @return (sf data.frame) dataframe of WB inlet POIs +#' +wbin_POIcreation <- function(flowline, wb_lyr, events, data_paths, crs, + tmp_POIs, poi_name = "WBIn"){ + + wbout_COMIDs <- filter(tmp_POIs, !is.na(Type_WBOut)) + WBs_in_POIs <- filter(wb_lyr, wb_id %in% wbout_COMIDs$Type_WBOut) + + # Unnest to get list of waterbody COMIDS (member_comid) + wb_table <- st_drop_geometry(wb_lyr) %>% + dplyr::mutate(member_comid = strsplit(member_comid, ",")) %>% + tidyr::unnest(cols = member_comid) %>% + mutate(member_comid = as.integer(member_comid)) %>% + distinct() + + # # Attribute flowline if waterbody exists for POIs + # nhd_wb <- nhd %>% + # left_join(distinct(wb_table, wb_id, member_comid), + # by = c("WBAREACOMI" = "member_comid")) + + # Create waterbody inlet POIs + wbin_COMIDs <- filter(flowline, is.na(wb_id) & + DnHydroseq %in% filter(flowline, !is.na(wb_id))$Hydroseq) %>% + select(-wb_id) %>% + switchDiv(., flowline) %>% + inner_join(st_drop_geometry(filter(flowline, minNet == 1)) %>% + select(wb_id, Hydroseq), by = c("DnHydroseq" = "Hydroseq")) %>% + select(COMID, wb_id, minNet) %>% + group_by(COMID) %>% + # Ensure the inlets are on the network defined by confluence POIs + filter(minNet == 1) + + # Headwater Waterbodies that may need the network extended through the inlet + need_wbin <- st_drop_geometry(wb_lyr) %>% + #dplyr::select(COMID)%>% + dplyr::filter(!wb_id %in% wbin_COMIDs$wb_id) + + if(nrow(need_wbin)>0){ + + nhd_inlet <- mutate(flowline, WB = ifelse(wb_id > 0 & wb_id %in% need_wbin$wb_id, 1, 0)) + + missing_wbin_COMIDs <- filter(nhd_inlet, WB == 0, + DnHydroseq %in% filter(nhd_inlet, WB == 1)$Hydroseq) %>% + select(-wb_id) %>% + switchDiv(., nhd_inlet) %>% + inner_join(st_drop_geometry(filter(nhd_inlet, minNet == 1)) %>% + select(Hydroseq, wb_id), + by = c("DnHydroseq" = "Hydroseq")) %>% + select(COMID, wb_id) %>% + group_by(COMID) %>% + filter(row_number() == 1) %>% + ungroup() + + missing_wbin_COMIDs <- missing_wbin_COMIDs %>% + left_join(select(st_drop_geometry(nhd_inlet), COMID, TotDASqKM) + , by = c("COMID")) %>% + group_by(COMID) %>% + slice(which.max(TotDASqKM))%>% + ungroup() %>% + select(-TotDASqKM) + + if(nrow(missing_wbin_COMIDs) > 0){ + wbin_COMIDs <- data.table::rbindlist(list(wbin_COMIDs, + missing_wbin_COMIDs), + fill = TRUE) %>% + select(COMID, wb_id) + } + } + + wbin_POIs <- POI_creation(filter(st_drop_geometry(wbin_COMIDs), !COMID %in% wbout_COMIDs$COMID), + flowline, "WBIn") + + # Get NHDArea and HR waterbodies + WBs_VPU_areaHR <- HR_Area_coms(flowline, + filter(wb_lyr, source == "HR ID AVAILABLE"), + data_paths, crs) + + if(is.list(WBs_VPU_areaHR)) { + nhd_WBs <- filter(flowline, minNet == 1, + COMID %in% WBs_VPU_areaHR$nhd_wb_table$comid, + AreaSqKM > 0) + } else { + nhd_WBs <- NA + } + + if(is.data.frame(nhd_WBs)){ + + # Get the outlet events again + wb_outlet_events <- events %>% + filter(event_type == "outlet") + + # Drive the inlet events + nhd_inlet <- filter(nhd_WBs, !COMID %in% wb_outlet_events$COMID) + + if("12" %in% unique(flowline$VPUID)){ + nhd_inlet <- rbind(nhd_inlet, filter(nhd_WBs, COMID == 1304709)) + } + + if (nrow(nhd_inlet) > 0){ + # Get inlet events and bind with outlets + wb_inlet_events <- WB_event(WBs_VPU_areaHR, nhd_inlet, "inlet") %>% + mutate(nexus = TRUE) %>% + inner_join(select(st_drop_geometry(flowline), COMID, Hydroseq, ToMeas), by = "COMID") + + # Determine which events are close enough to the end of an upstream flowline to just use that geometry for the POI + wbin_fl_upstream <- filter(flowline, DnHydroseq %in% filter(flowline, COMID %in% wb_inlet_events$COMID)$Hydroseq) %>% + group_by(DnHydroseq) %>% + filter(n() == 1) %>% + ungroup() + + if(nrow(wbin_fl_upstream) > 0){ + + wb_inlet_POIs <- filter(wb_inlet_events, REACH_meas < 5 | as.integer(REACH_meas) == as.integer(ToMeas), + COMID %in% wbin_fl_upstream$toCOMID) %>% + inner_join(select(st_drop_geometry(wbin_fl_upstream), usCOMID = COMID, toCOMID), by = c("COMID" = "toCOMID")) %>% + mutate(dsCOMID = COMID, COMID = usCOMID) + + if(nrow(wb_inlet_POIs) > 0) { + wbin_POIs <- bind_rows(wbin_POIs, POI_creation(select(st_drop_geometry(wb_inlet_POIs), dsCOMID, + Type_WBIn = POI_identifier), + flowline, "WBIn")) + + wb_inlet_events <- filter(wb_inlet_events, !COMID %in% wb_inlet_POIs$dsCOMID) + } + } + + wbin_POIs <- addType(wbin_POIs, tmp_POIs, "WBIn", nexus = TRUE) + + if(nrow(wb_inlet_events) > 0){ + + wb_inlet_events <- st_compatibalize(wb_inlet_events, wbin_POIs) + wbin_POIs <- data.table::rbindlist(list(wbin_POIs, + select(wb_inlet_events, COMID, + Type_WBIn = POI_identifier, nexus)), + fill = TRUE) %>% + st_as_sf() + } + + return(list(POIs = wbin_POIs, events = wb_inlet_events)) + } else { + print("no waterbody inlets events") + wbin_POIs <- addType(wbin_POIs, tmp_POIs, "WBIn") + return(list(POIs = wbin_POIs, events = NA)) + } + } else { + wbin_POIs <- addType(wbin_POIs, tmp_POIs, "WBIn") + return(list(POIs = wbin_POIs, events = NA)) + } + +} + ### USEDIN wbout_POI_creaton and wbin_POIcreation #' Retrieves waterbodies from NHDArea layer or NHD Hi-Res for ResOpsUs #' locations if absent from NHD waterbody @@ -1128,3 +1289,260 @@ wb_poi_collapse <- function(tmp_POIs, nhd, events, gages){ return(list(POIs = tmp_POIs_fin, events_ret = events)) } + +#' create abstraction(?) and removal event points of interest +#' @param ar_data abstraction and removal data from USGS data release +#' @param flowline flowlines to attach pois to +#' @param poi_name name for poi set +create_ar_event_pois <- function(tmp_POIs, ar_data, flowline, poi_name = "AR") { + + ar_events <- ar_data %>% + filter(fromComid > 0 | ToComid > 0) %>% + select(COMID = NHDComid, AREventID) %>% + filter(COMID %in% flowline$COMID) + + if(nrow(ar_events) > 0){ + # AR POIs + tmp_POIs <- POI_creation(ar_events, flowline, poi_name) %>% + addType(., tmp_POIs, "AR", nexus = TRUE) + + } + + list(tmp_POIs = tmp_POIs) +} + +#' create terminal pois +#' @param tmp_POIs POIs from previous step +#' @param flowline flowlines to attach pois to +#' @param min_da_km_terminal minimum drainage area to include a network +#' @param poi_name name for poi set +create_terminal_pois <- function(tmp_POIs, flowline, min_da_km_terminal, poi_name = "Term") { + + term_paths <- filter(st_drop_geometry(flowline), + Hydroseq %in% filter(flowline, COMID %in% tmp_POIs$COMID)$TerminalPa) + + # Non-POI levelpath terminal pois, but meet size criteria + terminal_POIs <- st_drop_geometry(flowline) %>% + filter(Hydroseq == TerminalPa | (toCOMID == 0 | is.na(toCOMID) | !toCOMID %in% COMID)) %>% + filter(!COMID %in% term_paths$COMID, TotDASqKM >= min_da_km_terminal) %>% + bind_rows(term_paths) %>% + # Use level path identifier as Type ID + select(COMID, LevelPathI) + + tmp_POIs <- POI_creation(terminal_POIs, flowline, poi_name) %>% + addType(., tmp_POIs, poi_name, nexus = TRUE) + + list(tmp_POIs = tmp_POIs) +} + +#' create confluence pois +#' @param tmp_POIs POIs from previous step +#' @param flowline flowlines to attach pois to +#' @param poi_name name for poi set +create_confluence_pois <- function(tmp_POIs, flowline, poi_name = "Conf") { + # Navigate upstream from each POI and determine minimally-sufficient network between current POI sets + up_net <- unique(unlist(lapply(unique(tmp_POIs$COMID), + \(x, nhdDF) get_UM(nhdDF, x), + nhdDF = flowline))) + + finalNet <- unique(NetworkConnection(up_net, st_drop_geometry(flowline))) + + # Subset NHDPlusV2 flowlines to navigation results and write to shapefile + flowline <- mutate(flowline, minNet = ifelse(COMID %in% finalNet, 1, 0)) + + # Create new confluence POIs + conf_COMIDs <- st_drop_geometry(filter(flowline, minNet == 1)) %>% + # Downstream hydrosequence of 0 indicates + # the flowline is terminating or + # leaving the domain, so they + # are excluded from this process + filter(DnHydroseq > 0) %>% + group_by(DnHydroseq) %>% + filter(n()> 1) %>% + mutate(Type_Conf = LevelPathI) %>% + ungroup() %>% + select(COMID, Type_Conf) + + tmp_POIs <- POI_creation(conf_COMIDs, filter(flowline, minNet == 1), poi_name) %>% + addType(., tmp_POIs, poi_name, nexus = TRUE) + + list(tmp_POIs = tmp_POIs, updated_flowline = flowline) +} + +#' create waterbody inlet pois +#' @param tmp_POIs POIs from previous step +#' @param wb_lyr waterbodies to create inlet pois from +#' @param events complete layer of events prior to creating waterbody inlets +#' @param flowline flowlines to attach pois to +#' @param nav_gpkg name of gpkg for current vpu +#' @param data_paths list of data sources +#' @param proj_crs projected crs for overall project +#' @param poi_name name for poi set +create_wb_inlet_pois <- function(tmp_POIs, wb_lyr, events, + flowline, gages, data_paths, + proj_crs, poi_name = "WBIn") { + + wb_layers <- wbin_POIcreation(filter(flowline, minNet == 1), wb_lyr, events, + data_paths, proj_crs, tmp_POIs, + poi_name) + + wb_in_col <- wb_inlet_collapse(wb_layers$POIs, flowline, events, gages) + + wb_inlet_events <- NULL + + if(!all(is.na(wb_layers$events))) { + wb_inlet_events <- wb_layers$events %>% + select(COMID, REACHCODE, REACH_meas, POI_identifier, + event_type, nexus) + } + + list(tmp_POIs = wb_in_col$POIs, wb_inlet_events = wb_inlet_events) +} + +### used as part of waterbody poi creation in the workflow +#' collapses close upstream gages with wb inlet events. +#' @param temp_POIs (sf data.frame) rolling POI data frame +#' @param nhd (sf data.frame) nhd flowlines +#' @param events (list) waterbody inlet events +#' +#' @return (sf data.frame) dataframe of gage POIs +#' +wb_inlet_collapse <- function(tmp_POIs, nhd, events, gages){ + gage_dist_node <- function(x, wb_ds_ds, gage_add, events){ + # print (x) + wb_in_fl <- filter(wb_ds_ds, COMID == x) + gage_ds <- filter(wb_ds_ds, Hydroseq %in% wb_in_fl$Hydroseq | + Hydroseq %in% wb_in_fl$DnHydroseq) %>% + filter(TotDASqKM == max(TotDASqKM)) + + gage_reach <- gage_ds %>% + group_by(REACHCODE) %>% + summarize(do_union = TRUE, + total_length = sum(LENGTHKM)) + + #print(nrow(gage_reach)) + gage_event <- filter(gage_add, COMID %in% gage_ds$COMID) + wb_event <- filter(events, COMID == x, event_type == "inlet") %>% + unique() + + if(nrow(gage_reach) == 0){ + print("no gage reaches") + } + + if(nrow(gage_event) == 0){ + return("No gage events") + } else if(gage_event$COMID != wb_in_fl$COMID) { + gage_reach <- gage_reach %>% + filter(REACHCODE == gage_event$reachcode) %>% + mutate(gage_dist = ifelse(gage_event$nexus == TRUE, + total_length * (1 - (gage_event$reach_meas/100)), + total_length)) %>% + mutate(gage_comid = gage_event$COMID, + wbin_comid = x) + } else if(gage_event$COMID == wb_in_fl$COMID){ + if(nrow(wb_event) >0){ + wb_in_meas <- wb_event$REACH_meas + wb_RC <- wb_event$REACHCODE + } else { + wb_out_meas <- wb_in_fl$FromMeas + wb_RC <- wb_in_fl$REACHCODE + } + + # gage reach + gage_reach <- gage_reach %>% + filter(REACHCODE == gage_event$reachcode) %>% + mutate(gage_dist = total_length * (1 - (gage_event$reach_meas/100))) + + # wb info + wb_reach <- gage_reach %>% + filter(REACHCODE == wb_RC) %>% + mutate(wb_dist = total_length * (1 - (wb_out_meas /100))) + + gage_reach <- gage_reach %>% + mutate(gage_dist = abs(wb_reach$wb_dist - gage_dist), + gage_comid = gage_event$COMID, + wbin_comid = x) + } + } + + #events <- read_sf(temp_gpkg, split_layer) %>% + # rbind(st_compatibalize(wb_,.)) + + # Previously identified streamgages within Gage_Selection.Rmd + streamgages_VPU <- gages %>% + rename(COMID = comid) %>% + filter(COMID %in% nhd$COMID) %>% + #st_drop_geometry() %>% + switchDiv(., nhd) + + # get waterbody outlets + wb_in <- filter(tmp_POIs, !is.na(Type_WBIn), is.na(Type_Gages)) + + wb_in_nexus <- filter(wb_in, nexus == TRUE) + wb_in_nhd <- filter(nhd, COMID %in% wb_in$COMID) + wb_ds_ds <- wb_in_nhd %>% + rbind(filter(nhd, LevelPathI %in% .$LevelPathI & DnHydroseq %in% .$Hydroseq)) %>% + distinct() + + # get gages + gage_wb <- filter(tmp_POIs, !is.na(Type_Gages) & is.na(Type_WBIn)) #%>% + # filter(COMID %in% wb_ds_ds$COMID) + #gage_nhd <- filter(nhd, COMID %in% gage_wb$COMID) + #gage_node <- filter(gage_wb, nexus == FALSE) + #gage_nexus <- filter(gage_wb, nexus == TRUE) + + gage_add <- filter(streamgages_VPU, site_no %in% gage_wb$Type_Gages) %>% + select(COMID, reachcode, reach_meas, site_no) %>% + inner_join(select(st_drop_geometry(gage_wb), site_no = Type_Gages, nexus), + by = "site_no") %>% + filter(!nexus) + + output <- lapply(wb_in$COMID, + gage_dist_node, wb_ds_ds, gage_add, events) + + output_full <- do.call("rbind", output[lengths(output) > 1]) + + if(!is.null(output_full)){ + output_full <- output_full %>% + filter(gage_dist < 1) %>% + left_join(select(st_drop_geometry(nhd), COMID, TotDASqKM_WB = TotDASqKM), + by = c("wbin_comid" = "COMID")) %>% + left_join(select(st_drop_geometry(nhd), COMID, TotDASqKM_gage = TotDASqKM), + by = c("gage_comid" = "COMID")) %>% + mutate(DAR = TotDASqKM_gage / TotDASqKM_WB) %>% + filter(gage_dist < 1, DAR > .975) %>% + st_drop_geometry() + + gage_POI <- filter(tmp_POIs, COMID %in% output_full$gage_comid) %>% + select(COMID, Type_HUC12_ds = Type_HUC12, Type_Gages_ds = Type_Gages, + Type_TE_ds = Type_TE, Type_Term_ds = Type_Term, nexus) %>% + st_drop_geometry() %>% + group_by(COMID) %>% + summarise(Type_HUC12_ds = last(na.omit(Type_HUC12_ds)), + Type_Gages_ds = last(na.omit(Type_Gages_ds)), + Type_Term_ds = last(na.omit(Type_Term_ds)), + Type_TE_ds = last(na.omit(Type_TE_ds)), + nexus = last(na.omit(nexus))) + + WB_POI <- filter(tmp_POIs, COMID %in% output_full$wbin_comid, !is.na(Type_WBIn)) %>% + left_join(select(output_full, gage_comid, wbin_comid), by = c("COMID" = "wbin_comid")) %>% + left_join(select(gage_POI, -nexus), by = c("gage_comid" = "COMID")) %>% + mutate(Type_HUC12 = ifelse(!is.na(Type_HUC12_ds), Type_HUC12_ds, Type_HUC12), + Type_Gages = ifelse(!is.na(Type_Gages_ds), Type_Gages_ds, Type_Gages), + Type_TE = ifelse(!is.na(Type_TE_ds), Type_TE_ds, Type_TE), + Type_Term = ifelse(!is.na(Type_Term_ds), Type_Term_ds, Type_Term)) %>% + select(-c(Type_HUC12_ds, Type_Gages_ds, Type_TE_ds, Type_Term_ds)) + + tmp_POIs_fin <- filter(tmp_POIs, !COMID %in% c(WB_POI$COMID, WB_POI$gage_comid)) %>% + rbind(select(WB_POI, -gage_comid)) + + if(any(gage_POI$nexus == TRUE)){ + gage_events <- filter(gage_POI, nexus == TRUE) + events <- filter(events, !POI_identifier %in% gage_events$Type_Gages_ds) + } + return(list(POIs = tmp_POIs_fin, events_ret = events)) + } else { + print ("no points collapse") + return(list(POIs = tmp_POIs, events_ret = NA)) + } +} diff --git a/workspace/R/NHD_navigate.R b/workspace/R/NHD_navigate.R index b00ffa9..ec76a7d 100644 --- a/workspace/R/NHD_navigate.R +++ b/workspace/R/NHD_navigate.R @@ -508,314 +508,6 @@ split_elev <- function(in_POI_ID, split_DF, elev_diff, max_DA){ return(split_elev_DF) } - - -#' Creates Waterbody POIs, calls a few other functions -#' @param nhd (sf data.frame) flowline data.frame -#' @param WBs_VPU (sf data.frame) waterbodies to create inlet pois from -#' @param data_paths (list) list of data paths -#' @param crs (integer) CRS to use (epsg code) -#' @param split_layer (sf data.frame) events to split flowlines with -#' @param tmp_POIs (sf data.frame) rolling POI data frame -#' -#' @return (sf data.frame) dataframe of WB inlet POIs -#' -wbin_POIcreation <- function(nhd, wb_lyr, data_paths, crs, - split_layer, tmp_POIs){ - - wbout_COMIDs <- filter(tmp_POIs, !is.na(Type_WBOut)) - WBs_in_POIs <- filter(wb_lyr, wb_id %in% wbout_COMIDs$Type_WBOut) - - # Unnest to get list of waterbody COMIDS (member_comid) - wb_table <- st_drop_geometry(wb_lyr) %>% - dplyr::mutate(member_comid = strsplit(member_comid, ",")) %>% - tidyr::unnest(cols = member_comid) %>% - mutate(member_comid = as.integer(member_comid)) %>% - distinct() - - # # Attribute flowline if waterbody exists for POIs - # nhd_wb <- nhd %>% - # left_join(distinct(wb_table, wb_id, member_comid), - # by = c("WBAREACOMI" = "member_comid")) - - # Create waterbody inlet POIs - wbin_COMIDs <- filter(nhd, is.na(wb_id) & - DnHydroseq %in% filter(nhd, !is.na(wb_id))$Hydroseq) %>% - select(-wb_id) %>% - switchDiv(., nhd) %>% - inner_join(st_drop_geometry(filter(nhd, minNet == 1)) %>% - select(wb_id, Hydroseq), by = c("DnHydroseq" = "Hydroseq")) %>% - select(COMID, wb_id, minNet) %>% - group_by(COMID) %>% - # Ensure the inlets are on the network defined by confluence POIs - filter(minNet == 1) - - # Headwater Waterbodies that may need the network extended through the inlet - need_wbin <- st_drop_geometry(wb_lyr) %>% - #dplyr::select(COMID)%>% - dplyr::filter(!wb_id %in% wbin_COMIDs$wb_id) - - if(nrow(need_wbin)>0){ - - nhd_inlet <- mutate(nhd, WB = ifelse(wb_id > 0 & wb_id %in% need_wbin$wb_id, 1, 0)) - - missing_wbin_COMIDs <- filter(nhd_inlet, WB == 0, - DnHydroseq %in% filter(nhd_inlet, WB == 1)$Hydroseq) %>% - select(-wb_id) %>% - switchDiv(., nhd_inlet) %>% - inner_join(st_drop_geometry(filter(nhd_inlet, minNet == 1)) %>% - select(Hydroseq, wb_id), - by = c("DnHydroseq" = "Hydroseq")) %>% - select(COMID, wb_id) %>% - group_by(COMID) %>% - filter(row_number() == 1) %>% - ungroup() - - missing_wbin_COMIDs <- missing_wbin_COMIDs %>% - left_join(select(st_drop_geometry(nhd_inlet), COMID, TotDASqKM) - , by = c("COMID")) %>% - group_by(COMID) %>% - slice(which.max(TotDASqKM))%>% - ungroup() %>% - select(-TotDASqKM) - - if(nrow(missing_wbin_COMIDs) > 0){ - wbin_COMIDs <- data.table::rbindlist(list(wbin_COMIDs, - missing_wbin_COMIDs), - fill = TRUE) %>% - select(COMID, wb_id) - } - } - - wbin_POIs <- POI_creation(filter(st_drop_geometry(wbin_COMIDs), !COMID %in% wbout_COMIDs$COMID), - nhd, "WBIn") - - # Get NHDArea and HR waterbodies - WBs_VPU_areaHR <- HR_Area_coms(nhd, - filter(wb_lyr, source == "HR ID AVAILABLE"), - data_paths, crs) - - if(is.list(WBs_VPU_areaHR)) { - nhd_WBs <- filter(nhd, minNet == 1, - COMID %in% WBs_VPU_areaHR$nhd_wb_table$comid, - AreaSqKM > 0) - } else { - nhd_WBs <- NA - } - - if(is.data.frame(nhd_WBs)){ - - # Get the outlet events again - wb_outlet_events <- read_sf(temp_gpkg, split_layer) %>% - filter(event_type == "outlet") - - # Drive the inlet events - nhd_inlet <- filter(nhd_WBs, !COMID %in% wb_outlet_events$COMID) - - if("12" %in% unique(nhd$VPUID)){ - nhd_inlet <- rbind(nhd_inlet, filter(nhd_WBs, COMID == 1304709)) - } - - if (nrow(nhd_inlet) > 0){ - # Get inlet events and bind with outlets - wb_inlet_events <- WB_event(WBs_VPU_areaHR, nhd_inlet, "inlet") %>% - mutate(nexus = TRUE) %>% - inner_join(select(st_drop_geometry(nhd), COMID, Hydroseq, ToMeas), by = "COMID") - - # Determine which events are close enough to the end of an upstream flowline to just use that geometry for the POI - wbin_fl_upstream <- filter(nhd, DnHydroseq %in% filter(nhd, COMID %in% wb_inlet_events$COMID)$Hydroseq) %>% - group_by(DnHydroseq) %>% - filter(n() == 1) %>% - ungroup() - - if(nrow(wbin_fl_upstream) > 0){ - - wb_inlet_POIs <- filter(wb_inlet_events, REACH_meas < 5 | as.integer(REACH_meas) == as.integer(ToMeas), - COMID %in% wbin_fl_upstream$toCOMID) %>% - inner_join(select(st_drop_geometry(wbin_fl_upstream), usCOMID = COMID, toCOMID), by = c("COMID" = "toCOMID")) %>% - mutate(dsCOMID = COMID, COMID = usCOMID) - - if(nrow(wb_inlet_POIs) > 0) { - wbin_POIs <- bind_rows(wbin_POIs, POI_creation(select(st_drop_geometry(wb_inlet_POIs), dsCOMID, - Type_WBIn = POI_identifier), - nhd, "WBIn")) - - wb_inlet_events <- filter(wb_inlet_events, !COMID %in% wb_inlet_POIs$dsCOMID) - } - } - - if(nrow(wb_inlet_events) > 0){ - - wbin_POIs <- addType(wbin_POIs, tmp_POIs, "WBIn", nexus = TRUE) - wb_inlet_events <- st_compatibalize(wb_inlet_events, wbin_POIs) - wbin_POIs_fin <- data.table::rbindlist(list(wbin_POIs, - select(wb_inlet_events, COMID, - Type_WBIn = POI_identifier, nexus)), - fill = TRUE) %>% - st_as_sf() - } - - return(list(POIs = wbin_POIs_fin, events = wb_inlet_events)) - } else { - print("no waterbody inlets events") - wbin_POIs <- addType(wbin_POIs, tmp_POIs, "WBIn") - return(list(POIs = wbin_POIs, events = NA)) - } - } else { - wbin_POIs <- addType(wbin_POIs, tmp_POIs, "WBIn") - return(list(POIs = wbin_POIs, events = NA)) - } - -} - -### used as part of waterbody poi creation in the workflow -#' collapses close upstream gages with wb inlet events. -#' @param temp_POIs (sf data.frame) rolling POI data frame -#' @param nhd (sf data.frame) nhd flowlines -#' @param events (list) waterbody inlet events -#' -#' @return (sf data.frame) dataframe of gage POIs -#' -wb_inlet_collapse <- function(tmp_POIs, nhd, events){ - gage_dist_node <- function(x, wb_ds_ds, gage_add, events){ - print (x) - wb_in_fl <- filter(wb_ds_ds, COMID == x) - gage_ds <- filter(wb_ds_ds, Hydroseq %in% wb_in_fl$Hydroseq | - Hydroseq %in% wb_in_fl$DnHydroseq) %>% - filter(TotDASqKM == max(TotDASqKM)) - - gage_reach <- gage_ds %>% - group_by(REACHCODE) %>% - summarize(do_union = TRUE, - total_length = sum(LENGTHKM)) - - #print(nrow(gage_reach)) - gage_event <- filter(gage_add, COMID %in% gage_ds$COMID) - wb_event <- filter(events, COMID == x, event_type == "inlet") %>% - unique() - - if(nrow(gage_reach) == 0){ - print("no gage reaches") - } - - if(nrow(gage_event) == 0){ - return("No gage events") - } else if(gage_event$COMID != wb_in_fl$COMID) { - gage_reach <- gage_reach %>% - filter(REACHCODE == gage_event$reachcode) %>% - mutate(gage_dist = ifelse(gage_event$nexus == TRUE, - total_length * (1 - (gage_event$reach_meas/100)), - total_length)) %>% - mutate(gage_comid = gage_event$COMID, - wbin_comid = x) - } else if(gage_event$COMID == wb_in_fl$COMID){ - if(nrow(wb_event) >0){ - wb_in_meas <- wb_event$REACH_meas - wb_RC <- wb_event$REACHCODE - } else { - wb_out_meas <- wb_in_fl$FromMeas - wb_RC <- wb_in_fl$REACHCODE - } - - # gage reach - gage_reach <- gage_reach %>% - filter(REACHCODE == gage_event$reachcode) %>% - mutate(gage_dist = total_length * (1 - (gage_event$reach_meas/100))) - - # wb info - wb_reach <- gage_reach %>% - filter(REACHCODE == wb_RC) %>% - mutate(wb_dist = total_length * (1 - (wb_out_meas /100))) - - gage_reach <- gage_reach %>% - mutate(gage_dist = abs(wb_reach$wb_dist - gage_dist), - gage_comid = gage_event$COMID, - wbin_comid = x) - } - } - - #events <- read_sf(temp_gpkg, split_layer) %>% - # rbind(st_compatibalize(wb_,.)) - - # Previously identified streamgages within Gage_Selection.Rmd - streamgages_VPU <- gages %>% - rename(COMID = comid) %>% - filter(COMID %in% nhd$COMID) %>% - #st_drop_geometry() %>% - switchDiv(., nhd) - - # get waterbody outlets - wb_in <- filter(tmp_POIs, !is.na(Type_WBIn), is.na(Type_Gages)) - - wb_in_nexus <- filter(wb_in, nexus == TRUE) - wb_in_nhd <- filter(nhd, COMID %in% wb_in$COMID) - wb_ds_ds <- wb_in_nhd %>% - rbind(filter(nhd, LevelPathI %in% .$LevelPathI & DnHydroseq %in% .$Hydroseq)) %>% - distinct() - - # get gages - gage_wb <- filter(tmp_POIs, !is.na(Type_Gages) & is.na(Type_WBIn)) #%>% - # filter(COMID %in% wb_ds_ds$COMID) - #gage_nhd <- filter(nhd, COMID %in% gage_wb$COMID) - #gage_node <- filter(gage_wb, nexus == FALSE) - #gage_nexus <- filter(gage_wb, nexus == TRUE) - - gage_add <- filter(streamgages_VPU, site_no %in% gage_wb$Type_Gages) %>% - select(COMID, reachcode, reach_meas, site_no) %>% - inner_join(select(st_drop_geometry(gage_wb), site_no = Type_Gages, nexus), - by = "site_no") %>% - filter(!nexus) - - output <- lapply(wb_in$COMID, - gage_dist_node, wb_ds_ds, gage_add, events) - - output_full <- do.call("rbind", output[lengths(output) > 1]) - - if(!is.null(output_full)){ - output_full <- output_full %>% - filter(gage_dist < 1) %>% - left_join(select(st_drop_geometry(nhd), COMID, TotDASqKM_WB = TotDASqKM), - by = c("wbin_comid" = "COMID")) %>% - left_join(select(st_drop_geometry(nhd), COMID, TotDASqKM_gage = TotDASqKM), - by = c("gage_comid" = "COMID")) %>% - mutate(DAR = TotDASqKM_gage / TotDASqKM_WB) %>% - filter(gage_dist < 1, DAR > .975) %>% - st_drop_geometry() - - gage_POI <- filter(tmp_POIs, COMID %in% output_full$gage_comid) %>% - select(COMID, Type_HUC12_ds = Type_HUC12, Type_Gages_ds = Type_Gages, - Type_TE_ds = Type_TE, Type_Term_ds = Type_Term, nexus) %>% - st_drop_geometry() %>% - group_by(COMID) %>% - summarise(Type_HUC12_ds = last(na.omit(Type_HUC12_ds)), - Type_Gages_ds = last(na.omit(Type_Gages_ds)), - Type_Term_ds = last(na.omit(Type_Term_ds)), - Type_TE_ds = last(na.omit(Type_TE_ds)), - nexus = last(na.omit(nexus))) - - WB_POI <- filter(tmp_POIs, COMID %in% output_full$wbin_comid, !is.na(Type_WBIn)) %>% - left_join(select(output_full, gage_comid, wbin_comid), by = c("COMID" = "wbin_comid")) %>% - left_join(select(gage_POI, -nexus), by = c("gage_comid" = "COMID")) %>% - mutate(Type_HUC12 = ifelse(!is.na(Type_HUC12_ds), Type_HUC12_ds, Type_HUC12), - Type_Gages = ifelse(!is.na(Type_Gages_ds), Type_Gages_ds, Type_Gages), - Type_TE = ifelse(!is.na(Type_TE_ds), Type_TE_ds, Type_TE), - Type_Term = ifelse(!is.na(Type_Term_ds), Type_Term_ds, Type_Term)) %>% - select(-c(Type_HUC12_ds, Type_Gages_ds, Type_TE_ds, Type_Term_ds)) - - tmp_POIs_fin <- filter(tmp_POIs, !COMID %in% c(WB_POI$COMID, WB_POI$gage_comid)) %>% - rbind(select(WB_POI, -gage_comid)) - - if(any(gage_POI$nexus == TRUE)){ - gage_events <- filter(gage_POI, nexus == TRUE) - events <- filter(events, !POI_identifier %in% gage_events$Type_Gages_ds) - } - return(list(POIs = tmp_POIs_fin, events_ret = events)) - } else { - print ("no points collapse") - return(list(POIs = tmp_POIs, events_ret = NA)) - } -} - ### used when poi_move.R is sourced in POI collapse phase of workflow ### consider calling at the end of each poi creation process? #' Collapses POIs together based on criteria diff --git a/workspace/_targets/02_POI_creation_targets.R b/workspace/_targets/02_POI_creation_targets.R index 8a1d8f5..52eb325 100644 --- a/workspace/_targets/02_POI_creation_targets.R +++ b/workspace/_targets/02_POI_creation_targets.R @@ -2,7 +2,8 @@ library(targets) tar_option_set(packages = c("dplyr", "sf", "hyfabric", "hydroloom", "nhdplusTools"), memory = "transient", garbage_collection = TRUE, error = "null", - storage = "worker", retrieval = "worker", deployment = "main") + storage = "worker", retrieval = "worker", deployment = "main", + debug = "wb_inlet_pois_0a67251eed4e40f7") library(future) library(future.callr) @@ -22,21 +23,26 @@ list(tar_target(data_paths_file, "cache/data_paths.json", format = "file"), tar_target(nav_gpkg, file.path("cache", paste0("reference_", vpu_codes,".gpkg"))), tar_target(temp_gpkg, file.path("cache", paste0("nav_", vpu_codes, ".gpkg"))), + ### Base flowline -- gets updated attributes tar_target(flowline, read_sf(nav_gpkg, nhd_flowline), pattern = map(nav_gpkg), deployment = "worker"), + ### huc12 pois tar_target(huc12_poi, create_hu12_pois(read_sf(data_paths$hu12_points_path, "hu_points"), flowline, poi_name = "HUC12", vpu_codes), pattern = map(flowline, vpu_codes)), + ### gage POIs tar_target(gage_info_gpkg_file, gage_info_gpkg, format = "file"), - tar_target(gage_pois, create_gage_pois(read_sf(gage_info_gpkg_file, "Gages"), + tar_target(gages, read_sf(gage_info_gpkg_file, "Gages")), + tar_target(gage_pois, create_gage_pois(gages, flowline, "Gages", vpu_codes, huc12_poi$tmp_POIs, min_da_km_gages, combine_meters, reach_meas_thresh), pattern = map(flowline, huc12_poi, vpu_codes), deployment = "worker"), + ### Thermo electric power plant POIs tar_target(te_points_file, data_paths$TE_points_path, format = "file"), tar_target(te_points, read_sf(te_points_file, "2015_TE_Model_Estimates_lat.long_COMIDs")), tar_target(te_attributes_file, file.path(data_paths$TE_points_path, "1_model_inputs/te_plants.csv"), format = "file"), @@ -46,17 +52,19 @@ list(tar_target(data_paths_file, "cache/data_paths.json", format = "file"), flowline, "TE", vpu_codes), pattern = map(gage_pois, huc12_poi, flowline, vpu_codes)), + ### get waterbodies in order tar_target(combined_waterbodies_file, file.path(prep_path, "combined_waterbodies"), format = "file"), tar_target(combined_waterbodies, readRDS(combined_waterbodies_file)), tar_target(waterbodies, make_waterbodies_layer(combined_waterbodies, flowline, geo_crs), pattern = map(flowline), deployment = "worker"), - + + ### star fit cross walk and reservoir POIs tar_target(istarf_xwalk_file, data_paths$istarf_xwalk, format = "file"), tar_target(istarf_xwalk, read.csv(istarf_xwalk_file)), tar_target(resops_pois, create_resops_pois(te_pois$tmp_POIs, waterbodies, istarf_xwalk, flowline, "resops"), pattern = map(te_pois, waterbodies, flowline), deployment = "worker"), - + ### hilarri is more reservoir POIs tar_target(hilarri_file, data_paths$hilarri_sites, format = "file"), tar_target(hilarri_data, read.csv(hilarri_file, colClasses = "character")), tar_target(hilarri_pois, create_hilarri_pois(resops_pois$tmp_POIs, @@ -64,7 +72,10 @@ list(tar_target(data_paths_file, "cache/data_paths.json", format = "file"), resops_pois$wb_pois, flowline, "hilarri"), pattern = map(resops_pois, flowline), deployment = "worker"), + + ### waterbody outlet POIs # TODO: work on cleaning up parameters of this function + # TODO: track event creation explicitly in workflow tar_target(wb_outlet_pois, create_wb_outlet_pois(hilarri_pois$tmp_POIs, filter(combined_waterbodies, layer == "NHDArea"), hilarri_pois$wb_pois, @@ -74,11 +85,42 @@ list(tar_target(data_paths_file, "cache/data_paths.json", format = "file"), nav_gpkg, data_paths, proj_crs, - read_sf(gage_info_gpkg_file, "Gages"), + gages, "wb_id"), pattern = map(hilarri_pois, resops_pois, gage_pois,flowline, nav_gpkg), deployment = "worker"), - tar_target(write_wb_flowline_mod, write_sf(wb_outlet_pois$updated_flowline, - nav_gpkg, nhd_flowline), - pattern = map(wb_outlet_pois, nav_gpkg)) + + ### update flowlines for clarity + tar_target(updated_flowline_wb_outlet, wb_outlet_pois$updated_flowline, + pattern = map(wb_outlet_pois), deployment = "worker"), + + ### Addition and Removal events from Brakebill, 2019 + tar_target(ar_data, readxl::read_xlsx(file.path(data_paths$USGS_IT_path, + "ARevents_final7.xlsx"))), + tar_target(ar_event_pois, create_ar_event_pois(wb_outlet_pois$tmp_POIs, ar_data, + updated_flowline_wb_outlet, "AR"), + pattern = map(wb_outlet_pois, updated_flowline_wb_outlet), deployment = "worker"), + + ### terminal POIs at network ends + tar_target(terminal_pois, create_terminal_pois(ar_event_pois$tmp_POIs, + updated_flowline_wb_outlet, + min_da_km_terminal, "Term"), + pattern = map(ar_event_pois, updated_flowline_wb_outlet), deployment = "worker"), + + ### confluence POIs + tar_target(confluence_pois, create_confluence_pois(terminal_pois$tmp_POIs, + updated_flowline_wb_outlet, "Conf"), + pattern = map(terminal_pois, updated_flowline_wb_outlet), deployment = "worker"), + + ### more updated flowline attributes + tar_target(updated_flowline_confluence, confluence_pois$updated_flowline, + pattern = map(confluence_pois), deployment = "worker"), + + ## waterbody inlet POIs + tar_target(wb_inlet_pois, create_wb_inlet_pois(confluence_pois$tmp_POIs, + wb_outlet_pois$WB_VPU_pois, + wb_outlet_pois$all_events, + updated_flowline_confluence, gages, + data_paths, proj_crs, "WBIn"), + pattern = map(confluence_pois, wb_outlet_pois, updated_flowline_confluence), deployment = "worker") ) \ No newline at end of file diff --git a/workspace/workflow_runner.R b/workspace/workflow_runner.R index b889d3e..7716fa5 100644 --- a/workspace/workflow_runner.R +++ b/workspace/workflow_runner.R @@ -48,5 +48,14 @@ tar_make() Sys.setenv(TAR_PROJECT = "02_POI_creation") +tar_make_future(huc12_poi, workers = 8) +tar_make_future(gage_pois, workers = 8) +tar_make_future(te_pois, workers = 8) +tar_make_future(resops_pois, workers = 8) tar_make_future(hilarri_pois, workers = 8) - \ No newline at end of file +tar_make_future(wb_outlet_pois, workers = 8) +tar_make_future(write_wb_flowline_mod, workers = 8) +tar_make_future(ar_event_pois, workers = 8) +tar_make_future(terminal_pois, workers = 8) +tar_make_future(updated_flowline_confluence, workers = 8) +tar_make_future(wb_inlet_pois, workers = 8) -- GitLab