Newer
Older
# Pipeline setup: attach targets, set the default target options, select the
# future backend, and load project configuration and helper functions.
library(targets)

# Defaults applied to every target below unless a target overrides them
# (several targets override `deployment` with "worker").
tar_option_set(
  packages = c("hyRefactor", "dplyr", "sf"),
  memory = "transient",
  garbage_collection = TRUE,
  error = "null",
  storage = "worker",
  retrieval = "worker",
  deployment = "main"
)

library(future)
library(future.callr)
# NOTE(review): future.callr is attached but the active plan is multisession;
# confirm this is the intended backend.
plan(multisession)

# Project configuration and helpers. Presumably these define the layer names,
# parameters, and functions referenced by the targets below -- verify.
source("R/config.R")
source("R/user_vars.R")
source("R/03_refactor_functions.R")

# Directory whose files are read directly by the *_file targets below
# (by its path, the objects folder of a "01_prep" targets store).
prep_path <- ".targets_stores/01_prep/objects/"
list(
  # Track the data-paths JSON as a file target, so the file's contents (not
  # just its path) determine whether dependents are up to date.
  tar_target(
    data_paths_file,
    "cache/data_paths.json",
    format = "file"
  ),
# Parse the tracked JSON file into an R list.
tar_target(
  data_paths,
  jsonlite::read_json(data_paths_file)
),
# Track the "rpu_codes" object file under prep_path as a file target.
tar_target(
  rpu_codes_file,
  file.path(prep_path, "rpu_codes"),
  format = "file"
),
# Load the RPU codes from the tracked RDS file. Most targets below branch
# over the elements of this object.
tar_target(
  rpu_codes,
  readRDS(rpu_codes_file)
),
# Track the "rpu_vpu" object file under prep_path as a file target.
tar_target(
  rpu_vpu_file,
  file.path(prep_path, "rpu_vpu"),
  format = "file"
),
# Load the RPU/VPU lookup from the tracked RDS file. Elsewhere in this
# pipeline it is indexed by its `rpuid` and `vpuid` columns.
tar_target(
  rpu_vpu,
  readRDS(rpu_vpu_file)
),
# Path of each RPU's refactor GeoPackage: "cache/<rpu code>_refactor.gpkg".
# One branch per element of rpu_codes. No `format = "file"` is set, so the
# target stores the path string rather than tracking the file itself.
tar_target(
  rpu_gpkg,
  file.path("cache", paste0(rpu_codes, "_refactor.gpkg")),
  pattern = map(rpu_codes),
  deployment = "worker"
),
# Path of the reference GeoPackage for the VPU that contains each RPU:
# "cache/reference_<vpuid>.gpkg". The vpuid is looked up in rpu_vpu by
# matching the branch's RPU code against the rpuid column.
tar_target(
  vpu_gpkg,
  file.path(
    "cache",
    paste0("reference_", rpu_vpu$vpuid[rpu_vpu$rpuid == rpu_codes], ".gpkg")
  ),
  pattern = map(rpu_codes),
  deployment = "worker"
),
# Read the flowline layer (layer name held in `nhd_flowline`) from each RPU
# GeoPackage and pass it through sf::st_zm() to drop Z/M coordinates.
tar_target(
  flowline,
  sf::st_zm(sf::read_sf(rpu_gpkg, nhd_flowline)),
  pattern = map(rpu_gpkg),
  deployment = "worker"
),
# Read the layer named by `poi_geometry_table` from each VPU reference
# GeoPackage; one branch per vpu_gpkg path.
tar_target(
  pois,
  sf::read_sf(vpu_gpkg, poi_geometry_table),
  pattern = map(vpu_gpkg),
  deployment = "worker"
),
# Read the layer named by `poi_data_table` from each VPU reference
# GeoPackage; one branch per vpu_gpkg path.
tar_target(
  pois_data,
  sf::read_sf(vpu_gpkg, poi_data_table),
  pattern = map(vpu_gpkg),
  deployment = "worker"
),
# Read the layer named by `event_geometry_table` from each VPU reference
# GeoPackage; one branch per vpu_gpkg path.
tar_target(
  refactor_event_table,
  sf::read_sf(vpu_gpkg, event_geometry_table),
  pattern = map(vpu_gpkg),
  deployment = "worker"
),
# Assemble the per-RPU refactor inputs from the flowlines, events, and POI
# tables. Other targets in this pipeline read `$events_refactor`, `$exclude`,
# and `$outlets` from the result.
# create_refactor_input() is presumably defined in R/03_refactor_functions.R.
tar_target(
  refactor_input,
  create_refactor_input(flowline, refactor_event_table, pois, pois_data),
  pattern = map(refactor_event_table, flowline, pois, pois_data),
  deployment = "worker"
),
# Run the network refactor for each RPU, branching over the matching
# flowline, refactor_input, and rpu_codes elements.
# split_meters, para_split_flines, combine_meters, and proj_crs are not
# defined in this file -- presumably set by the sourced config scripts.
tar_target(
  refactored_network,
  run_refactor_nhdplus(
    flowline,
    split_meters,
    para_split_flines,
    combine_meters,
    refactor_input$events_refactor,
    refactor_input$exclude,
    rpu_codes,
    proj_crs
  ),
  pattern = map(flowline, refactor_input, rpu_codes),
  deployment = "worker"
),
# Build the lookup between the refactored network and the source flowlines
# for each RPU. The out_refactor target reads `$outlets`, `$lookup`, and
# `$dups` from the result.
tar_target(
  refactor_lookup,
  create_refactor_lookup(
    refactored_network,
    flowline,
    refactor_input$events_refactor,
    refactor_input$outlets
  ),
  pattern = map(refactored_network, flowline, refactor_input),
  deployment = "worker"
),
# Write the refactor outputs for each RPU into that RPU's GeoPackage.
# The first four layers are written unconditionally. The `dups` write is
# wrapped in try(), so a failure there does not abort the target; the value
# of that try() call is what the target stores.
# Layer-name variables are presumably defined in the sourced config scripts.
tar_target(
  out_refactor,
  {
    sf::write_sf(refactored_network$refactored, rpu_gpkg, refactored_layer)
    sf::write_sf(refactored_network$reconciled, rpu_gpkg, reconciled_layer)
    sf::write_sf(refactor_lookup$outlets, rpu_gpkg, outlets_layer)
    sf::write_sf(refactor_lookup$lookup, rpu_gpkg, lookup_table_refactor)
    try(sf::write_sf(refactor_lookup$dups, rpu_gpkg, dup_pois))
  },
  pattern = map(refactored_network, refactor_lookup, rpu_gpkg),
  deployment = "worker"
),
# Look up the `fdr` and `fac` entries for each RPU in data_paths, keyed by
# "rpu_<rpu code>". Each branch returns a two-element list (fdr, fac);
# `iteration = "list"` keeps branches as separate list elements, so
# downstream code can index a single branch (e.g. `fdr_fac[[1]]$fdr`).
# The pattern uses the documented `map()` form, consistent with every other
# dynamic target in this pipeline (it was previously a bare symbol).
tar_target(
  fdr_fac,
  list(
    fdr = data_paths$fdr[[paste0("rpu_", rpu_codes)]],
    fac = data_paths$fac[[paste0("rpu_", rpu_codes)]]
  ),
  pattern = map(rpu_codes),
  iteration = "list"
),
# CRS of the fdr raster, taken from the first fdr_fac branch only.
# NOTE(review): assumes every RPU's rasters share this CRS -- confirm.
tar_target(
  raster_crs,
  terra::crs(terra::rast(fdr_fac[[1]]$fdr))
),
# Read the layer named by `nhd_catchment` from each RPU GeoPackage and
# transform it to raster_crs.
tar_target(
  cats,
  sf::st_transform(sf::read_sf(rpu_gpkg, nhd_catchment), raster_crs),
  pattern = map(rpu_gpkg),
  deployment = "worker"
),
# Per-RPU cache file path "cache/<rpu code>.rda", passed to
# reconcile_divides() by the divides target. No `deployment` override, so
# this uses the "main" default set in tar_option_set().
tar_target(
  cache_split,
  file.path("cache", paste0(rpu_codes, ".rda")),
  pattern = map(rpu_codes)
),
# Reconcile catchment divides against the refactored network for each RPU,
# using that RPU's fdr/fac entry and cache file path. The out_divides target
# reads `$divides` and `$split_cats` from the result.
# `para_reconcile` is not defined in this file -- presumably set by the
# sourced config scripts.
tar_target(
  divides,
  reconcile_divides(
    cats,
    refactored_network,
    fdr_fac,
    para_reconcile,
    cache_split
  ),
  pattern = map(cats, refactored_network, fdr_fac, cache_split),
  deployment = "worker"
),
# Write the reconciled divides for each RPU into that RPU's GeoPackage:
# `divides$divides` to the layer named by `divide_layer`, and
# `divides$split_cats` to the layer named by `split_divide_layer`.
# write_sf() is namespace-qualified here, as every other sf call in this
# pipeline is, so the target does not depend on sf being attached.
# The target's stored value is the result of the last write_sf() call.
tar_target(
  out_divides,
  {
    sf::write_sf(divides$divides, rpu_gpkg, divide_layer)
    sf::write_sf(divides$split_cats, rpu_gpkg, split_divide_layer)
  },
  pattern = map(divides, rpu_gpkg),
  deployment = "worker"
)