diff --git a/workspace/R/utils.R b/workspace/R/utils.R
index c3157750dff1f6b219c3bd89e78b1e076dfe0188..84f5800f81e6e6afed12533d5747d81636825dea 100644
--- a/workspace/R/utils.R
+++ b/workspace/R/utils.R
@@ -40,7 +40,7 @@ fix_headwaters <- function(nhd_rds, out_gpkg, new_atts = NULL,
     
     nhd <- select(nhd, COMID, FromNode, ToNode, 
                   StartFlag, StreamCalc, Divergence, DnMinorHyd) %>%
-      right_join(new_atts, by = c("COMID" = "comid")) %>%
+      left_join(new_atts, by = c("COMID" = "comid")) %>%
       nhdplusTools::align_nhdplus_names()
   }
   
@@ -49,22 +49,52 @@ fix_headwaters <- function(nhd_rds, out_gpkg, new_atts = NULL,
   m_to_km <- (1/1000)
   
   nhd$LENGTHKM <- as.numeric(sf::st_length(sf::st_transform(nhd, 5070))) * m_to_km
+  
+  custom_net <- select(sf::st_drop_geometry(nhd), 
+                       COMID, FromNode, ToNode, Divergence)
+  
+  custom_net <- nhdplusTools::get_tocomid(custom_net, remove_coastal = FALSE)
+  
+  custom_net <- select(custom_net, comid, override_tocomid = tocomid)
 
+  nhd <- left_join(nhd, custom_net, by = c("COMID" = "comid"))
+  
+  nhd <- mutate(nhd, override_tocomid = ifelse(toCOMID == 0, override_tocomid, toCOMID))
+  
   # is a headwater and for sure flows to something.  
-  check <- !nhd$COMID %in% nhd$toCOMID & !(nhd$toCOMID == 0 | is.na(nhd$toCOMID) | !nhd$toCOMID %in% nhd$COMID)
-  check_direction <- dplyr::filter(nhd, check)$COMID
+  check <- !nhd$COMID %in% nhd$override_tocomid & 
+    !(nhd$override_tocomid == 0 | is.na(nhd$override_tocomid) | 
+        !nhd$override_tocomid %in% nhd$COMID)
+  check_direction <- dplyr::filter(nhd, check)
   
-  cl <- parallel::makeCluster(16)
+  if(!all(check_direction$override_tocomid[check_direction$override_tocomid != 0] %in% nhd$COMID))
+    stop("this won't work")
+  
+  matcher <- match(check_direction$override_tocomid, nhd$COMID)
   
+  matcher <- nhd[matcher, ]
+  
+  fn_list <- Map(list, 
+                 split(check_direction, seq_len(nrow(check_direction))), 
+                 split(matcher, seq_len(nrow(matcher))))
+  
+  cl <- parallel::makeCluster(16)
+    
   # check and fix these.
-  new_geom <- pbapply::pblapply(check_direction, fix_flowdir, 
-                                network = nhd[nhd$COMID %in% check_direction, ], cl = cl)
+  new_geom <- pbapply::pblapply(cl = cl, 
+                                 X = fn_list, 
+                                 FUN = function(x) {
+                                   nhdplusTools::fix_flowdir(x[[1]]$COMID, 
+                                               fn_list = list(flowline = x[[1]], 
+                                                              network = x[[2]], 
+                                                              check_end = "end"))
+                                 })
   
   parallel::stopCluster(cl)
   
   # One is empty. Deal with it and remove from the replacement.
   error_index <- sapply(new_geom, inherits, what = "try-error")
-  errors <- filter(nhd, COMID %in% check_direction[error_index])
+  errors <- filter(nhd, COMID %in% check_direction$COMID[error_index])
   check[which(nhd$COMID %in% errors$COMID)] <- FALSE
   
   if(!all(sapply(sf::st_geometry(errors), sf::st_is_empty))) {
@@ -75,6 +105,10 @@ fix_headwaters <- function(nhd_rds, out_gpkg, new_atts = NULL,
   new_geom <- do.call(c, new_geom)
   st_geometry(nhd)[check] <- new_geom
   
+  nhd <- dplyr::filter(nhd, COMID %in% new_atts$comid)
+  
+  nhd <- select(nhd, -override_tocomid)
+  
   sf::write_sf(nhd, out_gpkg, "reference_flowlines")
   
   }