Newer
Older
hyfabric_source <- list.files(pattern = "hyfabric_.*.tar.gz")
hyfabric_version <- substr(hyfabric_source, 10, 14)
if(!(require(hyfabric, quietly = TRUE) && packageVersion("hyfabric") == hyfabric_version)) {
dir.create(path = Sys.getenv("R_LIBS_USER"), showWarnings = FALSE, recursive = TRUE)
.libPaths(Sys.getenv("R_LIBS_USER"))
install.packages(hyfabric_source, lib = Sys.getenv("R_LIBS_USER"),
fix_headwaters <- function(nhd_rds, out_gpkg, new_atts = NULL,
nhdpath = nhdplusTools::nhdplus_path(),

Bock, Andy
committed
if(!file.exists(out_gpkg)) {
# Need to replace headwater flowlines with the trimmed burn line event.
# Some headwater flowlines are not fully within the catchment they go with.
ble <- sf::read_sf(nhdpath, "BurnLineEvent")
ble <- dplyr::left_join(dplyr::select(sf::st_drop_geometry(nhd), COMID),
ble, by = "COMID")
ble <- sf::st_zm(sf::st_as_sf(ble))
sf::st_geometry(nhd)[!sf::st_is_empty(sf::st_geometry(ble)) & (nhd$StartFlag == 1 | nhd$Divergence == 2)] <-
sf::st_geometry(ble)[!sf::st_is_empty(sf::st_geometry(ble)) & (nhd$StartFlag == 1 | nhd$Divergence == 2)]
if(!is.null(new_atts)) {
new_atts <- data.table::fread(new_atts,
integer64 = "character",
showProgress = FALSE)
if("weight" %in% names(new_atts)) new_atts <- rename(new_atts,
ArbolateSu = weight)
nhd <- select(nhd, COMID, FromNode, ToNode,
StartFlag, StreamCalc, Divergence, DnMinorHyd) %>%
left_join(new_atts, by = c("COMID" = "comid")) %>%
nhdplusTools::align_nhdplus_names()
}
nhd <- sf::st_sf(nhd)
m_to_km <- (1/1000)
nhd$LENGTHKM <- as.numeric(sf::st_length(sf::st_transform(nhd, 5070))) * m_to_km
custom_net <- select(sf::st_drop_geometry(nhd),
COMID, FromNode, ToNode, Divergence)
custom_net <- nhdplusTools::get_tocomid(custom_net, remove_coastal = FALSE)
custom_net <- select(custom_net, comid, override_tocomid = tocomid)
nhd <- left_join(nhd, custom_net, by = c("COMID" = "comid"))
nhd <- mutate(nhd, override_tocomid = ifelse(toCOMID == 0, override_tocomid, toCOMID))
# is a headwater and for sure flows to something.
check <- !nhd$COMID %in% nhd$override_tocomid &
!(nhd$override_tocomid == 0 | is.na(nhd$override_tocomid) |
!nhd$override_tocomid %in% nhd$COMID)
check_direction <- dplyr::filter(nhd, check)
if(!all(check_direction$override_tocomid[check_direction$override_tocomid != 0] %in% nhd$COMID))
stop("this won't work")
matcher <- match(check_direction$override_tocomid, nhd$COMID)
matcher <- nhd[matcher, ]
fn_list <- Map(list,
split(check_direction, seq_len(nrow(check_direction))),
split(matcher, seq_len(nrow(matcher))))
cl <- parallel::makeCluster(16)
new_geom <- pbapply::pblapply(cl = cl,
X = fn_list,
FUN = function(x) {
nhdplusTools::fix_flowdir(x[[1]]$COMID,
fn_list = list(flowline = x[[1]],
network = x[[2]],
check_end = "end"))
})
parallel::stopCluster(cl)
# One is empty. Deal with it and remove from the replacement.
error_index <- sapply(new_geom, inherits, what = "try-error")
errors <- filter(nhd, COMID %in% check_direction$COMID[error_index])
check[which(nhd$COMID %in% errors$COMID)] <- FALSE
if(!all(sapply(sf::st_geometry(errors), sf::st_is_empty))) {
stop("Errors other than empty geometry from fix flowdir")
}
new_geom <- new_geom[!error_index]
new_geom <- do.call(c, new_geom)
st_geometry(nhd)[check] <- new_geom
nhd <- dplyr::filter(nhd, COMID %in% new_atts$comid)
nhd <- select(nhd, -override_tocomid)
sf::write_sf(nhd, out_gpkg, "reference_flowlines")
## Is available in sbtools now, but will leave till on cran.
#' Retrieves files from SB in facet file_structure
#' @param sbitem (character) ScienceBase item ID
#' @param out_dir (directory) directory where files are downloaded
#'
# Retrieve files from SB if files not listed under sbtools
retrieve_sb <- function(sbitem, out_dir){
lapply(item$facets, function(x)
{
list(name = x$name,
files = lapply(x$files, function(y, out_path)
{
out_file <- file.path(out_path, y$name)
download.file(y$downloadUri, out_file, mode = "wb")
list(fname = y$name,
url = y$downloadUri)
},
out_path = file.path(data_dir, x$name)))
})
}
#' Documents why streamgage is excluded from POIs
#' @param source (character) data source
#' @param reason (character) reason gage is excluded
#' @return writes output to file
#'
gage_document <- function(data, source, drop_reason){
if(!file.exists("cache/dropped_gages.csv")){
write.csv(data %>% mutate(data_source = source, reason = drop_reason),
"cache/dropped_gages.csv", quote = F, row.names = F)
} else {
write.table(data %>%
mutate(data_source = source, reason = drop_reason),
file = "cache/dropped_gages.csv", append = T, sep = ",",
col.names = F, row.names = F, quote = F)
}
}
#' clean network
#' @param net data.frame nhdplus network
#' @param net_new data.frame new nhdplus network as from e2nhd
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
#' @param nwm data.frame with national water model new network
#' @return data.frame fully reconciled attributes
clean_net <- function(net, net_new, nwm) {
# NOTE: need to disconnected diverted paths as indicated in the tonode / fromnode attributes.
diverted_heads <- filter(net_new, divergence == 2)
net_new <- filter(net_new, !tocomid %in% diverted_heads$comid)
# NOTE: avoid fcode == 56600 (coastal)
avoid <- net$comid[net$fcode == 56600]
# NOTE: avoid large groups where things have been redirected in an un-natural way.
groups <- group_by(nwm, tocomid)
groups <- data.frame(tocomid = summarise(groups, .groups = "drop"), size = group_size(groups))
groups <- filter(groups, size > 10 & tocomid != 0)
# NOTE: also avoid all flowlines that go to coastal flowlines.
# This is due to unwanted great lakes connections in the NWM.
avoid <- c(avoid,
net_new$comid[net_new$tocomid %in% avoid],
net_new$comid[net_new$tocomid %in% groups$tocomid],
nwm$comid[nwm$tocomid %in% groups$tocomid])
avoid <- unique(avoid)
nwm <- filter(nwm, tocomid %in% c(net_new$comid, 0))
net_new <- left_join(net_new, select(nwm, comid,
nwm_tocomid = tocomid),
by = "comid") %>%
mutate(tocomid = ifelse(is.na(tocomid), 0, tocomid))
## NWM already uses 0 for outlets.
# NOTE: just being explicit about this for clarity.
candidate <- select(net_new, comid, tocomid, nwm_tocomid) %>%
filter(!comid %in% avoid)
mismatch <- candidate[candidate$tocomid != candidate$nwm_tocomid, ]
net_new <- left_join(net_new, select(mismatch, comid,
new_tocomid = nwm_tocomid),
by = "comid")
to_change <- filter(net_new, !is.na(new_tocomid))
net_new <- net_new %>%
mutate(divergence = ifelse(comid %in% to_change$new_tocomid & divergence == 2, 1,
ifelse(comid %in% to_change$tocomid & divergence == 1, 2,
divergence)),
tocomid = ifelse(comid %in% to_change$comid, new_tocomid, tocomid)) %>%
select(-nwm_tocomid, -new_tocomid)
net_new <- filter(net_new, fcode != 56600)
net_new <- mutate(net_new, tocomid = ifelse(!tocomid %in% comid, 0, tocomid))
return(net_new)
}
#' Merges geopackages together to create CONUs geopackage of features
#' @param feat (character) Type of feature to merge (POIs, segments) character
#' @param in_gpkg (character) geopackage to merge (GF, ND_, etc.)
#' @param out_gpkg (character) Name of output geopackage
#' @return output geopackage of CONUS for given features
#'
Merge_VPU <- function(feat, in_gpkg, out_gpkg){
all_sf <- paste0(feat, "_CONUS")
#VPUs <- c("03N", "03S", "03W")
#VPUs <- c("10L", "10U")
VPUs <-c(paste0("0", c(1:2)), "03N", "03S", "03W", paste0("0", c(4:9)),
as.character(11:18), "10U", "10L")
featCON <- do.call("rbind", lapply(VPUs, function(x) {
tryCatch({
layer <- ifelse(feat %in% c("nhd_flowline", "unassigned_gages", "unassigned_TE"),
read_sf(file.path("cache", paste0(in_gpkg, x, ".gpkg")),
layer)},
error = function(e) stop(paste("didn't find", x,
"\n Original error was", e)))
}))
write_sf(featCON, out_gpkg, feat)
#' Capture all catchments (flowline, sink, etc.) in a given RPU
#' @param fcats (sf data.frame) NHDPlusv2.1 National geodatabase catchment layer
#' @param the_RPU (character) RPU to retrive full catchment set.
#' @param fl_rds character path to flowline rds file
#' @param nhd_gdb character path to NHD geodatabase
#' @return (sf data.frame) with FEATUREID, RPUID, FTYPE, TERMINAFL
fl <- read_sf(nhd_gdb, "NHDFlowline_Network") %>%
align_nhdplus_names()
# read the CatchmentSP
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
st_as_sf() %>%
st_drop_geometry() %>%
dplyr::select(FEATUREID, SOURCEFC)
# read the Flowlines
flowln_df <- fl %>%
st_as_sf() %>%
st_drop_geometry() %>%
select(COMID, FTYPE, TerminalFl, RPUID)%>%
rename(FEATUREID = COMID) %>%
rename(TERMINALFL = TerminalFl)%>%
filter(FEATUREID %in% cats$FEATUREID)
# Read in sinks
sink_df <- sf::st_read(nhd_gdb, layer = "Sink") %>%
st_drop_geometry() %>%
dplyr::select(SINKID, RPUID = InRPU) %>%
dplyr::rename(FEATUREID = SINKID) %>%
dplyr::mutate(FTYPE = "Sink", TERMINALFL = 0) %>%
dplyr::filter(!FEATUREID %in% flowln_df$FEATUREID)
#combine all the RPUIDs
lkup_rpu <- rbind(flowln_df, sink_df)
# FEATUREID 10957920, 20131674, 24534636 - this are the ids of the missing, checked on map,
# they look like they should not be used
# add the records for the missing
missrec_df <- data.frame(FEATUREID=c(10957920, 20131674, 245346360),
RPUID = c("03a", "03d", "17b"),
TERMINALFL = c(0, 0, 0)) %>%
mutate(FTYPE = "")
lkup_rpu2 <- rbind(lkup_rpu, missrec_df)
return(lkup_rpu2)

Blodgett, David L.
committed
merge_conus <- function(rpu_vpu, rpu_vpu_out,
lookup_table_refactor,
reconciled_layer,
divide_layer,
agg_fline_layer,
agg_cats_layer,
mapped_outlets_layer) {
vpus <- unique(rpu_vpu$vpuid)
gf_gpkgs <- paste0("cache/GF_", vpus, ".gpkg")
stop("not implemented")
# open all lookup tables and reconcile just the IDs tracking where the IDs came from.
# open all the spatial data and update IDs to match.
}
merge_refactor <- function(rpus, rpu_vpu_out,
lookup_table_refactor,
reconciled_layer,
divide_layer,
agg_fline_layer,
agg_cats_layer,
mapped_outlets_layer) {
out <- rep(list(list()), length(rpus))
names(out) <- rpus
s <- 10000000
for(rpu in rpus) {
refactor_gpkg <- file.path("cache", paste0(rpu, "_refactor.gpkg"))
aggregate_gpkg <- file.path("cache", paste0(rpu, "_aggregate.gpkg"))
out[[rpu]] <- setNames(list(sf::read_sf(aggregate_gpkg, lookup_table_refactor),
sf::read_sf(refactor_gpkg, reconciled_layer),
sf::read_sf(refactor_gpkg, divide_layer),
sf::read_sf(aggregate_gpkg, agg_fline_layer),
sf::read_sf(aggregate_gpkg, agg_cats_layer),
sf::read_sf(aggregate_gpkg, mapped_outlets_layer)),
c(lookup_table_refactor,
reconciled_layer,
divide_layer,
agg_fline_layer,
agg_cats_layer,
mapped_outlets_layer))
out[[rpu]][[reconciled_layer]] <- select(out[[rpu]][[reconciled_layer]],
ID, toID, LENGTHKM, TotDASqKM, member_COMID,
LevelPathID = orig_levelpathID)
out[[rpu]][[reconciled_layer]]$newID <-
seq(s, by = 1, length.out = nrow(out[[rpu]][[reconciled_layer]]))
s <- max(out[[rpu]][[reconciled_layer]]$newID) + 1
# get a toID that matches the new IDs
out[[rpu]][[reconciled_layer]] <- left_join(out[[rpu]][[reconciled_layer]],
select(st_drop_geometry(out[[rpu]][[reconciled_layer]]),
ID, newtoID = newID),
by = c("toID" = "ID"))
# find updates that we need to apply
update <- rpu_vpu_out[!rpu_vpu_out$toCOMID %in% out[[rpu]][[lookup_table_refactor]]$NHDPlusV2_COMID &
rpu_vpu_out$COMID %in% out[[rpu]][[lookup_table_refactor]]$NHDPlusV2_COMID, ]
# apply these updates for follow up if we got any
if(nrow(update) > 0) {
update <- left_join(update, out[[rpu]][[lookup_table_refactor]],
by = c("COMID" = "NHDPlusV2_COMID")) %>%
left_join(select(out[[rpu]][[lookup_table_refactor]],
NHDPlusV2_COMID, to_member_COMID = member_COMID),
by = c("toCOMID" = "NHDPlusV2_COMID"))
out[[rpu]][[reconciled_layer]] <- left_join(out[[rpu]][[reconciled_layer]],
select(update, reconciled_ID, toCOMID),
by = c("ID" = "reconciled_ID"))
}
update_id <- sf::st_drop_geometry(select(out[[rpu]][[reconciled_layer]],
old_ID = ID, ID = newID))
out[[rpu]][[lookup_table_refactor]] <- out[[rpu]][[lookup_table_refactor]] %>%
left_join(update_id, by = c("reconciled_ID" = "old_ID")) %>%
select(-reconciled_ID, reconciled_ID = ID) %>%
left_join(update_id, by = c("aggregated_flowpath_ID" = "old_ID")) %>%
select(-aggregated_flowpath_ID, aggregated_flowpath_ID = ID) %>%
left_join(update_id, by = c("aggregated_divide_ID" = "old_ID")) %>%
select(-aggregated_divide_ID, aggregated_divide_ID = ID) %>%

Blodgett, David L.
committed
left_join(sf::st_drop_geometry(select(out[[rpu]][[reconciled_layer]],
ID = newID, LevelPathID)),
by = c("reconciled_ID" = "ID"))
out[[rpu]][[divide_layer]] <- left_join(rename(out[[rpu]][[divide_layer]],
old_ID = ID),
update_id, by = c("old_ID"))
out[[rpu]][[divide_layer]] <- select(out[[rpu]][[divide_layer]],
ID, areasqkm, member_COMID)
out[[rpu]][[divide_layer]]$rpu <- rep(rpu, nrow(out[[rpu]][[divide_layer]]))
#### aggregate layers ####
for(l in c(agg_cats_layer, agg_fline_layer, mapped_outlets_layer)) {
out[[rpu]][[l]] <- left_join(rename(out[[rpu]][[l]], old_ID = ID),
update_id,
by = c("old_ID"))
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
if("toID" %in% names(out[[rpu]][[l]])) {
out[[rpu]][[l]] <- left_join(rename(out[[rpu]][[l]], old_toID = toID),
rename(update_id, toID = ID),
by = c("old_toID" = "old_ID"))
sets <- select(st_drop_geometry(out[[rpu]][[l]]), ID, old_ID, old_set = set) %>%
mutate(old_set = strsplit(old_set, ",")) %>%
tidyr::unnest(cols = old_set) %>%
mutate(old_set = as.integer(old_set)) %>%
left_join(rename(update_id, set = ID), by = c("old_set" = "old_ID")) %>%
select(-old_set, -old_ID)
sets <- split(sets$set, sets$ID)
sets <- data.frame(ID = as.integer(names(sets))) %>%
mutate(set = unname(sets))
out[[rpu]][[l]] <- left_join(select(out[[rpu]][[l]], -set),
sets, by = "ID")
out[[rpu]][[l]] <- select(out[[rpu]][[l]], -old_ID, -old_toID)
out[[rpu]][[l]] <- out[[rpu]][[l]][c("ID", "toID", "set",
names(out[[rpu]][[l]])[!names(out[[rpu]][[l]]) %in%
c("ID", "toID", "set",
attr(out[[rpu]][[l]], "sf_column"))],
attr(out[[rpu]][[l]], "sf_column"))]
out[[rpu]][[l]]$set <- sapply(out[[rpu]][[l]]$set, paste, collapse = ",")
} else {
out[[rpu]][[l]] <- select(out[[rpu]][[l]], -old_ID)
out[[rpu]][[l]] <- out[[rpu]][[l]][c("ID",
names(out[[rpu]][[l]])[!names(out[[rpu]][[l]]) %in%
c("ID",
attr(out[[rpu]][[l]], "sf_column"))],
attr(out[[rpu]][[l]], "sf_column"))]
}
}
out <- setNames(lapply(names(out[[1]]), function(x, out) {
dplyr::bind_rows(lapply(out, function(df, n) {
if("COMID" %in% names(df[[n]]) && is.numeric(df[[n]]$COMID))
df[[n]]$COMID <- as.character(df[[n]]$COMID)
df[[n]]
}, n = x))
}, out = out), names(out[[1]]))
# blow up so we have unique COMIDs to join on.
# need to keep the top most of any splits (the .1 variety)
# this makes sure out toCOMID assignments go to the right new id.
long_form <- st_drop_geometry(out[[reconciled_layer]]) %>%
select(newID, member_COMID) %>%
mutate(member_COMID = strsplit(member_COMID, ",")) %>%
tidyr::unnest(cols = member_COMID) %>%
filter(grepl("\\.1$", member_COMID) | !grepl("\\.", member_COMID)) %>%
mutate(NHDPlusV2_COMID = as.integer(member_COMID)) %>%
select(-member_COMID, update_newtoID = newID)
if("toCOMID" %in% names(out[[reconciled_layer]])) {
# NOTE: if the missing tocomid is in the next VPU they will still be NA
out[[reconciled_layer]] <- left_join(out[[reconciled_layer]],
long_form, by = c("toCOMID" = "NHDPlusV2_COMID"))
out[[reconciled_layer]]$newtoID[!is.na(out[[reconciled_layer]]$toCOMID)] <-
out[[reconciled_layer]]$update_newtoID[!is.na(out[[reconciled_layer]]$toCOMID)]
}
out[[reconciled_layer]] <- select(out[[reconciled_layer]], ID = newID,
toID = newtoID, LENGTHKM, TotDASqKM,
member_COMID, LevelPathID, refactor_ID = ID)
# FIXME: THESE FUNCTIONS SHOULD COME FROM hyFab
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
#' Extract nexus locations for Reference POIs
#' @param gpkg a reference hydrofabric gpkg
#' @param type the type of desired POIs
#' @param verbose should messages be emitted?
#' @return data.frame with ID, type columns
#' @export
nexus_from_poi = function(mapped_POIs,
type = c('HUC12', 'Gages', 'TE', 'NID', 'WBIn', 'WBOut'),
verbose = TRUE){
# Will either be inferred - or - reference... use "unique" ID (comid facepalm).
valid_types = c('HUC12', 'Gages', 'TE', 'NID', 'WBIn', 'WBOut', "Conf", "Term", "Elev", "Travel", "Con")
if(!all(type %in% valid_types)){
bad_ids = type[!which(type %in% valid_types)]
stop(bad_ids, " are not valid POI types. Only ", paste(valid_types, collapse = ", "), " are valid")
}
if(is.null(type)){
type = valid_types
}
nexus_locations = mapped_POIs %>%
st_drop_geometry() %>%
select(ID, identifier, paste0("Type_", type)) %>%
mutate_at(vars(matches("Type_")), as.character) %>%
mutate(poi_id = as.character(identifier),
identifier = NULL) %>%
group_by(ID, poi_id) %>%
ungroup() %>%
pivot_longer(-c(poi_id, ID)) %>%
filter(!is.na(value)) %>%
mutate(type = gsub("Type_", "", name),
name = NULL)
nexus_locations
}
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
#' Generate Lookup table for Aggregated Nextwork
#' @param gpkg to the aggregated network. The look up table will be added here as a layer called "lookup_table"
#' @param flowpath_name the name of the flowpath layer in gpkg
#' @param refactored_fabric the gpkg for the corresponding VPU reference fabric
#' @return data.frame
#' @export
#' @importFrom sf read_sf st_drop_geometry write_sf
#' @importFrom dplyr mutate select full_join left_join
#' @importFrom tidyr unnest
generate_lookup_table = function(nl,
mapped_POIs) {
# THIS IS THE CODE ABOVE!!!!
nexus_locations = nexus_from_poi(mapped_POIs, verbose = FALSE)
lu = nl$lookup_table %>%
select(reconciled_ID, NHDPlusV2_COMID)
names(nl$flowpaths) <- tolower(names(nl$flowpaths))
nl$flowpaths <- rename(nl$flowpaths, member_COMID = set)
lu2 = nl$flowpaths %>%
st_drop_geometry() %>%
select(
aggregated_ID = id,
toID = toid,
member_COMID = member_comid,
divide_ID = id,
POI_ID = poi_id,
mainstem_id = levelpathid
) %>%
mutate(member_COMID = strsplit(member_COMID, ","),
POI_ID = as.integer(POI_ID)) %>%
unnest(col = 'member_COMID') %>%
mutate(NHDPlusV2_COMID_part = sapply( strsplit(member_COMID, "[.]"),
FUN = function(x){ x[2] }),
NHDPlusV2_COMID_part = ifelse(is.na(NHDPlusV2_COMID_part), 1L,
as.integer(NHDPlusV2_COMID_part)),
NHDPlusV2_COMID = sapply( strsplit(member_COMID, "[.]"),
FUN = function(x){ as.numeric(x[1]) })
) %>%
full_join(lu,
by = "NHDPlusV2_COMID")
group_by(aggregated_ID) %>%
arrange(hydroseq) %>%
mutate(POI_ID = c(POI_ID[1], rep(NA, n()-1))) %>%
ungroup() %>%
select(-hydroseq, - member_COMID) %>%
left_join(select(nexus_locations, POI_ID = poi_id, value, type),
by = "POI_ID") %>%
select(NHDPlusV2_COMID, NHDPlusV2_COMID_part,
reconciled_ID,
aggregated_ID, toID, mainstem = mainstem_id,
POI_ID, POI_TYPE = type, POI_VALUE = value)
write_sf(lu2, gpkg, "lookup_table")
return(gpkg)
}