# Find the local hyfabric source package and parse its version from the file
# name (e.g. "hyfabric_0.0.1.tar.gz" yields "0.0.1").
hyfabric_source <- list.files(pattern = "hyfabric_.*\\.tar\\.gz$")
hyfabric_version <- substr(hyfabric_source, 10, 14)

# Install the local hyfabric source package into the user library if it is not
# already installed at the version found above.
if(!(require(hyfabric, quietly = TRUE) && packageVersion("hyfabric") == hyfabric_version)) {
  dir.create(path = Sys.getenv("R_LIBS_USER"), showWarnings = FALSE, recursive = TRUE)
  
  .libPaths(Sys.getenv("R_LIBS_USER"))
  
  install.packages(hyfabric_source, lib = Sys.getenv("R_LIBS_USER"),
                    repos = NULL, type = "source")
}

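#' Fix headwaters by replacing their geometry with the trimmed burn line event,
#' checking flow direction, and writing a reference flowline layer.
#' @param nhd_rds (character) path to RDS file containing NHD flowlines
#' @param out_gpkg (character) path to output geopackage
#' @param new_atts (character) optional path to CSV of updated network attributes
#' @param nhdpath (character) path to the NHDPlusV2 national geodatabase
#' @param par (numeric) number of parallel workers used for flow direction checks
#' @return path to the output geopackage
#' @examples
#' # Illustrative only; these file paths are assumptions, not files shipped
#' # with this workflow.
#' \dontrun{
#' fix_headwaters("cache/nhd.rds", "cache/reference_flowlines.gpkg",
#'                new_atts = "data/updated_network_attributes.csv",
#'                nhdpath = nhdplusTools::nhdplus_path(), par = 6)
#' }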
fix_headwaters <- function(nhd_rds, out_gpkg, new_atts = NULL, 
                           nhdpath = nhdplusTools::nhdplus_path(), 
                           par = 6) {
  
  if(!file.exists(out_gpkg)) {
  nhd <- readRDS(nhd_rds)
  
  # Replace headwater flowline geometry with the trimmed burn line event.
  # Some headwater flowlines are not fully within the catchment they belong to.
  ble <- sf::read_sf(nhdpath, "BurnLineEvent")
  
  ble <- dplyr::left_join(dplyr::select(sf::st_drop_geometry(nhd), COMID), 
                          ble, by = "COMID")
  
  ble <- sf::st_zm(sf::st_as_sf(ble))
  nhd <- sf::st_zm(nhd)
  
  sf::st_geometry(nhd)[nhd$StartFlag == 1] <- sf::st_geometry(ble)[nhd$StartFlag == 1]
  
  if(!is.null(new_atts)) {
    new_atts <- data.table::fread(new_atts, 
                                  integer64 = "character", 
                                  showProgress = FALSE)
    
    if("weight" %in% names(new_atts)) {
      new_atts <- rename(new_atts, ArbolateSu = weight)
    }
    
    nhd <- select(nhd, COMID, VPUID, RPUID, FTYPE, FromNode, ToNode, 
                  StartFlag, StreamCalc, Divergence, WBAREACOMI, 
                  DnMinorHyd) %>%
      right_join(new_atts, by = c("COMID" = "comid")) %>%
      nhdplusTools::align_nhdplus_names()
  }
  
  nhd <- sf::st_sf(nhd)
  
  m_to_km <- (1/1000)
  
  nhd$LENGTHKM <- as.numeric(sf::st_length(sf::st_transform(nhd, 5070))) * m_to_km

  # Flag flowlines that are headwaters (nothing flows to them) and that
  # definitely flow to something else in the network.
  check <- !nhd$COMID %in% nhd$toCOMID & 
    !(nhd$toCOMID == 0 | is.na(nhd$toCOMID) | !nhd$toCOMID %in% nhd$COMID)
  check_direction <- dplyr::filter(nhd, check)$COMID
  
  cl <- parallel::makeCluster(par)
  
  # check and, where necessary, fix the flow direction of the flagged flowlines.
  new_geom <- pbapply::pblapply(check_direction, fix_flowdir, 
                                network = nhd[nhd$COMID %in% check_direction, ], cl = cl)
  
  parallel::stopCluster(cl)
  
  # Some flowlines error in fix_flowdir because their geometry is empty.
  # Remove them from the replacement set and verify no other errors occurred.
  error_index <- sapply(new_geom, inherits, what = "try-error")
  errors <- filter(nhd, COMID %in% check_direction[error_index])
  check[which(nhd$COMID %in% errors$COMID)] <- FALSE
  
  if(!all(sapply(sf::st_geometry(errors), sf::st_is_empty))) {
    stop("Errors other than empty geometry from fix flowdir")
  }
  
  new_geom <- new_geom[!error_index]
  new_geom <- do.call(c, new_geom)
  st_geometry(nhd)[check] <- new_geom
  
  sf::write_sf(nhd, out_gpkg, "reference_flowlines")
  
  }
  return(out_gpkg)
}

## This function is available in sbtools now; keep a local copy until that
## version is on CRAN.
#' Retrieves files from a ScienceBase item, preserving the facet file structure
#' @param sbitem (character) ScienceBase item ID
#' @param out_dir (character) directory where files are downloaded
#' @return list, one element per facet, of downloaded file names and source URLs
#' 
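#' @examples
#' # Hypothetical usage sketch; the item ID and directory are placeholders.
#' \dontrun{
#' retrieve_sb("your-sciencebase-item-id", "data/sb_downloads")
#' }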
retrieve_sb <- function(sbitem, out_dir){
  
  item <- item_get(sbitem)
  
  linked_files <- 
    lapply(item$facets, function(x) 
    {
      list(name = x$name, 
           files = lapply(x$files, function(y, out_path)
           {
             # make sure the facet-specific output directory exists
             dir.create(out_path, showWarnings = FALSE, recursive = TRUE)
             
             out_file <- file.path(out_path, y$name)
             download.file(y$downloadUri, out_file, mode = "wb")
             
             list(fname = y$name,
                  url = y$downloadUri)
             
           }, 
           out_path = file.path(out_dir, x$name)))
    })
  
  linked_files
}


#' Documents why a streamgage is excluded from POIs
#' @param data (data.frame) gage records to document
#' @param source (character) data source
#' @param drop_reason (character) reason the gage is excluded
#' @return writes output to cache/dropped_gages.csv
#' 
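#' @examples
#' # Minimal sketch; `gages` is a hypothetical data frame of gage records.
#' \dontrun{
#' gage_document(gages, source = "example_source", drop_reason = "outside domain")
#' }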
gage_document <- function(data, source, drop_reason){
  if(!file.exists("cache/dropped_gages.csv")){
    write.csv(data %>% mutate(data_source = source, reason = drop_reason),
              "cache/dropped_gages.csv", quote = FALSE, row.names = FALSE)
  } else {
    write.table(data %>%
                  mutate(data_source = source, reason = drop_reason),
                file = "cache/dropped_gages.csv", append = TRUE, sep = ",", 
                col.names = FALSE, row.names = FALSE, quote = FALSE)
  }
}

#' clean network
#' @param net data.frame nhdplus network
#' @param net_new data.frame new nhdplus network as from e2nhd
#' @param nwm data.frame with national water model network
#' @return data.frame fully reconciled attributes
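#' @examples
#' # Illustrative sketch; the three inputs are hypothetical stand-ins for the
#' # NHDPlus, e2nhd, and NWM network tables used elsewhere in this workflow.
#' \dontrun{
#' cleaned <- clean_net(nhdplus_net, e2nhd_net, nwm_net)
#' }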
clean_net <- function(net, net_new, nwm) {
  
  # NOTE: need to disconnect diverted paths as indicated by the tonode / fromnode attributes.
  
  diverted_heads <- filter(net_new, divergence == 2)
  
  net_new <- filter(net_new, !tocomid %in% diverted_heads$comid)
  
  # NOTE: avoid fcode == 56600 (coastal)
  avoid <- net$comid[net$fcode == 56600]
  
  # NOTE: avoid large groups where flow has been redirected in an unnatural way.
  groups <- group_by(nwm, tocomid)
  
  groups <- summarise(groups, size = n(), .groups = "drop")
  
  groups <- filter(groups, size > 10 & tocomid != 0)
  
  # NOTE: also avoid all flowlines that go to coastal flowlines.
  # This is due to unwanted great lakes connections in the NWM.
  avoid <- c(avoid, 
             net_new$comid[net_new$tocomid %in% avoid], 
             net_new$comid[net_new$tocomid %in% groups$tocomid],
             nwm$comid[nwm$tocomid %in% groups$tocomid])
  
  avoid <- unique(avoid)
  
  nwm <- filter(nwm, tocomid %in% c(net_new$comid, 0))
  
  net_new <- left_join(net_new, select(nwm, comid, 
                                       nwm_tocomid = tocomid), 
                       by = "comid") %>% 
    mutate(tocomid = ifelse(is.na(tocomid), 0, tocomid))
  ## NWM already uses 0 for outlets.
  
  # NOTE: just being explicit about this for clarity.
  candidate <- select(net_new, comid, tocomid, nwm_tocomid) %>%
    filter(!comid %in% avoid)
  
  mismatch <- candidate[candidate$tocomid != candidate$nwm_tocomid, ]
  
  net_new <- left_join(net_new, select(mismatch, comid, 
                                       new_tocomid = nwm_tocomid), 
                       by = "comid")
  
  to_change <- filter(net_new, !is.na(new_tocomid))
  
  net_new <- net_new %>%
    mutate(divergence = ifelse(comid %in% to_change$new_tocomid & divergence == 2, 1,
                               ifelse(comid %in% to_change$tocomid & divergence == 1, 2, 
                                      divergence)),
           tocomid = ifelse(comid %in% to_change$comid, new_tocomid, tocomid)) %>%
    select(-nwm_tocomid, -new_tocomid)
  
  net_new <- filter(net_new, fcode != 56600)
  
  net_new <- mutate(net_new, tocomid = ifelse(!tocomid %in% comid, 0, tocomid))
  
  return(net_new)
  
}

#' Merges per-VPU geopackages together to create a CONUS geopackage of features
#' @param feat (character) type of feature to merge (POIs, segments, etc.)
#' @param in_gpkg (character) prefix of the per-VPU geopackages to merge (GF, ND_, etc.)
#' @param out_gpkg (character) name of the output geopackage
#' @return output geopackage of CONUS for the given features
#'
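#' @examples
#' # Illustrative only; the feature, prefix, and output names are assumptions
#' # about how the per-VPU geopackages were written.
#' \dontrun{
#' Merge_VPU("POIs", "reference_", "cache/reference_CONUS.gpkg")
#' }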
Merge_VPU <- function(feat, in_gpkg, out_gpkg){

  if(needs_layer(out_gpkg, feat)) {
    all_sf <- paste0(feat, "_CONUS")
    #VPUs <- c("03N", "03S", "03W")
    #VPUs <- c("10L", "10U")
    VPUs <- c(paste0("0", c(1:2)), "03N", "03S", "03W", paste0("0", c(4:9)),
              as.character(11:18), "10U", "10L")
    featCON <- do.call("rbind", lapply(VPUs, function(x) {
      tryCatch({
        layer <- ifelse(feat %in% c("nhd_flowline", "unassigned_gages", "unassigned_TE"), 
                        feat, paste0(feat, "_", x))
        read_sf(file.path("cache", paste0(in_gpkg, x, ".gpkg")), 
                layer)}, 
        error = function(e) stop(paste("didn't find", x,
                                       "\n Original error was", e)))
    }))
    write_sf(featCON, out_gpkg, feat)
  }
}

#' Capture all catchments (flowline, sink, etc.) and their RPU assignment
#' @param fcats (character) path to an RDS file of the NHDPlusV2.1 national geodatabase catchment layer
#' @param nhd_gdb (character) path to the NHDPlusV2.1 national geodatabase
#' @return (data.frame) with FEATUREID, RPUID, FTYPE, TERMINALFL 
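#' @examples
#' # Illustrative only; both paths are assumptions about the local data layout.
#' \dontrun{
#' lookup <- cat_rpu("data/nhdplus_catchment.rds", "data/NHDPlusV21_national.gdb")
#' }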
cat_rpu <- function(fcats, nhd_gdb){
  
  fl <- read_sf(nhd_gdb, "NHDFlowline_Network") %>%
    align_nhdplus_names()
  
  # read the CatchmentSP
  cats <- readRDS(fcats) %>%
    st_as_sf() %>%
    st_drop_geometry() %>%
    dplyr::select(FEATUREID, SOURCEFC)
  
  # read the Flowlines
  flowln_df <- fl %>%
    st_as_sf() %>%
    st_drop_geometry() %>%
    select(COMID, FTYPE, TerminalFl, RPUID) %>%
    rename(FEATUREID = COMID) %>%
    rename(TERMINALFL = TerminalFl) %>%
    filter(FEATUREID %in% cats$FEATUREID)
  
  # Read in sinks
  sink_df <- sf::st_read(nhd_gdb, layer = "Sink") %>%
    st_drop_geometry() %>%
    dplyr::select(SINKID, RPUID = InRPU) %>%
    dplyr::rename(FEATUREID = SINKID) %>%
    dplyr::mutate(FTYPE = "Sink", TERMINALFL = 0) %>%
    dplyr::filter(!FEATUREID %in% flowln_df$FEATUREID)
  
  #combine all the RPUIDs
  lkup_rpu <- rbind(flowln_df, sink_df)
  
  # FEATUREID 10957920, 20131674, 24534636 - these are the IDs of the missing
  # features; checked on a map, they look like they should not be used.

  # add records for the missing features
  missrec_df <- data.frame(FEATUREID=c(10957920, 20131674, 245346360), 
                           RPUID = c("03a", "03d", "17b"),
                           TERMINALFL = c(0, 0, 0)) %>%
    mutate(FTYPE = "")
  
  lkup_rpu2 <- rbind(lkup_rpu, missrec_df)
  return(lkup_rpu2)
}

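#' Merge refactored and aggregated networks across RPUs
#' Reads the per-RPU refactor and aggregate geopackages from the cache
#' directory, renumbers IDs so they are unique across RPUs, repairs cross-RPU
#' toID connections, and binds everything into CONUS-wide tables.
#' @param rpus (character) RPU codes to merge
#' @param rpu_vpu_out (data.frame) flowline connections (COMID / toCOMID) that cross RPU boundaries
#' @param lookup_table_refactor,reconciled_layer,divide_layer,agg_fline_layer,agg_cats_layer,mapped_outlets_layer
#'   (character) layer names to read from the per-RPU geopackages
#' @return list of merged layers, named by the input layer names
#' @examples
#' # Illustrative sketch only; the RPU codes and layer names here are
#' # assumptions about the broader workflow configuration.
#' \dontrun{
#' merged <- merge_refactor(c("01a", "01b"), rpu_vpu_out,
#'                          "lookup_table", "reconciled", "divides",
#'                          "agg_fline", "agg_cats", "mapped_outlets")
#' }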
merge_refactor <- function(rpus, rpu_vpu_out, 
                           lookup_table_refactor, 
                           reconciled_layer,
                           divide_layer, 
                           agg_fline_layer,
                           agg_cats_layer,
                           mapped_outlets_layer) {
  
  
  out <- rep(list(list()), length(rpus))
  names(out) <- rpus
  
  s <- 10000000
  
  for(rpu in rpus) {

    refactor_gpkg <- file.path("cache", paste0(rpu, "_refactor.gpkg"))
    aggregate_gpkg <- file.path("cache", paste0(rpu, "_aggregate.gpkg"))
    
    out[[rpu]] <- setNames(list(sf::read_sf(aggregate_gpkg, lookup_table_refactor), 
                                 sf::read_sf(refactor_gpkg, reconciled_layer), 
                                 sf::read_sf(refactor_gpkg, divide_layer), 
                                 sf::read_sf(aggregate_gpkg, agg_fline_layer), 
                                 sf::read_sf(aggregate_gpkg, agg_cats_layer), 
                                 sf::read_sf(aggregate_gpkg, mapped_outlets_layer)), 
                            c(lookup_table_refactor, 
                              reconciled_layer, 
                              divide_layer, 
                              agg_fline_layer, 
                              agg_cats_layer, 
                              mapped_outlets_layer))
    
    out[[rpu]][[reconciled_layer]] <- select(out[[rpu]][[reconciled_layer]], 
                                              ID, toID, LENGTHKM, TotDASqKM, member_COMID)
    
    out[[rpu]][[reconciled_layer]]$newID <- 
      seq(s, by = 1, length.out = nrow(out[[rpu]][[reconciled_layer]]))
    
    s <- max(out[[rpu]][[reconciled_layer]]$newID) + 1
    
    # get a toID that matches the new IDs
    out[[rpu]][[reconciled_layer]] <- 
      left_join(out[[rpu]][[reconciled_layer]], 
                select(st_drop_geometry(out[[rpu]][[reconciled_layer]]), 
                       ID, newtoID = newID), 
                by = c("toID" = "ID"))
    
    # find updates that we need to apply
    update <- rpu_vpu_out[!rpu_vpu_out$toCOMID %in% out[[rpu]][[lookup_table_refactor]]$NHDPlusV2_COMID &
                            rpu_vpu_out$COMID %in% out[[rpu]][[lookup_table_refactor]]$NHDPlusV2_COMID, ]
    
    # apply these updates for follow up if we got any
    if(nrow(update) > 0) {
      update <- left_join(update, out[[rpu]][[lookup_table_refactor]], by = c("COMID" = "NHDPlusV2_COMID")) %>%
        left_join(select(out[[rpu]][[lookup_table_refactor]], NHDPlusV2_COMID, to_member_COMID = member_COMID), 
                  by = c("toCOMID" = "NHDPlusV2_COMID"))
      
      out[[rpu]][[reconciled_layer]] <- 
        left_join(out[[rpu]][[reconciled_layer]], 
                  select(update, reconciled_ID, toCOMID),
                  by = c("ID" = "reconciled_ID"))
    }
    
    update_id <- sf::st_drop_geometry(select(out[[rpu]][[reconciled_layer]], 
                                             old_ID = ID, ID = newID))
    
    out[[rpu]][[lookup_table_refactor]] <- left_join(out[[rpu]][[lookup_table_refactor]], 
                                                     update_id, 
                                                     by = c("reconciled_ID" = "old_ID"))
    
    out[[rpu]][[divide_layer]] <- left_join(rename(out[[rpu]][[divide_layer]], old_ID = ID),
                                            update_id, 
                                            by = c("old_ID"))
    
    out[[rpu]][[divide_layer]] <- select(out[[rpu]][[divide_layer]], ID, areasqkm, member_COMID)
    
    for(l in c(agg_cats_layer, agg_fline_layer, mapped_outlets_layer)) {
      
      out[[rpu]][[l]] <- left_join(rename(out[[rpu]][[l]], old_ID = ID),
                                   update_id, 
                                   by = c("old_ID"))
      
      
      out[[rpu]][[l]] <- left_join(rename(out[[rpu]][[l]], old_toID = toID),
                                   rename(update_id, toID = ID), 
                                   by = c("old_toID" = "old_ID"))
      
     sets <- select(st_drop_geometry(out[[rpu]][[l]]), ID, old_ID, old_set = set) %>%
       mutate(old_set = strsplit(old_set, ",")) %>%
       tidyr::unnest(cols = old_set) %>%
       mutate(old_set = as.integer(old_set)) %>%
       left_join(rename(update_id, set = ID), by  = c("old_set" = "old_ID")) %>%
       select(-old_set, -old_ID)
       
     sets <- split(sets$set, sets$ID)
     
     sets <- data.frame(ID = as.integer(names(sets))) %>%
       mutate(set = unname(sets))
     
     out[[rpu]][[l]] <- left_join(select(out[[rpu]][[l]], -set),
                                  sets, by = "ID")
     
     out[[rpu]][[l]] <- select(out[[rpu]][[l]], ID, toID, set)
     
     out[[rpu]][[l]]$set <- sapply(out[[rpu]][[l]]$set, paste, collapse = ",")
    }
    
  }
  
  out <- setNames(lapply(names(out[[1]]), function(x, out) {
    dplyr::bind_rows(lapply(out, function(df, n) df[[n]], n = x))
  }, out = out), names(out[[1]]))
  
  # expand member_COMID so we have unique COMIDs to join on.
  long_form <- st_drop_geometry(out[[reconciled_layer]]) %>%
    select(newID, member_COMID) %>%
    mutate(member_COMID = strsplit(member_COMID, ",")) %>%
    tidyr::unnest(cols = member_COMID) %>%
    mutate(NHDPlusV2_COMID = as.integer(member_COMID)) %>%
    select(-member_COMID, update_newtoID = newID)

  # NOTE: if the toCOMID points into the next VPU, these will still be NA
  out[[reconciled_layer]] <- left_join(out[[reconciled_layer]], 
                                       long_form, by = c("toCOMID" = "NHDPlusV2_COMID"))
  out[[reconciled_layer]]$newtoID[!is.na(out[[reconciled_layer]]$toCOMID)] <- 
    out[[reconciled_layer]]$update_newtoID[!is.na(out[[reconciled_layer]]$toCOMID)]

  out[[reconciled_layer]] <- select(out[[reconciled_layer]], 
                                    ID = newID, toID = newtoID, LENGTHKM, 
                                    TotDASqKM, member_COMID, refactor_ID = ID)
  
  out
}