From 180309e97bc6bf71679e0081be338fcb792736c2 Mon Sep 17 00:00:00 2001 From: David Blodgett <dblodgett@usgs.gov> Date: Wed, 18 Dec 2024 21:11:26 -0600 Subject: [PATCH] finish first pass of non_nedritic workflow for #153 --- workspace/R/04_aggregate_functions.R | 309 +------ ...on_dend.R => 05_non_dendritic_functions.R} | 860 ++++++++++++++++-- workspace/_targets.yaml | 3 + workspace/_targets/04_aggregate_targets.R | 49 +- workspace/_targets/05_non_dendritic_targets.R | 115 +++ workspace/_targets_runner.R | 6 + 6 files changed, 908 insertions(+), 434 deletions(-) rename workspace/R/{non_dend.R => 05_non_dendritic_functions.R} (53%) create mode 100644 workspace/_targets/05_non_dendritic_targets.R diff --git a/workspace/R/04_aggregate_functions.R b/workspace/R/04_aggregate_functions.R index ad266bd..0d32e6d 100644 --- a/workspace/R/04_aggregate_functions.R +++ b/workspace/R/04_aggregate_functions.R @@ -68,7 +68,7 @@ aggregate_catchments_fun <- function(divides, reconciled, outlets_POI, outlets <- filter(outlets, ID %in% reconciled$ID) |> distinct() - agg_cats <- aggregate_catchments(flowpath = reconciled, + agg_cats <- hyRefactor::aggregate_catchments(flowpath = reconciled, divide = divides, outlets = outlets, da_thresh = aggregate_da_thresh_sqkm, @@ -182,310 +182,3 @@ make_lookup <- function(reconciled, agg_cats, agg_fline) { dplyr::left_join(aggregate_lookup_catchment, by = "reconciled_ID") } - -#'merge rpu to vpu -#' @param pois pois from reference fabric step -#' @param rpu_vpu table of rpu and vpu codes -#' @param vpu vpu to be merged -#' @param rpu_vpu_out outputs from rpus to other rpus/vpus -#' @param ref_gpkg_fun function to convert a rpu into a refactored gpkg path -#' @param agg_gpkg_fun function to convert a rpu into an aggregate gpkg path -#' @param lookup_table_refactor table name to get the lookup table from -#' @param reconciled_layer table name to get the reconciled flowlines from -#' @param divide_layer table name to get the reconciled divides from -#' @param split_divide_layer table name to get the split catchment divides from -#' @param agg_fline_layer table name to get the aggregate flowlines from -#' @param agg_cats_layer table name to get the aggregate catchments from -#' @param mapped_oulets_layer table name to get the long form mapped pois from -#' -merge_rpu_to_vpu <- function(pois, rpu_vpu, vpu, rpu_vpu_out, - ref_gpkg_fun, agg_gpkg_fun, - lookup_table_refactor, - reconciled_layer, divide_layer, split_divide_layer, - agg_fline_layer, agg_cats_layer, mapped_outlets_layer) { - - rpu_vpu <- rpu_vpu[rpu_vpu$vpuid == vpu,] - - # Thematic POIs - POIs <- pois %>% - select(-geom_id) - - merged_layers <- merge_refactor(rpu_vpu$rpuid, - rpu_vpu_out, - ref_gpkg_fun, - agg_gpkg_fun, - lookup_table_refactor, - reconciled_layer, - divide_layer, - split_divide_layer, - agg_fline_layer, - agg_cats_layer, - mapped_outlets_layer) - - merged_layers[[agg_cats_layer]] <- merged_layers[[agg_cats_layer]] %>% - mutate(areasqkm = as.numeric(units::set_units(st_area(.), "km^2"))) - - merged_layers[[agg_fline_layer]] <- merged_layers[[agg_fline_layer]] %>% - mutate(lengthkm = as.numeric(units::set_units(st_length(.), "km"))) %>% - left_join(select(st_drop_geometry(merged_layers[[agg_cats_layer]]), ID, areasqkm), by = "ID") - - cat_net_rfc <- st_drop_geometry(merged_layers[[reconciled_layer]]) %>% - select(id = ID, toID = toID, lengthkm = LENGTHKM, levelpathid = LevelPathID) %>% - left_join(select(st_drop_geometry(merged_layers[[divide_layer]]), - id = ID, areasqkm), by = "id") %>% - relocate(id, toID, lengthkm, areasqkm, levelpathid) - - cat_net_gf <- st_drop_geometry(merged_layers[[agg_fline_layer]]) %>% - select(id = ID, toid = toID, lengthkm, areasqkm) %>% - left_join(distinct(select(merged_layers[[lookup_table_refactor]], - id = aggregated_flowpath_ID, - levelpathid = LevelPathID))) - - list(pois = POIs, - lookup_table_refactor = select(merged_layers[[lookup_table_refactor]], - -aggregated_flowpath_ID, - -aggregated_divide_ID), - reconciled_layer = merged_layers[[reconciled_layer]], - divide_layer = merged_layers[[divide_layer]], - split_divide_layer = merged_layers[[split_divide_layer]], - mapped_outlets_layer = merged_layers[[mapped_outlets_layer]], - agg_cats_layer = merged_layers[[agg_cats_layer]], - agg_fline_layer = merged_layers[[agg_fline_layer]], - lookup_table_refactor = merged_layers[[lookup_table_refactor]], - reconciled_layer = merged_layers[[reconciled_layer]], - catchment_network_table_refector = cat_net_rfc, - cathment_network_table_aggregate = cat_net_gf, - lookup_table_aggregate = merged_layers[[lookup_table_refactor]]) -} - -#' merge refactor -#' @param rpus vector of rpu codes to be merged -#' @param rpu_vpu_out outputs from rpus to other rpus/vpus -#' @param ref_gpkg_fun function to convert a rpu into a refactored gpkg path -#' @param agg_gpkg_fun function to convert a rpu into an aggregate gpkg path -#' @param lookup_table_refactor table name to get the lookup table from -#' @param reconciled_layer table name to get the reconciled flowlines from -#' @param divide_layer table name to get the reconciled divides from -#' @param split_divide_layer table name to get the split catchment divides from -#' @param agg_fline_layer table name to get the aggregate flowlines from -#' @param agg_cats_layer table name to get the aggregate catchments from -#' @param mapped_oulets_layer table name to get the long form mapped pois from -#' -merge_refactor <- function(rpus, - rpu_vpu_out, - ref_gpkg_fun, - agg_gpkg_fun, - lookup_table_refactor, - reconciled_layer, - divide_layer, - split_divide_layer, - agg_fline_layer, - agg_cats_layer, - mapped_outlets_layer) { - - - out <- rep(list(list()), length(rpus)) - names(out) <- rpus - - s <- 10000000 - - for(rpu in rpus) { - print(rpu) - - refactor_gpkg <- ref_gpkg_fun(rpu) - aggregate_gpkg <- agg_gpkg_fun(rpu) - - out[[rpu]] <- setNames(list(read_sf(aggregate_gpkg, lookup_table_refactor), - read_sf(refactor_gpkg, reconciled_layer), - read_sf(refactor_gpkg, divide_layer), - read_sf(refactor_gpkg, split_divide_layer), - read_sf(aggregate_gpkg, agg_fline_layer), - read_sf(aggregate_gpkg, agg_cats_layer), - read_sf(aggregate_gpkg, mapped_outlets_layer)), - c(lookup_table_refactor, - reconciled_layer, - divide_layer, - split_divide_layer, - agg_fline_layer, - agg_cats_layer, - mapped_outlets_layer))#, - - ### RECONCILED ### - - out[[rpu]][[reconciled_layer]] <- select(out[[rpu]][[reconciled_layer]], - ID, toID, LENGTHKM, TotDASqKM, member_COMID, - LevelPathID = orig_levelpathID) - - out[[rpu]][[reconciled_layer]]$newID <- - seq(s, by = 1, length.out = nrow(out[[rpu]][[reconciled_layer]])) - - s <- max(out[[rpu]][[reconciled_layer]]$newID) + 1 - - # get a toID that matches the new IDs - out[[rpu]][[reconciled_layer]] <- left_join(out[[rpu]][[reconciled_layer]], - select(st_drop_geometry(out[[rpu]][[reconciled_layer]]), - ID, newtoID = newID), - by = c("toID" = "ID")) - - # find updates that we need to apply - update <- rpu_vpu_out[!rpu_vpu_out$toCOMID %in% out[[rpu]][[lookup_table_refactor]]$NHDPlusV2_COMID & - rpu_vpu_out$COMID %in% out[[rpu]][[lookup_table_refactor]]$NHDPlusV2_COMID, ] - - # apply these updates for follow up if we got any - if(nrow(update) > 0) { - update <- left_join(update, out[[rpu]][[lookup_table_refactor]], - by = c("COMID" = "NHDPlusV2_COMID")) %>% - left_join(select(out[[rpu]][[lookup_table_refactor]], - NHDPlusV2_COMID, to_member_COMID = member_COMID), - by = c("toCOMID" = "NHDPlusV2_COMID")) - - out[[rpu]][[reconciled_layer]] <- left_join(out[[rpu]][[reconciled_layer]], - select(update, reconciled_ID, toCOMID), - by = c("ID" = "reconciled_ID")) - } - - #### LOOKUP TABLE #### - - update_id <- st_drop_geometry(select(out[[rpu]][[reconciled_layer]], - old_ID = ID, ID = newID)) - - out[[rpu]][[lookup_table_refactor]] <- out[[rpu]][[lookup_table_refactor]] %>% - left_join(update_id, by = c("reconciled_ID" = "old_ID")) %>% - select(-reconciled_ID, reconciled_ID = ID) %>% - left_join(update_id, by = c("aggregated_flowpath_ID" = "old_ID")) %>% - select(-aggregated_flowpath_ID, aggregated_flowpath_ID = ID) %>% - left_join(update_id, by = c("aggregated_divide_ID" = "old_ID")) %>% - select(-aggregated_divide_ID, aggregated_divide_ID = ID) %>% - left_join(st_drop_geometry(select(out[[rpu]][[reconciled_layer]], - ID = newID, LevelPathID)), - by = c("reconciled_ID" = "ID")) - - #### DIVIDE LAYER #### - - out[[rpu]][[divide_layer]] <- left_join(rename(out[[rpu]][[divide_layer]], - old_ID = ID), - update_id, by = c("old_ID")) - - out[[rpu]][[divide_layer]] <- select(out[[rpu]][[divide_layer]], - ID, areasqkm, member_COMID) - - out[[rpu]][[divide_layer]]$rpu <- rep(rpu, nrow(out[[rpu]][[divide_layer]])) - - #### aggregate layers #### - - for(l in c(agg_cats_layer, agg_fline_layer)) { #, mapped_outlets_layer - - out[[rpu]][[l]] <- left_join(rename(out[[rpu]][[l]], old_ID = ID), - update_id, - by = c("old_ID")) - - if("toID" %in% names(out[[rpu]][[l]])) { - out[[rpu]][[l]] <- left_join(rename(out[[rpu]][[l]], old_toID = toID), - rename(update_id, toID = ID), - by = c("old_toID" = "old_ID")) - - sets <- select(st_drop_geometry(out[[rpu]][[l]]), ID, old_ID, old_set = set) %>% - mutate(old_set = strsplit(old_set, ",")) %>% - tidyr::unnest(cols = old_set) %>% - mutate(old_set = as.integer(old_set)) %>% - left_join(rename(update_id, set = ID), by = c("old_set" = "old_ID")) %>% - select(-old_set, -old_ID) - - sets <- split(sets$set, sets$ID) - - sets <- data.frame(ID = as.integer(names(sets))) %>% - mutate(set = unname(sets)) - - out[[rpu]][[l]] <- left_join(select(out[[rpu]][[l]], -set), - sets, by = "ID") - - out[[rpu]][[l]] <- select(out[[rpu]][[l]], -old_ID, -old_toID) - - out[[rpu]][[l]] <- out[[rpu]][[l]][c("ID", "toID", "set", - names(out[[rpu]][[l]])[!names(out[[rpu]][[l]]) %in% - c("ID", "toID", "set", - attr(out[[rpu]][[l]], "sf_column"))], - attr(out[[rpu]][[l]], "sf_column"))] - - out[[rpu]][[l]]$set <- sapply(out[[rpu]][[l]]$set, paste, collapse = ",") - } else { - - out[[rpu]][[l]] <- select(out[[rpu]][[l]], -old_ID) - - out[[rpu]][[l]] <- out[[rpu]][[l]][c("ID", - names(out[[rpu]][[l]])[!names(out[[rpu]][[l]]) %in% - c("ID", - attr(out[[rpu]][[l]], "sf_column"))], - attr(out[[rpu]][[l]], "sf_column"))] - } - } - - } - - out <- setNames(lapply(names(out[[1]]), function(x, out) { - dplyr::bind_rows(lapply(out, function(df, n) { - if("COMID" %in% names(df[[n]]) && is.numeric(df[[n]]$COMID)) - df[[n]]$COMID <- as.character(df[[n]]$COMID) - df[[n]] - - if("event_id" %in% names(df[[n]]) && !is.character(df[[n]]$event_id)) - df[[n]]$event_id <- as.character(df[[n]]$event_id) - df[[n]] - }, n = x)) - }, out = out), names(out[[1]])) - - if("toCOMID" %in% names(out[[reconciled_layer]])) { - - # blow up so we have unique COMIDs to join on. - # need to keep the top most of any splits (the .1 variety) - # this makes sure out toCOMID assignments go to the right new id. - long_form <- st_drop_geometry(out[[reconciled_layer]]) %>% - select(newID, member_COMID) %>% - mutate(member_COMID = strsplit(member_COMID, ",")) %>% - tidyr::unnest(cols = member_COMID) %>% - filter(grepl("\\.1$", member_COMID) | !grepl("\\.", member_COMID)) %>% - mutate(NHDPlusV2_COMID = as.integer(member_COMID)) %>% - select(-member_COMID, update_newtoID = newID) - - # NOTE: if the missing tocomid is in the next VPU they will still be NA - out[[reconciled_layer]] <- left_join(out[[reconciled_layer]], - long_form, by = c("toCOMID" = "NHDPlusV2_COMID")) - out[[reconciled_layer]]$newtoID[!is.na(out[[reconciled_layer]]$toCOMID)] <- - out[[reconciled_layer]]$update_newtoID[!is.na(out[[reconciled_layer]]$toCOMID)] - - # now use updates from refactored ids with aggregate - long_form <- st_drop_geometry(out[[agg_fline_layer]]) %>% - select(ID, set) %>% - mutate(set = strsplit(set, ",")) %>% - tidyr::unnest(cols = set) %>% - mutate(set = as.integer(set)) %>% - select(update_newtoID = ID, refactor_ID = set) - - # join updated toID from refactor so we can update as needed - out[[agg_fline_layer]] <- out[[agg_fline_layer]] %>% - # this adds a "new_toID" containing the refactor id that the - # aggregate flowpaths we are updating need to go to. - left_join(select(st_drop_geometry(out[[reconciled_layer]]), - ID = newID, new_toID = update_newtoID), - by = "ID") %>% - # we need to handle sets of refactor ids and get the right aggregate toID - left_join(long_form, by = c("new_toID" = "refactor_ID")) - # need to update the field to reflect the information joined just above. - out[[agg_fline_layer]]$new_toID[!is.na(out[[agg_fline_layer]]$update_newtoID)] <- - out[[agg_fline_layer]]$update_newtoID[!is.na(out[[agg_fline_layer]]$update_newtoID)] - - # now do the actual toID updates - out[[agg_fline_layer]]$toID[!is.na(out[[agg_fline_layer]]$new_toID)] <- - out[[agg_fline_layer]]$new_toID[!is.na(out[[agg_fline_layer]]$new_toID)] - - out[[agg_fline_layer]] <- select(out[[agg_fline_layer]], -new_toID, -update_newtoID) - } - - out[[split_divide_layer]] <- rename(out[[split_divide_layer]], comid_part = FEATUREID) - - out[[reconciled_layer]] <- select(out[[reconciled_layer]], ID = newID, - toID = newtoID, LENGTHKM, TotDASqKM, - member_COMID, LevelPathID, refactor_ID = ID) - - out -} diff --git a/workspace/R/non_dend.R b/workspace/R/05_non_dendritic_functions.R similarity index 53% rename from workspace/R/non_dend.R rename to workspace/R/05_non_dendritic_functions.R index 090819d..fce667c 100644 --- a/workspace/R/non_dend.R +++ b/workspace/R/05_non_dendritic_functions.R @@ -1,3 +1,516 @@ +#'merge rpu to vpu +#' @param pois pois from reference fabric step +#' @param rpu_vpu table of rpu and vpu codes +#' @param vpu vpu to be merged +#' @param rpu_vpu_out outputs from rpus to other rpus/vpus +#' @param ref_gpkg_fun function to convert a rpu into a refactored gpkg path +#' @param agg_gpkg_fun function to convert a rpu into an aggregate gpkg path +#' @param lookup_table_refactor table name to get the lookup table from +#' @param reconciled_layer table name to get the reconciled flowlines from +#' @param divide_layer table name to get the reconciled divides from +#' @param split_divide_layer table name to get the split catchment divides from +#' @param agg_fline_layer table name to get the aggregate flowlines from +#' @param agg_cats_layer table name to get the aggregate catchments from +#' @param mapped_oulets_layer table name to get the long form mapped pois from +#' +merge_rpu_to_vpu <- function(pois, rpu_vpu, vpu, rpu_vpu_out, + ref_gpkg_fun, agg_gpkg_fun, + lookup_table_refactor, + reconciled_layer, divide_layer, split_divide_layer, + agg_fline_layer, agg_cats_layer, mapped_outlets_layer) { + + rpu_vpu <- rpu_vpu[rpu_vpu$vpuid == vpu,] + + # Thematic POIs + POIs <- pois %>% + select(-geom_id) + + merged_layers <- merge_refactor(rpu_vpu$rpuid, + rpu_vpu_out, + ref_gpkg_fun, + agg_gpkg_fun, + lookup_table_refactor, + reconciled_layer, + divide_layer, + split_divide_layer, + agg_fline_layer, + agg_cats_layer, + mapped_outlets_layer) + + merged_layers[[agg_cats_layer]] <- merged_layers[[agg_cats_layer]] %>% + mutate(areasqkm = as.numeric(units::set_units(st_area(.), "km^2"))) + + merged_layers[[agg_fline_layer]] <- merged_layers[[agg_fline_layer]] %>% + mutate(lengthkm = as.numeric(units::set_units(st_length(.), "km"))) %>% + left_join(select(st_drop_geometry(merged_layers[[agg_cats_layer]]), ID, areasqkm), by = "ID") + + cat_net_rfc <- st_drop_geometry(merged_layers[[reconciled_layer]]) %>% + select(id = ID, toID = toID, lengthkm = LENGTHKM, levelpathid = LevelPathID) %>% + left_join(select(st_drop_geometry(merged_layers[[divide_layer]]), + id = ID, areasqkm), by = "id") %>% + relocate(id, toID, lengthkm, areasqkm, levelpathid) + + cat_net_gf <- st_drop_geometry(merged_layers[[agg_fline_layer]]) %>% + select(id = ID, toid = toID, lengthkm, areasqkm) %>% + left_join(distinct(select(merged_layers[[lookup_table_refactor]], + id = aggregated_flowpath_ID, + levelpathid = LevelPathID))) + + list(pois = POIs, + lookup_table_refactor = select(merged_layers[[lookup_table_refactor]], + -aggregated_flowpath_ID, + -aggregated_divide_ID), + reconciled_layer = merged_layers[[reconciled_layer]], + divide_layer = merged_layers[[divide_layer]], + split_divide_layer = merged_layers[[split_divide_layer]], + mapped_outlets_layer = merged_layers[[mapped_outlets_layer]], + agg_cats_layer = merged_layers[[agg_cats_layer]], + agg_fline_layer = merged_layers[[agg_fline_layer]], + lookup_table_refactor = merged_layers[[lookup_table_refactor]], + reconciled_layer = merged_layers[[reconciled_layer]], + catchment_network_table_refector = cat_net_rfc, + cathment_network_table_aggregate = cat_net_gf, + lookup_table_aggregate = merged_layers[[lookup_table_refactor]]) +} + +#' merge refactor +#' @param rpus vector of rpu codes to be merged +#' @param rpu_vpu_out outputs from rpus to other rpus/vpus +#' @param ref_gpkg_fun function to convert a rpu into a refactored gpkg path +#' @param agg_gpkg_fun function to convert a rpu into an aggregate gpkg path +#' @param lookup_table_refactor table name to get the lookup table from +#' @param reconciled_layer table name to get the reconciled flowlines from +#' @param divide_layer table name to get the reconciled divides from +#' @param split_divide_layer table name to get the split catchment divides from +#' @param agg_fline_layer table name to get the aggregate flowlines from +#' @param agg_cats_layer table name to get the aggregate catchments from +#' @param mapped_oulets_layer table name to get the long form mapped pois from +#' +merge_refactor <- function(rpus, + rpu_vpu_out, + ref_gpkg_fun, + agg_gpkg_fun, + lookup_table_refactor, + reconciled_layer, + divide_layer, + split_divide_layer, + agg_fline_layer, + agg_cats_layer, + mapped_outlets_layer) { + + + out <- rep(list(list()), length(rpus)) + names(out) <- rpus + + s <- 10000000 + + for(rpu in rpus) { + print(rpu) + + refactor_gpkg <- ref_gpkg_fun(rpu) + aggregate_gpkg <- agg_gpkg_fun(rpu) + + out[[rpu]] <- setNames(list(read_sf(aggregate_gpkg, lookup_table_refactor), + read_sf(refactor_gpkg, reconciled_layer), + read_sf(refactor_gpkg, divide_layer), + read_sf(refactor_gpkg, split_divide_layer), + read_sf(aggregate_gpkg, agg_fline_layer), + read_sf(aggregate_gpkg, agg_cats_layer), + read_sf(aggregate_gpkg, mapped_outlets_layer)), + c(lookup_table_refactor, + reconciled_layer, + divide_layer, + split_divide_layer, + agg_fline_layer, + agg_cats_layer, + mapped_outlets_layer))#, + + ### RECONCILED ### + + out[[rpu]][[reconciled_layer]] <- select(out[[rpu]][[reconciled_layer]], + ID, toID, LENGTHKM, TotDASqKM, member_COMID, + LevelPathID = orig_levelpathID) + + out[[rpu]][[reconciled_layer]]$newID <- + seq(s, by = 1, length.out = nrow(out[[rpu]][[reconciled_layer]])) + + s <- max(out[[rpu]][[reconciled_layer]]$newID) + 1 + + # get a toID that matches the new IDs + out[[rpu]][[reconciled_layer]] <- left_join(out[[rpu]][[reconciled_layer]], + select(st_drop_geometry(out[[rpu]][[reconciled_layer]]), + ID, newtoID = newID), + by = c("toID" = "ID")) + + # find updates that we need to apply + update <- rpu_vpu_out[!rpu_vpu_out$toCOMID %in% out[[rpu]][[lookup_table_refactor]]$NHDPlusV2_COMID & + rpu_vpu_out$COMID %in% out[[rpu]][[lookup_table_refactor]]$NHDPlusV2_COMID, ] + + # apply these updates for follow up if we got any + if(nrow(update) > 0) { + update <- left_join(update, out[[rpu]][[lookup_table_refactor]], + by = c("COMID" = "NHDPlusV2_COMID")) %>% + left_join(select(out[[rpu]][[lookup_table_refactor]], + NHDPlusV2_COMID, to_member_COMID = member_COMID), + by = c("toCOMID" = "NHDPlusV2_COMID")) + + out[[rpu]][[reconciled_layer]] <- left_join(out[[rpu]][[reconciled_layer]], + select(update, reconciled_ID, toCOMID), + by = c("ID" = "reconciled_ID")) + } + + #### LOOKUP TABLE #### + + update_id <- st_drop_geometry(select(out[[rpu]][[reconciled_layer]], + old_ID = ID, ID = newID)) + + out[[rpu]][[lookup_table_refactor]] <- out[[rpu]][[lookup_table_refactor]] %>% + left_join(update_id, by = c("reconciled_ID" = "old_ID")) %>% + select(-reconciled_ID, reconciled_ID = ID) %>% + left_join(update_id, by = c("aggregated_flowpath_ID" = "old_ID")) %>% + select(-aggregated_flowpath_ID, aggregated_flowpath_ID = ID) %>% + left_join(update_id, by = c("aggregated_divide_ID" = "old_ID")) %>% + select(-aggregated_divide_ID, aggregated_divide_ID = ID) %>% + left_join(st_drop_geometry(select(out[[rpu]][[reconciled_layer]], + ID = newID, LevelPathID)), + by = c("reconciled_ID" = "ID")) + + #### DIVIDE LAYER #### + + out[[rpu]][[divide_layer]] <- left_join(rename(out[[rpu]][[divide_layer]], + old_ID = ID), + update_id, by = c("old_ID")) + + out[[rpu]][[divide_layer]] <- select(out[[rpu]][[divide_layer]], + ID, areasqkm, member_COMID) + + out[[rpu]][[divide_layer]]$rpu <- rep(rpu, nrow(out[[rpu]][[divide_layer]])) + + #### aggregate layers #### + + for(l in c(agg_cats_layer, agg_fline_layer)) { #, mapped_outlets_layer + + out[[rpu]][[l]] <- left_join(rename(out[[rpu]][[l]], old_ID = ID), + update_id, + by = c("old_ID")) + + if("toID" %in% names(out[[rpu]][[l]])) { + out[[rpu]][[l]] <- left_join(rename(out[[rpu]][[l]], old_toID = toID), + rename(update_id, toID = ID), + by = c("old_toID" = "old_ID")) + + sets <- select(st_drop_geometry(out[[rpu]][[l]]), ID, old_ID, old_set = set) %>% + mutate(old_set = strsplit(old_set, ",")) %>% + tidyr::unnest(cols = old_set) %>% + mutate(old_set = as.integer(old_set)) %>% + left_join(rename(update_id, set = ID), by = c("old_set" = "old_ID")) %>% + select(-old_set, -old_ID) + + sets <- split(sets$set, sets$ID) + + sets <- data.frame(ID = as.integer(names(sets))) %>% + mutate(set = unname(sets)) + + out[[rpu]][[l]] <- left_join(select(out[[rpu]][[l]], -set), + sets, by = "ID") + + out[[rpu]][[l]] <- select(out[[rpu]][[l]], -old_ID, -old_toID) + + out[[rpu]][[l]] <- out[[rpu]][[l]][c("ID", "toID", "set", + names(out[[rpu]][[l]])[!names(out[[rpu]][[l]]) %in% + c("ID", "toID", "set", + attr(out[[rpu]][[l]], "sf_column"))], + attr(out[[rpu]][[l]], "sf_column"))] + + out[[rpu]][[l]]$set <- sapply(out[[rpu]][[l]]$set, paste, collapse = ",") + } else { + + out[[rpu]][[l]] <- select(out[[rpu]][[l]], -old_ID) + + out[[rpu]][[l]] <- out[[rpu]][[l]][c("ID", + names(out[[rpu]][[l]])[!names(out[[rpu]][[l]]) %in% + c("ID", + attr(out[[rpu]][[l]], "sf_column"))], + attr(out[[rpu]][[l]], "sf_column"))] + } + } + + } + + out <- setNames(lapply(names(out[[1]]), function(x, out) { + dplyr::bind_rows(lapply(out, function(df, n) { + if("COMID" %in% names(df[[n]]) && is.numeric(df[[n]]$COMID)) + df[[n]]$COMID <- as.character(df[[n]]$COMID) + df[[n]] + + if("event_id" %in% names(df[[n]]) && !is.character(df[[n]]$event_id)) + df[[n]]$event_id <- as.character(df[[n]]$event_id) + df[[n]] + }, n = x)) + }, out = out), names(out[[1]])) + + if("toCOMID" %in% names(out[[reconciled_layer]])) { + + # blow up so we have unique COMIDs to join on. + # need to keep the top most of any splits (the .1 variety) + # this makes sure out toCOMID assignments go to the right new id. + long_form <- st_drop_geometry(out[[reconciled_layer]]) %>% + select(newID, member_COMID) %>% + mutate(member_COMID = strsplit(member_COMID, ",")) %>% + tidyr::unnest(cols = member_COMID) %>% + filter(grepl("\\.1$", member_COMID) | !grepl("\\.", member_COMID)) %>% + mutate(NHDPlusV2_COMID = as.integer(member_COMID)) %>% + select(-member_COMID, update_newtoID = newID) + + # NOTE: if the missing tocomid is in the next VPU they will still be NA + out[[reconciled_layer]] <- left_join(out[[reconciled_layer]], + long_form, by = c("toCOMID" = "NHDPlusV2_COMID")) + out[[reconciled_layer]]$newtoID[!is.na(out[[reconciled_layer]]$toCOMID)] <- + out[[reconciled_layer]]$update_newtoID[!is.na(out[[reconciled_layer]]$toCOMID)] + + # now use updates from refactored ids with aggregate + long_form <- st_drop_geometry(out[[agg_fline_layer]]) %>% + select(ID, set) %>% + mutate(set = strsplit(set, ",")) %>% + tidyr::unnest(cols = set) %>% + mutate(set = as.integer(set)) %>% + select(update_newtoID = ID, refactor_ID = set) + + # join updated toID from refactor so we can update as needed + out[[agg_fline_layer]] <- out[[agg_fline_layer]] %>% + # this adds a "new_toID" containing the refactor id that the + # aggregate flowpaths we are updating need to go to. + left_join(select(st_drop_geometry(out[[reconciled_layer]]), + ID = newID, new_toID = update_newtoID), + by = "ID") %>% + # we need to handle sets of refactor ids and get the right aggregate toID + left_join(long_form, by = c("new_toID" = "refactor_ID")) + # need to update the field to reflect the information joined just above. + out[[agg_fline_layer]]$new_toID[!is.na(out[[agg_fline_layer]]$update_newtoID)] <- + out[[agg_fline_layer]]$update_newtoID[!is.na(out[[agg_fline_layer]]$update_newtoID)] + + # now do the actual toID updates + out[[agg_fline_layer]]$toID[!is.na(out[[agg_fline_layer]]$new_toID)] <- + out[[agg_fline_layer]]$new_toID[!is.na(out[[agg_fline_layer]]$new_toID)] + + out[[agg_fline_layer]] <- select(out[[agg_fline_layer]], -new_toID, -update_newtoID) + } + + out[[split_divide_layer]] <- rename(out[[split_divide_layer]], comid_part = FEATUREID) + + out[[reconciled_layer]] <- select(out[[reconciled_layer]], ID = newID, + toID = newtoID, LENGTHKM, TotDASqKM, + member_COMID, LevelPathID, refactor_ID = ID) + + out +} + +#' get vpu configuration +#' @param vpu vpu to get configuration for +#' @param data_paths list of paths to data resources +#' +get_vpu_config <- function(vpu, data_paths, full_nhd, full_wbd) { + + elev <- data_paths$elev_cm[grepl(paste0("Ned", substr(vpu, 1, 2)), + data_paths$elev_cm, ignore.case = TRUE)] + + if (vpu == "08"){ + elev$rpu_03g <- data_paths$elev_cm$rpu_03g + } + + if(vpu == "02"){ + grep_exp <-"^02|^04" + } else if (vpu == "08") { + grep_exp <- "^03|^08" + } else { + grep_exp <- paste0("^", substr(vpu, start = 1, stop = 2)) + elev <- append(elev, list(rpu_03g = data_paths$elev_cm$rpu_03g)) + } + + vpu_nhd <- full_nhd %>% + filter(grepl(paste0("^", grep_exp, ".*"), .data$VPUID)) %>% + nhdplusTools::align_nhdplus_names(.) + + vpu_WBD <- full_wbd %>% + filter(grepl(paste0("^", grep_exp, ".*"), .data$HUC_12)) + + list(vpu_nhd = vpu_nhd, vpu_WBD = vpu_WBD, elev = elev) +} + +#' make hu12 xwalk +#' @param cats catchments from reference fabric +#' @param divides reconciled divides from refactor +#' @param vpu vpu that we are running +#' @param wbd_xwalk crosswalk input file +#' @param wbd_sf raw wbd dataset +make_hu12_xwalk <- function(cats, divides, vpu, wbd_xwalk, wbd_sf) { + + # Read in full NHD cats + cats <- filter(cats, full_cats == 1) + cats <- sf::st_make_valid(cats) + divides <- sf::st_make_valid(divides) + + # Intersect NHD catchments and divides with HUC12 + nhd_wbd_int <- get_HUC12_Xwalk(vpu, cats, divides, + wbd_xwalk, + wbd_sf) + + # Bring over divides/HUC12 intersection information into divides layer + xwalk_divides_wbd <- st_drop_geometry(nhd_wbd_int$divides_HUC12) %>% + select(-c(ACRES, HU_12_MOD)) + + divides <- divides %>% + left_join(distinct(xwalk_divides_wbd) %>% + group_by(ID) %>% + filter(intArea == max(intArea)) %>% + ungroup() %>% + select(ID, HUC_12_int, intArea, AreaHUC12), + by = "ID") + + # Bring over divides/HUC12 intersection information into divides layer + xwalk_nhd_wbd <- st_drop_geometry(nhd_wbd_int$cats_HUC12) %>% + select(-c(ACRES, HU_12_MOD)) + + rm(nhd_wbd_int) + + cats <- cats %>% + left_join(distinct(xwalk_nhd_wbd) %>% + group_by(FEATUREID) %>% + slice(which.max(intArea)) %>% + ungroup() %>% + select(FEATUREID, HUC_12_int, intArea, AreaHUC12), + by = "FEATUREID") %>% + st_transform(proj_crs) + + # # All intersecting NHD cat / divides / HUC12 geometries (1: many) + # write_sf(xwalk_nhd_wbd, ND_gpkg, xwalk_layer) + # write_sf(xwalk_divides_wbd, ND_gpkg, divides_xwalk) + # + # # Cats with the HUC_12 value (HUC_12_int) that is the max overlap + # write_sf(cats, ND_gpkg, nhd_catchment) + # write_sf(divides, ND_gpkg, divide_layer) + + list(xwalk_layer = xwalk_nhd_wbd, divides_xwalk = xwalk_divides_wbd, + nhd_catchment = cats, divide_layer = divides) + +} + +#' Merges geopackages together to create CONUs geopackage of features +#' @param VPU VPU from NHDPlusV2 +#' @param full_cats (sf data.frame) all catchments (sinks/flowline) within given VPU +#' @param divides (sf data.frame) divides layer for a given VPU +#' @param wbd (data.frame) HUC12 X-walk table, see get_data for full citation +#' @param wbd_SF (sf data.frame) HUC12 layer from NHDPlusv2.1 national geodatabase +#' @return (sf data.frame) intersection of nhd catchments and divides layer with HUC12 +#' +get_HUC12_Xwalk <- function(vpu, full_cats = FALSE, divides = FALSE, wbd = FALSE, wbd_SF = FALSE){ + + # HUC02 includes some + if(vpu == "02"){ + grep_exp <-"^02|^04" + } else if (vpu == "08") { + grep_exp <- "^03|^08" + } else { + grep_exp <- paste0("^", substr(vpu, start = 1, stop = 2)) + } + + # crosswalk to HU12, filter by VPU + nhd_to_HU12 <- read.csv(wbd, colClasses = c("character", "integer", "character")) %>% + filter(grepl(grep_exp, .data$HUC_12)) %>% + mutate(HUC_8 = substr(HUC_12, 1, 8)) + + # Read in the NHDPlus National GDB HUC12 layer + HUC12_lyr <- wbd_SF %>% + #filter(grepl(paste0("^", VPU,".*"), .data$HUC_12)) %>% + filter(grepl(grep_exp, .data$HUC_12)) %>% #, + st_transform(st_crs(full_cats)) %>% + st_make_valid() %>% + select(HUC_12, ACRES, HU_10_TYPE, HU_12_DS, HU_12_MOD, HU_12_TYPE) + + reg_cats <- full_cats %>% + left_join(select(nhd_to_HU12, c(FEATUREID, HUC_12)), by = "FEATUREID") + + sqm_per_sqkm <- 0.000001 + acres_per_sqkm <- .00404686 + # Intersect the HUC12s and catchments + cats_HUC12 <- sf::st_intersection(reg_cats, HUC12_lyr) %>% + # Convert area to squaker kilometers + mutate(intArea = as.numeric(st_area(.)) * sqm_per_sqkm, + AreaHUC12 = ACRES * acres_per_sqkm) %>% + rename(HUC_12_int = HUC_12.1) + + divides_HUC12 <- sf::st_intersection(divides, HUC12_lyr) %>% + # Convert area to squaker kilometers + mutate(intArea = as.numeric(st_area(.)) * sqm_per_sqkm, + AreaHUC12 = ACRES * acres_per_sqkm) %>% + rename(HUC_12_int = HUC_12) + + + return(list(cats_HUC12 = cats_HUC12, divides_HUC12 = divides_HUC12)) +} + +#' make divides nd +#' @param divides divides after hu12 crosswalk +#' @param lookup lookup_table_aggregate from merged vpus +#' @param cats nhd_catchment from hu12 crosswalk step +#' @param cat_rpu_table table listing what rpu each catchment belongs to +#' +make_divides_nd <- function(divides, lookup, cats, cat_rpu_table, debug = FALSE) { + + # Bind divide with lookup, identify aggregation step + divides_lu <- divides %>% + left_join(distinct(select(lookup, reconciled_ID, aggregated_divide_ID)), + by = c("ID" = "reconciled_ID")) %>% + filter(!is.na(ID) & !is.na(aggregated_divide_ID)) %>% + rename(POI_ID = aggregated_divide_ID) %>% + # attribute that hyrefactor was the step used to aggregate these catchments + mutate(aggStep = "hyRef") + + # Identify cats witin the full catchment set not refactored or aggregated, add + # to divides data frame + cats_miss <- cats %>% + left_join(select(lookup, NHDPlusV2_COMID, member_COMID, reconciled_ID , + POI_ID = aggregated_divide_ID), + by = c("FEATUREID" = "NHDPlusV2_COMID")) %>% + filter(is.na(POI_ID), !member_COMID %in% divides_lu$member_COMID) %>% + mutate(aggStep = NA) %>% + distinct() %>% + left_join(select(cat_rpu_table, FEATUREID, rpu = RPUID)) %>% + #filter(!reconciled_ID %in% divides_lu$ID) %>% + select(-member_COMID) %>% + mutate(areasqkm = as.numeric(st_area(.)/1000000)) %>% + select(ID = reconciled_ID, member_COMID = FEATUREID, rpu, HUC_12_int, intArea, AreaHUC12, POI_ID, aggStep) + + divides_lu <- divides_lu %>% + select(-areasqkm) %>% + rbind(cats_miss) %>% + # resolve terminal non-POI outlets where we can + nhdplusTools:::check_valid() %>% + st_cast("POLYGON") + + # clean_geometry(divides_lu, ID = "POI_ID", keep = 1.0, crs = st_crs(divides_lu)) + divides_lu <- dissolve_holes(divides_lu) + + rm(cats_miss) + + # DEBUG: + # HRU layer + protoHRUs <- NULL + if(debug) { + protoHRUs <- divides_lu %>% + group_by("POI_ID") %>% + summarize(do_union = TRUE) %>% + sfheaders::sf_remove_holes(.) %>% + st_make_valid() + # write_sf(protoHRUs, ND_gpkg, HRU_layer) + # rm(protoHRUs) + } + + # write_sf(divides_lu, ND_gpkg, divides_nd) + + list(divides_nd = divides_lu, HRU_layer = protoHRUs) +} + #' Dissolves internal unaggregated divides within existing aggregated catchments #' @param divides_poi (sf data frame) data frame of reconciled divides # @@ -47,6 +560,49 @@ dissolve_holes <- function(divides_poi){ return(divides_poi) } +#' assign huc12 function +#' @param divides_lu divides from divides non dritic step +#' @param xwalk_nhd_wbd xwalk_layer from huc12 crosswalk step +#' @param vpu_nhd vpu_nhd from vpu_attributes step +#' @param debug logical whether to output proto HRUs +assign_huc12_fun <- function(divides_lu, xwalk_nhd_wbd, vpu_nhd, debug = FALSE) { + + print(paste0("Currently there are ", sum(is.na(divides_lu$POI_ID)), " cats without a POI_ID")) + + # This will crate aggregated IDs for NHD catchments that match HUC12 footprint if aggregated together + # The Coefficient of Areal correpsondence - is the last argument + if("AREASQKM" %in% names(xwalk_nhd_wbd)) { + xwalk_nhd_wbd <- rename(xwalk_nhd_wbd, + AreaSqKM = AREASQKM) + } + + divides_lu <- assign_HUC12(divides_lu, xwalk_nhd_wbd, + filter(vpu_nhd, FTYPE != "Coastline"), CAC_thresh) + + # Determine if any unaggregated catchments match HUC10 footprint (mostly in the west) + divides_lu <- assign_HUC10(divides_lu, xwalk_nhd_wbd, + filter(vpu_nhd, FTYPE != "Coastline"), CAC_thresh) %>% + select(-comid_char) + + protoHRUs <- NULL + if(debug) { + # DEBUG: + # HRU layer + # this is not working in my testing + protoHRUs <- divides_lu %>% + group_by(POI_ID) %>% + summarize(do_union = TRUE) %>% + sfheaders::sf_remove_holes(.) %>% + st_make_valid() + # write_sf(protoHRUs, ND_gpkg, HRU_layer) + # rm(protoHRUs) + } + + # Update missing_cats + # write_sf(divides_lu, ND_gpkg, divides_nd) + + list(divides_nd = divides_lu, HRU_layer = protoHRUs) +} #' Determine if any divides that have not been assigned to an aggregated catchment #' can be aggregated on the basis of their HUC12 ID @@ -62,7 +618,7 @@ dissolve_holes <- function(divides_poi){ #' @return (sf data frame) data frame with unaggregated divides assigned if HUC10 if grouped divides #' meet CAC criteria assign_HUC12 <- function(divides_poi, HUC12_xwalk, nhd, CAC_num){ - + miss_cats_sub <- filter(divides_poi, is.na(POI_ID)) # HUC12/cat intersections associated with missing catchments @@ -73,7 +629,7 @@ assign_HUC12 <- function(divides_poi, HUC12_xwalk, nhd, CAC_num){ #*************************************** # STEP 1 - Identify aggregated cats without aggregated ID that match the HUC12 footprint # Note - out west we may need to expand this to HUC10 - + # HUC12s that match Catchment footprint huc12_in_nhd <- nhd_wbd_int_sub %>% # int_HUC_12 is the intersection HUC12, not the NAWQA Xwalk HUC12 @@ -108,7 +664,7 @@ assign_HUC12 <- function(divides_poi, HUC12_xwalk, nhd, CAC_num){ #*************************************** # STEP 2 - Identify HUC12s that match very large nhd catchments # Note - out west we may need to expand this to HUC10 - + # Update the number of missing catchments miss_cats_sub <- filter(divides_poi, is.na(POI_ID)) @@ -136,7 +692,7 @@ assign_HUC12 <- function(divides_poi, HUC12_xwalk, nhd, CAC_num){ mutate(POI_ID = ifelse(!is.na(new_POI_ID), new_POI_ID, POI_ID), aggStep = ifelse(member_COMID %in% cat_in_HUC12s$member_COMID, "HUC_12", aggStep)) %>% select(-new_POI_ID) - + #*************************************** # STEP 2 - Identify edge non-aggregated catchments that may contribute to the HUC12 but are not # assigned an aggregated ID and may not share the HUC12 value @@ -306,6 +862,45 @@ assign_HUC10 <- function(divides, HUC12_xwalk, nhd, CAC_num){ return(mutate(divides, comid_char = as.character(member_COMID))) } +#' create coastal catchments +#' @param vpu_nhd vpu_nhd from vpu_attributes step +#' @param divides_lu divides from assign huc12 step +#' @param vpu_WBD vpu_WBD from vpu_attributes step +#' @param debug logical whether to output proto HRUs +#' +create_coastal_catchments <- function(vpu_nhd, divides_lu, vpu_WBD, debug = FALSE) { + protoHRUs <- NULL + # aggregate frontal hucs of same type + if("Coastline" %in% unique(vpu_nhd$FTYPE)){ + print(paste0("Currently there are ", sum(is.na(divides_lu$POI_ID)), " cats without a POI_ID")) + + + # Function to create coastal catchments by HUC12 and catch inflows. + divides_lu <- coastal_cats(divides_lu, vpu_nhd, vpu_WBD) + + if(debug) { + # DEBUG: + # HRU layer + protoHRUs <- divides_lu %>% + group_by(POI_ID) %>% + summarize(do_union = TRUE) %>% + sfheaders::sf_remove_holes(.) %>% + st_make_valid() + } + + + # Update missing_cats + # write_sf(divides_lu, ND_gpkg, divides_nd) + + } else { + print ("No Coastlines in VPU") + } + + vpu_nhd <- filter(vpu_nhd, FTYPE != "Coastline") + + list(vpu_nhd = vpu_nhd, divides_nd = divides_lu, protoHRUs = protoHRUs) +} + #' Determine if any divides that have not been assigned to an aggregated catchment #' can be aggregated as coastal catchments using their HUC12 ID (no CAC match requirement) #' @param nhd (sf data frame) full data frame of nhd flowlines (including coastal flowlines) @@ -336,10 +931,10 @@ coastal_cats <- function(divides_poi, full_nhd, vpu_HUC12){ #divides_miss <- rbind(coastal_cats, filter(frontal_HUC12_cats, !member_COMID %in% coastal_cats$member_COMID)) %>% # mutate(member_COMID = as.character(member_COMID)) - + coastalFL <- rbind(coastFL, filter(st_drop_geometry(full_nhd), COMID %in% divides_miss$member_COMID)) %>% distinct() - + # # Just assign remaining nonPOI costal cats the HUC12, not going to get much better information divides_miss <- divides_miss %>% mutate(HUC_12_POI = ifelse(member_COMID %in% coastal_cats$member_COMID, HUC_12_int, POI_ID)) @@ -368,7 +963,7 @@ coastal_cats <- function(divides_poi, full_nhd, vpu_HUC12){ cat_at_coast <- coastal_cats %>% inner_join(st_drop_geometry(nhd_at_coast) %>% select(COMID, Hydroseq), by = c("member_COMID_int" = "COMID")) - + nhd2coast_us <- full_nhd %>% filter(TerminalPa %in% filter(nhd2coast, COMID %in% cat2coast$member_COMID)$Hydroseq) %>% select(COMID, TerminalPa) @@ -418,19 +1013,135 @@ coastal_cats <- function(divides_poi, full_nhd, vpu_HUC12){ select(-HUC_12_POI, member_COMID, ID, rpu, POI_ID, aggStep, HUC_12_int, AreaHUC12, intArea) %>% mutate(vpu_ID = substr(rpu, 1, 2), comid_char = as.character(member_COMID)) } - + divides_poi <- dissolve_holes(divides_poi) return(divides_poi) } } +#' fix holes +#' @param divides_lu divides layer from create coastal step +#' @param HUC12_table wbd layer from the NHDPlus +#' @param nhdplus_sinks sink layer from the NHDPlus +#' @param min_da_km_terminal minimum drainage area to include a terminal network +#' @param vpu_nhd vpu_nhd from create coastal step +#' @param lookup lookup_table_aggregate from merged vpus +#' @param elev elev from vpu configuration step +#' +fix_holes <- function(divides_lu, HUC12_table, + nhdplus_sinks, min_da_km_terminal, + vpu_nhd, lookup, elev) { + + print(paste0("Currently there are ", sum(is.na(divides_lu$POI_ID)), " cats without a POI_ID")) + + if(sum(is.na(divides_lu$POI_ID)) > 0){ + + # Handle nhd sinks + # arrange divides_lu by member_COMID and populate with RowID for easy iteration within next two steps + divides_lu <- divides_lu %>% + arrange(member_COMID, POI_ID) %>% + mutate(row_ID = row_number()) %>% + arrange(row_ID) + + HUC_sinks <- NHD_sinks(divides_lu, area_thresh = min_da_km_terminal/2, + HUC12_table = HUC12_table, + NHD_sinks = nhdplus_sinks) + + if(length(HUC_sinks) == 2){ + divides_lu <- HUC_sinks$divides_poi + sinks_table <- HUC_sinks$sink_cats_table + } + + # Scan for terminals that may have been refactored + missing_ds <- filter(divides_lu, is.na(POI_ID)) + + term_refactored_fun <- function(x) { + + ds_ref <- nhdplusTools::get_DM(vpu_nhd, x, include = FALSE) + + if(length(ds_ref) == 0){ + return(filter(missing_ds, member_COMID == x)) + } + + lookup_test <- filter(lookup, NHDPlusV2_COMID %in% ds_ref) + + divides_test <- filter(divides_lu, ID %in% lookup_test$reconciled_ID) %>% + filter(!is.na(POI_ID)) + + # print(unique(divides_test$POI_ID)) + + if(length(unique(divides_test$POI_ID)) == 1){ + return(filter(missing_ds, member_COMID == x) %>% + mutate(POI_ID = unique(divides_test$POI_ID))) + } else { + return(filter(missing_ds, member_COMID == x)) + } + + } + + term_refactored <- lapply(missing_ds$member_COMID, term_refactored_fun) + + term_refactored <- data.table::rbindlist(term_refactored[lengths(term_refactored) > 1], + fill = TRUE) %>% + st_as_sf() + + divides_lu <- filter(divides_lu, !member_COMID %in% term_refactored$member_COMID) %>% + rbind(term_refactored) + + if(sum(is.na(divides_lu$POI_ID)) > 0) { + + divides_dem <- miss_term_assign(divides_lu, vpu_nhd, elev) + + divides_lu <- divides_lu %>% + left_join(select(divides_dem, member_COMID, agg_ID), + by = "member_COMID") %>% + mutate(POI_ID = ifelse(!is.na(agg_ID), agg_ID, POI_ID), + aggStep = ifelse(!is.na(agg_ID), "boundary DEM", aggStep)) %>% + select(-agg_ID) + + if(exists("sinks_table")){ + sinks_table_fin <- filter(sinks_table, !member_COMID %in% divides_dem$member_COMID) + sinks_table_fin <- data.table::rbindlist(list(sinks_table_fin, + divides_dem), fill = TRUE) + } else { + sinks_table_fin <- divides_dem + } + + # write_sf(sinks_table_fin, ND_gpkg, ND_table) + } + + } else { + print ("all unaggregated catchments assigned") + } + + divides_lu <- dissolve_holes(divides_lu) + + missing_cats_out <- NULL + if(sum(is.na(divides_lu$POI_ID)) > 0){ + missing_cats_out <- filter(divides_lu, is.na(POI_ID)) + # write_sf(, ND_gpkg, missing_cats) + } + + # Prob HRU - filter(all_hrus, POI_ID == 140402000209) + all_hrus <- filter(divides_lu, !is.na(POI_ID)) %>% + group_by(POI_ID) %>% + summarize(do_union = TRUE) %>% + sfheaders::sf_remove_holes(.) %>% + nhdplusTools:::check_valid(.) + + # write_sf(divides_lu, ND_gpkg, divides_nd) + # write_sf(all_hrus, ND_gpkg, HRU_layer) + + list(missing_cats = missing_cats_out, divides_nd = divides_lu, HRU_layer = all_hrus) +} + #' Determines length of shared perimeters with adjacent catchments #' @param divides (sf data frame) data frame of reconciled divides # #' @return (sf data frame) (data frame) DF of shared perimeter lengths for cats perims <- function(divides_poi){ - + # Subset to data frame of existing divides with POI assignments divides_wpoi <- filter(divides_poi, !is.na(POI_ID)) %>% dplyr::select(POI_ID, HUC_12_int, member_COMID, row_ID) @@ -445,9 +1156,9 @@ perims <- function(divides_poi){ divides_nopoi_int <- st_intersection(divides_nopoi_buff, divides_wpoi) %>% mutate(perim = lwgeom::st_perimeter(.) / 2) %>% dplyr::select(POI_ID, HUC_12_int = HUC_12_int, member_COMID, row_ID, perim, bound_cat_POI_ID = POI_ID.1, - bound_cat_HUC12 = HUC_12_int.1, bound_cat = member_COMID.1, bound_cat_row = row_ID.1) %>% + bound_cat_HUC12 = HUC_12_int.1, bound_cat = member_COMID.1, bound_cat_row = row_ID.1) %>% st_drop_geometry() - + return(divides_nopoi_int) } @@ -464,7 +1175,7 @@ NHD_sinks <- function(divides_poi, area_thresh, HUC12_table, NHD_sinks){ divides_poi$HUC_10 <- substr(divides_poi$HUC_12_int, 1, 10) # Filter HUC12 crosswalk to HUC10s id'ed in divides and closed HUC types - HUC12_sinks <- readRDS(HUC12_table) %>% + HUC12_sinks <- HUC12_table %>% filter(HUC_10 %in% divides_poi$HUC_10) %>% filter(HU_10_TYPE == "C" | HU_12_TYPE == "C") @@ -477,7 +1188,7 @@ NHD_sinks <- function(divides_poi, area_thresh, HUC12_table, NHD_sinks){ group_by(POI_ID) %>% filter(n() < 2) %>% ungroup() - + # Get the NHD sinks sink_cats_nhd <- filter(divides_poi, is.na(POI_ID)) %>% filter(grepl("-", .$member_COMID)) @@ -504,19 +1215,19 @@ NHD_sinks <- function(divides_poi, area_thresh, HUC12_table, NHD_sinks){ group_by(HUC_12_int) %>% summarize(do_union = TRUE) %>% mutate(agg_id = row_number()) - + m_per_km <- 1000 # Cast to polygon agg_cats_poly <- do.call(rbind, lapply(1:nrow(sink_cats_agg), - function(i){st_cast(sink_cats_agg[i,], "POLYGON")})) %>% + function(i){st_cast(sink_cats_agg[i,], "POLYGON")})) %>% mutate(area = as.numeric(st_area(.))/m_per_km^2) - + # Create new 'HRUs' out of larger sinks above the size threshold if (nrow(filter(agg_cats_poly, area > area_thresh)) > 0){ # Filter to only large sinks agg_cats_large <- filter(agg_cats_poly, area > area_thresh) - + # centroids of divides with no assigned POI_ID cents <- st_point_on_surface(sink_cats) # intersect centroids with the aggregated HUC12 polygons @@ -537,9 +1248,9 @@ NHD_sinks <- function(divides_poi, area_thresh, HUC12_table, NHD_sinks){ mutate(POI_ID = ifelse(!is.na(POI_ID_new), POI_ID_new, POI_ID), aggStep = ifelse(!is.na(POI_ID_new), "HUC12_sinks, large", aggStep)) %>% dplyr::select(-POI_ID_new) - + print ("looking at aggregating smaller sinks") - + sink_cats_small <- filter(sink_cats, !member_COMID %in% filter(sink_cats_final, !is.na(POI_ID_new))$member_COMID) %>% filter(grepl("-", .$member_COMID)) @@ -553,22 +1264,22 @@ NHD_sinks <- function(divides_poi, area_thresh, HUC12_table, NHD_sinks){ # Get pertinent boundaries cat_bound <- perims(divides_poi) %>% filter(!bound_cat %in% sink_cats_small$member_COMID) - + # centroids of divides with no assigned POI_ID cents <- st_point_on_surface(sink_cats_small) # intersect centroids with the aggregated HUC12 polygons cats_int <- st_intersects(cents, agg_cats_small) - + # Index of HRUs that missing cats may lie within with holes filled sink_cats_index <- sapply(cats_int, function(s) if (length(s) == 0) NA else s) - + # Populate ID of HRUs where applicable sink_cats_small$agg_id <- agg_cats_small[sink_cats_index,]$agg_id - + sink_cats_nhd_table <- st_drop_geometry(sink_cats_small) %>% inner_join(dplyr::select(cat_bound, -HUC_12_int), by = "member_COMID") %>% distinct() - + small_sinks_POI <- sink_cats_nhd_table %>% distinct(agg_id, HUC_12_int, bound_cat_HUC12, bound_cat_POI_ID) %>% #summarize(perim = sum(perim)) %>% @@ -576,11 +1287,11 @@ NHD_sinks <- function(divides_poi, area_thresh, HUC12_table, NHD_sinks){ group_by(agg_id) %>% filter(n_distinct(bound_cat_POI_ID) == 1) %>% ungroup() - + sink_cats_assign <- dplyr::select(sink_cats_nhd_table, member_COMID, agg_id) %>% inner_join(dplyr::select(small_sinks_POI, agg_id, POI_ID_new = bound_cat_POI_ID), by = "agg_id") %>% distinct() - + # Bring over POI_ID to divides divides_poi <- divides_poi %>% left_join(sink_cats_assign, by = "member_COMID") %>% @@ -595,7 +1306,7 @@ NHD_sinks <- function(divides_poi, area_thresh, HUC12_table, NHD_sinks){ filter(HUC_12 == neighbor_HUC12) %>% mutate(aggStep = "HUC12_sinks, small") %>% st_drop_geometry() - + } } large_sinks <- select(sink_cats, member_COMID, HUC_12 = HUC_12_int, POI_ID, aggStep) %>% @@ -623,7 +1334,7 @@ NHD_sinks <- function(divides_poi, area_thresh, HUC12_table, NHD_sinks){ #' Keep the below link here until publishd #' Topo data bins: https://www.sciencebase.gov/catalog/item/5f5154ba82ce4c3d12386a02 miss_term_assign <- function(divides_poi, nhd, elev){ - + # comid <- single comid of a missing catchment (member_COMID in divides_lu) #' Find and lump remaining areas where possible #' @param comid (integer) comid of single unaggregated terminal outlet @@ -637,7 +1348,7 @@ miss_term_assign <- function(divides_poi, nhd, elev){ library(dplyr) library(sf) library(nhdplusTools) - + us_network <- get_UT(nhd, incomid) # Catchment divide not assigned/aggregated and upstream components @@ -646,7 +1357,7 @@ miss_term_assign <- function(divides_poi, nhd, elev){ } else { missing_cats <- filter(divides_poi, member_COMID == incomid) } - + rpu <- filter(missing_cats, member_COMID == incomid)$rpu missing_cats_union <- st_union(missing_cats) %>% @@ -663,7 +1374,7 @@ miss_term_assign <- function(divides_poi, nhd, elev){ if(nrow(cats_buff) == 0){ miss_cat <- dplyr::select(st_drop_geometry(missing_cats), - outlet_COMID = member_COMID) %>% + outlet_COMID = member_COMID) %>% dplyr::mutate(outlet_COMID = as.integer(outlet_COMID), Elev = NA, neighbor_COMID = NA, POI_ID = NA) %>% dplyr::select(Elev, outlet_COMID, neighbor_COMID, POI_ID) @@ -697,7 +1408,7 @@ miss_term_assign <- function(divides_poi, nhd, elev){ term_outlets <- filter(divides_poi, is.na(POI_ID) | aggStep == "HUC12_sinks, small") %>% dplyr::rename(outlet_COMID = member_COMID) - + out_df <- do.call(rbind, pbapply::pblapply(unique(term_outlets$outlet_COMID), assign_func, divides_poi, nhd, elev, cl = NULL)) %>% @@ -713,60 +1424,45 @@ miss_term_assign <- function(divides_poi, nhd, elev){ by = c("neighbor_COMID" = "member_COMID")) %>% dplyr::select(member_COMID, neighbor_COMID, HUC_12, neighbor_HUC12, agg_ID = POI_ID) %>% mutate(aggStep = "boundary DEM") - + return(out_df) } -#' Merges geopackages together to create CONUs geopackage of features -#' @param VPU VPU from NHDPlusV2 -#' @param full_cats (sf data.frame) all catchments (sinks/flowline) within given VPU -#' @param divides (sf data.frame) divides layer for a given VPU -#' @param wbd (data.frame) HUC12 X-walk table, see get_data for full citation -#' @param wbd_SF (sf data.frame) HUC12 layer from NHDPlusv2.1 national geodatabase -#' @return (sf data.frame) intersection of nhd catchments and divides layer with HUC12 -#' -get_HUC12_Xwalk <- function(vpu, full_cats = FALSE, divides = FALSE, wbd = FALSE, wbd_SF = FALSE){ +#' manual checks +#' @param noagg_divides missing_cats from fix holes step +#' @param divides_lu divides from fix holes step +#' @param vpu what vpu we are running +#' +manual_checks <- function(noagg_divides, divides_lu, vpu) { + + noagg_divides <- noagg_divides %>% #read_sf(ND_gpkg, missing_cats) %>% + select(row_ID, POI_ID_noagg = POI_ID) %>% + st_drop_geometry() - # HUC02 includes some - if(vpu == "02"){ - grep_exp <-"^02|^04" - } else if (vpu == "08") { - grep_exp <- "^03|^08" + all_hrus <- NULL + + if(all(!is.na(noagg_divides$POI_ID_noagg))){ + print ("all unaggregated divides accounted for") + + divides_lu <- divides_lu %>% + left_join(noagg_divides, by = "row_ID") %>% + mutate(POI_ID = ifelse(!is.na(POI_ID_noagg), POI_ID_noagg, POI_ID), + aggStep = ifelse(!is.na(aggStep), "manual", aggStep)) %>% + select(-POI_ID_noagg) + + # Prob HRU - filter(all_hrus, POI_ID == 140402000209) + all_hrus <- filter(divides_lu, !is.na(POI_ID)) %>% + group_by(POI_ID) %>% + summarize(do_union = TRUE) %>% + sfheaders::sf_remove_holes(.) %>% + nhdplusTools:::check_valid(.) + + # write_sf(divides_lu, ND_gpkg, divides_nd) + # write_sf(all_hrus, ND_gpkg, HRU_layer) + list(HRU_layer = all_hrus, divides_nd = divides_lu) } else { - grep_exp <- paste0("^", substr(vpu, start = 1, stop = 2)) + print ("account for unaggregated divides in ", vpu) + NULL } - # crosswalk to HU12, filter by VPU - nhd_to_HU12 <- read.csv(wbd, colClasses = c("character", "integer", "character")) %>% - filter(grepl(grep_exp, .data$HUC_12)) %>% - mutate(HUC_8 = substr(HUC_12, 1, 8)) - - # Read in the NHDPlus National GDB HUC12 layer - HUC12_lyr <- readRDS(file.path(wbd_SF)) %>% - #filter(grepl(paste0("^", VPU,".*"), .data$HUC_12)) %>% - filter(grepl(grep_exp, .data$HUC_12)) %>% #, - st_transform(st_crs(full_cats)) %>% - st_make_valid() %>% - select(HUC_12, ACRES, HU_10_TYPE, HU_12_DS, HU_12_MOD, HU_12_TYPE) - - reg_cats <- full_cats %>% - left_join(select(nhd_to_HU12, c(FEATUREID, HUC_12)), by = "FEATUREID") - - sqm_per_sqkm <- 0.000001 - acres_per_sqkm <- .00404686 - # Intersect the HUC12s and catchments - cats_HUC12 <- sf::st_intersection(reg_cats, HUC12_lyr) %>% - # Convert area to squaker kilometers - mutate(intArea = as.numeric(st_area(.)) * sqm_per_sqkm, - AreaHUC12 = ACRES * acres_per_sqkm) %>% - rename(HUC_12_int = HUC_12.1) - - divides_HUC12 <- sf::st_intersection(divides, HUC12_lyr) %>% - # Convert area to squaker kilometers - mutate(intArea = as.numeric(st_area(.)) * sqm_per_sqkm, - AreaHUC12 = ACRES * acres_per_sqkm) %>% - rename(HUC_12_int = HUC_12) - - - return(list(cats_HUC12 = cats_HUC12, divides_HUC12 = divides_HUC12)) -} +} \ No newline at end of file diff --git a/workspace/_targets.yaml b/workspace/_targets.yaml index 9c52532..363f8c5 100644 --- a/workspace/_targets.yaml +++ b/workspace/_targets.yaml @@ -10,3 +10,6 @@ 04_aggregate: script: _targets/04_aggregate_targets.R store: .targets_stores/04_aggregate +05_non_dendritic: + script: _targets/05_non_dendritic_targets.R + store: .targets_stores/05_non_dendritic diff --git a/workspace/_targets/04_aggregate_targets.R b/workspace/_targets/04_aggregate_targets.R index 8e1f7a1..6374f23 100644 --- a/workspace/_targets/04_aggregate_targets.R +++ b/workspace/_targets/04_aggregate_targets.R @@ -1,8 +1,9 @@ library(targets) -tar_option_set(packages = c("hyRefactor", "dplyr", "sf"), +tar_option_set(packages = c("dplyr", "sf"), memory = "transient", garbage_collection = TRUE, error = "stop", - storage = "worker", retrieval = "worker", deployment = "main") + storage = "worker", retrieval = "worker", deployment = "main", + iteration = "list") library(future) library(future.callr) @@ -14,29 +15,12 @@ source("R/04_aggregate_functions.R") prep_path <- ".targets_stores/01_prep/objects/" -list(tar_target(data_paths_file, "cache/data_paths.json", format = "file"), - tar_target(data_paths, jsonlite::read_json(data_paths_file)), - tar_target(rpu_codes_file, file.path(prep_path, "all_rpu_codes"), format = "file"), +list(tar_target(rpu_codes_file, file.path(prep_path, "all_rpu_codes"), format = "file"), tar_target(rpu_codes, readRDS(rpu_codes_file)), tar_target(rpu_gpkg, file.path("cache", paste0(rpu_codes, "_refactor.gpkg")), pattern = map(rpu_codes), deployment = "worker"), tar_target(agg_gpkg, file.path("cache", paste0(rpu_codes, "_aggregate.gpkg")), pattern = map(rpu_codes), deployment = "worker"), - - tar_target(rpu_vpu_file, file.path(prep_path, "rpu_vpu"), format = "file"), - tar_target(rpu_vpu, readRDS(rpu_vpu_file)), - tar_target(rpu_vpu_out_file, file.path(prep_path, "rpu_vpu_out"), format = "file"), - tar_target(rpu_vpu_out, readRDS(rpu_vpu_out_file)), - - tar_target(vpu_codes_file, file.path(prep_path, "vpu_codes"), format = "file"), - tar_target(vpu_codes, readRDS(vpu_codes_file)), - tar_target(vpu_gpkg, file.path("cache", paste0("reference_", vpu_codes,".gpkg")), - pattern = map(vpu_codes), deployment = "worker"), - tar_target(rfc_gpkg, file.path("cache", paste0("refactor_", vpu_codes,".gpkg")), - pattern = map(vpu_codes), deployment = "worker"), - tar_target(gf_gpkg, file.path("cache", paste0("GF_", vpu_codes, ".gpkg")), - pattern = map(vpu_codes), deployment = "worker"), - tar_target(cache_split, file.path("cache", paste0(rpu_codes, ".rda")), pattern = map(rpu_codes)), @@ -78,28 +62,5 @@ list(tar_target(data_paths_file, "cache/data_paths.json", format = "file"), write_sf(aggregated$double_outlets, agg_gpkg, "double_outlets") write_sf(long_form_pois, agg_gpkg, poi_data_table) write_sf(lookup, agg_gpkg, lookup_table_refactor) - }, pattern = map(agg_gpkg, aggregated, long_form_pois, lookup), deployment = "worker"), - - tar_target(merged_vpu, merge_rpu_to_vpu(read_sf(vpu_gpkg, poi_data_table), - rpu_vpu, vpu_codes, rpu_vpu_out, - \(rpu) file.path("cache", paste0(rpu, "_refactor.gpkg")), - \(rpu) file.path("cache", paste0(rpu, "_aggregate.gpkg")), - lookup_table_refactor, reconciled_layer, - divide_layer, split_divide_layer, - agg_fline_layer, agg_cats_layer, - mapped_outlets_layer), - pattern = map(vpu_gpkg, vpu_codes), deployment = "worker"), - tar_target(write_out, { - write_sf(merged_vpu$pois, gf_gpkg, poi_data_table) - write_sf(merged_vpu$lookup_table_refactor, rfc_gpkg, lookup_table_refactor) - write_sf(merged_vpu$reconciled_layer, rfc_gpkg, reconciled_layer) - write_sf(merged_vpu$divide_layer, rfc_gpkg, divide_layer) - write_sf(merged_vpu$split_divide_layer, rfc_gpkg, split_divide_layer) - write_sf(merged_vpu$mapped_outlets_layer, gf_gpkg, mapped_outlets_layer) - write_sf(merged_vpu$agg_cats_layer, gf_gpkg, agg_cats_layer) - write_sf(merged_vpu$agg_fline_layer, gf_gpkg, agg_fline_layer) - write_sf(merged_vpu$lookup_table_aggregate, gf_gpkg, lookup_table_refactor) - write_sf(merged_vpu$catchment_network_table_refector, rfc_gpkg, catchment_network_table) - write_sf(merged_vpu$cathment_network_table_aggregate, gf_gpkg, catchment_network_table) - }, pattern = map(merged_vpu, gf_gpkg, rfc_gpkg), deployment = "worker") + }, pattern = map(agg_gpkg, aggregated, long_form_pois, lookup), deployment = "worker") ) diff --git a/workspace/_targets/05_non_dendritic_targets.R b/workspace/_targets/05_non_dendritic_targets.R new file mode 100644 index 0000000..7ff73f5 --- /dev/null +++ b/workspace/_targets/05_non_dendritic_targets.R @@ -0,0 +1,115 @@ +library(targets) + +tar_option_set(packages = c("hyRefactor", "dplyr", "sf"), + memory = "transient", garbage_collection = TRUE, error = "stop", + storage = "worker", retrieval = "worker", deployment = "main") + +library(future) +library(future.callr) +plan(multisession) + +source("R/config.R") +source("R/user_vars.R") +source("R/05_non_dendritic_functions.R") + +prep_path <- ".targets_stores/01_prep/objects/" + +list(tar_target(data_paths_file, "cache/data_paths.json", format = "file"), + tar_target(data_paths, jsonlite::read_json(data_paths_file)), + + tar_target(all_nhdplus_attributes, + sf::st_drop_geometry(sf::read_sf(data_paths$nhdplus_gdb, "NHDFlowline_Network"))), + tar_target(nhdplus_sinks, read_sf(data_paths$nhdplus_gdb, "Sink")), + tar_target(nhdplus_wbd, read_sf(data_paths$nhdplus_gdb, "HUC12") |> + st_make_valid() |> st_transform(crs = proj_crs)), + tar_target(cat_rpu_table, readRDS(data_paths$fullcats_table)), + + tar_target(rpu_vpu_file, file.path(prep_path, "rpu_vpu"), format = "file"), + tar_target(rpu_vpu, readRDS(rpu_vpu_file)), + tar_target(rpu_vpu_out_file, file.path(prep_path, "rpu_vpu_out"), format = "file"), + tar_target(rpu_vpu_out, readRDS(rpu_vpu_out_file)), + + ## set up to map over vpu_codes + tar_target(vpu_codes_file, file.path(prep_path, "vpu_codes"), format = "file"), + tar_target(vpu_codes, readRDS(vpu_codes_file)), + tar_target(vpu_gpkg, file.path("cache", paste0("reference_", vpu_codes,".gpkg")), + pattern = map(vpu_codes), deployment = "worker", format = "file"), + + # we only write to this gpkg + tar_target(rfc_gpkg, file.path("cache", paste0("refactor_", vpu_codes,".gpkg")), + pattern = map(vpu_codes), deployment = "worker"), + tar_target(gf_gpkg, file.path("cache", paste0("GF_", vpu_codes, ".gpkg")), + pattern = map(vpu_codes), deployment = "worker"), + + # TODO: setup the refactor and aggregate geopackages as format = "file" targets? + tar_target(merged_vpu, merge_rpu_to_vpu(read_sf(vpu_gpkg, poi_data_table), + rpu_vpu, vpu_codes, rpu_vpu_out, + \(rpu) file.path("cache", paste0(rpu, "_refactor.gpkg")), + \(rpu) file.path("cache", paste0(rpu, "_aggregate.gpkg")), + lookup_table_refactor, reconciled_layer, + divide_layer, split_divide_layer, + agg_fline_layer, agg_cats_layer, + mapped_outlets_layer), + pattern = map(vpu_gpkg, vpu_codes), deployment = "worker"), + + tar_target(write_out, { + write_sf(merged_vpu$pois, gf_gpkg, poi_data_table) + write_sf(merged_vpu$lookup_table_refactor, rfc_gpkg, lookup_table_refactor) + write_sf(merged_vpu$reconciled_layer, rfc_gpkg, reconciled_layer) + write_sf(merged_vpu$divide_layer, rfc_gpkg, divide_layer) + write_sf(merged_vpu$split_divide_layer, rfc_gpkg, split_divide_layer) + write_sf(merged_vpu$mapped_outlets_layer, gf_gpkg, mapped_outlets_layer) + write_sf(merged_vpu$agg_cats_layer, gf_gpkg, agg_cats_layer) + write_sf(merged_vpu$agg_fline_layer, gf_gpkg, agg_fline_layer) + write_sf(merged_vpu$lookup_table_aggregate, gf_gpkg, lookup_table_refactor) + write_sf(merged_vpu$catchment_network_table_refector, rfc_gpkg, catchment_network_table) + write_sf(merged_vpu$cathment_network_table_aggregate, gf_gpkg, catchment_network_table) + }, pattern = map(merged_vpu, gf_gpkg, rfc_gpkg), deployment = "worker"), + + ## prep for non-dendritic + tar_target(vpu_attributes, get_vpu_config(vpu_codes, data_paths, + all_nhdplus_attributes, nhdplus_wbd), + pattern = map(vpu_codes)), + # NOTUSED? + # tar_target(flowline_vpu, st_transform(read_sf(vpu_gpkg, nhd_flowline), proj_crs), + # pattern = map(vpu_gpkg), deployment = "worker"), + tar_target(catchment_vpu, st_transform(read_sf(vpu_gpkg, nhd_catchment), proj_crs), + pattern = map(vpu_gpkg), deployment = "worker"), + tar_target(divides_vpu, st_transform(merged_vpu$divide_layer, proj_crs), + pattern = map(merged_vpu), deployment = "worker"), + # NOTUSED? + # tar_target(lookup_vpu, merged_vpu$lookup_table_aggregate, + # pattern = map(merged_vpu), deployment = "worker"), + + tar_target(hu12_xwalk, make_hu12_xwalk(catchment_vpu, divides_vpu, vpu_codes, + # TODO track down this file or recreate it!! + file.path(data_paths$nhdplus_dir, + "CrosswalkTable_NHDplus_HU12.csv"), + nhdplus_wbd), + pattern = map(catchment_vpu, divides_vpu, vpu_codes), deployment = "worker"), + tar_target(divides_non_dendritic, make_divides_nd(hu12_xwalk$divide_layer, + merged_vpu$lookup_table_aggregate, + hu12_xwalk$nhd_catchment, + cat_rpu_table, + debug = TRUE), + pattern = map(hu12_xwalk, merged_vpu), deployment = "worker"), + tar_target(divides_w_huc12, assign_huc12_fun(divides_non_dendritic$divides_nd, + hu12_xwalk$xwalk_layer, + vpu_attributes$vpu_nhd, + debug = FALSE), + pattern = map(divides_non_dendritic, hu12_xwalk, vpu_attributes), deployment = "worker"), + tar_target(divides_w_coastal, create_coastal_catchments(vpu_attributes$vpu_nhd, + divides_w_huc12$divides_nd, + vpu_attributes$vpu_WBD, + debug = FALSE), + pattern = map(vpu_attributes, divides_w_huc12), deployment = "worker"), + tar_target(divides_fixed_holes, fix_holes(divides_w_coastal$divides_nd, + nhdplus_wbd, nhdplus_sinks, + min_da_km_terminal, + divides_w_coastal$vpu_nhd, + merged_vpu$lookup_table_aggregate, + vpu_attributes$elev), + pattern = map(divides_w_coastal, vpu_attributes, merged_vpu), deployment = "worker"), + tar_target(final, manual_checks(divides_fixed_holes$missing_cats, divides_fixed_holes$divides_nd, vpu_codes), + pattern = map(divides_fixed_holes, vpu_codes), deployment = "worker") +) diff --git a/workspace/_targets_runner.R b/workspace/_targets_runner.R index 46cfd94..f0786c2 100644 --- a/workspace/_targets_runner.R +++ b/workspace/_targets_runner.R @@ -83,3 +83,9 @@ Sys.setenv(TAR_PROJECT = "04_aggregate") workers = 6 tar_make_future(workers = workers) + +Sys.setenv(TAR_PROJECT = "05_non_dendritic") + +workers = 6 + +tar_make_future(workers = workers) -- GitLab