diff --git a/nativelink-store/src/fast_slow_store.rs b/nativelink-store/src/fast_slow_store.rs index 1a52d7577..3accbf838 100644 --- a/nativelink-store/src/fast_slow_store.rs +++ b/nativelink-store/src/fast_slow_store.rs @@ -396,7 +396,7 @@ impl StoreDriver for FastSlowStore { return self.slow_store.update(key, reader, size_info).await; } - let (mut fast_tx, fast_rx) = make_buf_channel_pair(); + let (fast_tx, fast_rx) = make_buf_channel_pair(); let (mut slow_tx, slow_rx) = make_buf_channel_pair(); let key_debug = format!("{key:?}"); @@ -408,6 +408,7 @@ impl StoreDriver for FastSlowStore { let mut bytes_sent: u64 = 0; let data_stream_fut = async move { + let mut fast_tx = Some(fast_tx); loop { let buffer = reader .recv() @@ -415,9 +416,9 @@ impl StoreDriver for FastSlowStore { .err_tip(|| "Failed to read buffer in fastslow store")?; if buffer.is_empty() { // EOF received. - fast_tx.send_eof().err_tip( - || "Failed to write eof to fast store in fast_slow store update", - )?; + if let Some(mut ftx) = fast_tx.take() { + drop(ftx.send_eof()); + } slow_tx .send_eof() .err_tip(|| "Failed to write eof to writer in fast_slow store update")?; @@ -429,34 +430,43 @@ impl StoreDriver for FastSlowStore { } let chunk_len = buffer.len(); - let send_start = std::time::Instant::now(); - let (fast_result, slow_result) = - join!(fast_tx.send(buffer.clone()), slow_tx.send(buffer)); - let send_elapsed = send_start.elapsed(); - if send_elapsed.as_secs() >= 5 { - warn!( - chunk_len, - send_elapsed_ms = send_elapsed.as_millis(), - total_bytes = bytes_sent, - "FastSlowStore::update: channel send stalled (>5s). A downstream store may be hanging", - ); - } - bytes_sent += u64::try_from(chunk_len).unwrap_or(u64::MAX); - fast_result - .map_err(|e| { + if let Some(ref mut ftx) = fast_tx { + let send_start = std::time::Instant::now(); + let (fast_result, slow_result) = + join!(ftx.send(buffer.clone()), slow_tx.send(buffer)); + let send_elapsed = send_start.elapsed(); + if send_elapsed.as_secs() >= 5 { + warn!( + chunk_len, + send_elapsed_ms = send_elapsed.as_millis(), + total_bytes = bytes_sent, + "FastSlowStore::update: channel send stalled (>5s). A downstream store may be hanging", + ); + } + if fast_result.is_err() { + warn!( + total_bytes = bytes_sent, + "FastSlowStore::update: fast store channel failed, continuing with slow store only", + ); + fast_tx = None; + } + slow_result.map_err(|e| { make_err!( Code::Internal, - "Failed to send message to fast_store in fast_slow_store {:?}", + "Failed to send message to slow_store in fast_slow store {:?}", e ) - }) - .merge(slow_result.map_err(|e| { + })?; + } else { + slow_tx.send(buffer).await.map_err(|e| { make_err!( Code::Internal, "Failed to send message to slow_store in fast_slow store {:?}", e ) - }))?; + })?; + } + bytes_sent += u64::try_from(chunk_len).unwrap_or(u64::MAX); } }; @@ -483,7 +493,15 @@ impl StoreDriver for FastSlowStore { "FastSlowStore::update: completed successfully", ); } - data_stream_res.merge(fast_res).merge(slow_res)?; + // Slow store success is required; fast store failure is tolerated since it's a cache. + data_stream_res.merge(slow_res)?; + if let Err(err) = fast_res { + warn!( + ?err, + key = %key_debug, + "FastSlowStore::update: fast store failed during upload; data stored in slow store", + ); + } Ok(()) } @@ -538,10 +556,20 @@ impl StoreDriver for FastSlowStore { { return Ok(Some(file)); } - return self + return match self .fast_store .update_with_whole_file(key, path, file, upload_size) - .await; + .await + { + Ok(file_slot) => Ok(file_slot), + Err(err) => { + warn!( + ?err, + "FastSlowStore::update_with_whole_file: fast store failed; data stored in slow store", + ); + Ok(None) + } + }; } if self @@ -555,14 +583,19 @@ impl StoreDriver for FastSlowStore { || self.fast_direction == StoreDirection::ReadOnly || self.fast_direction == StoreDirection::Get; if !ignore_fast { - slow_update_store_with_file( + if let Err(err) = slow_update_store_with_file( self.fast_store.as_store_driver_pin(), key.borrow(), &mut file, upload_size, ) .await - .err_tip(|| "In FastSlowStore::update_with_whole_file fast_store")?; + { + warn!( + ?err, + "FastSlowStore::update_with_whole_file: fast store failed; continuing with slow store", + ); + } } let ignore_slow = self.slow_direction == StoreDirection::ReadOnly || self.slow_direction == StoreDirection::Get; diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs index 8ee6d9c0f..60e2010d5 100644 --- a/nativelink-store/src/filesystem_store.rs +++ b/nativelink-store/src/filesystem_store.rs @@ -816,7 +816,8 @@ impl FilesystemStore { // it still exists in there. But first, get the lock... let mut encoded_file_path = entry.get_encoded_file_path().write().await; // Then check it's still in there... - if evicting_map.get(&key).await.is_none() { + // Use size_for_key instead of get() to avoid triggering bulk eviction of other entries. + if evicting_map.size_for_key(&key).await.is_none() { info!(%key, "Got eviction while emplacing, dropping"); return Ok(()); } diff --git a/nativelink-util/src/evicting_map.rs b/nativelink-util/src/evicting_map.rs index e779f38b6..5726e9370 100644 --- a/nativelink-util/src/evicting_map.rs +++ b/nativelink-util/src/evicting_map.rs @@ -435,41 +435,49 @@ where } pub async fn get(&self, key: &Q) -> Option { - // Fast path: Check if we need eviction before acquiring lock for eviction - let needs_eviction = { - let state = self.state.lock(); - if let Some((_, peek_entry)) = state.lru.peek_lru() { - self.should_evict( - state.lru.len(), - peek_entry, - state.sum_store_size, - self.max_bytes, - ) + let (result, items_to_unref, removal_futures) = { + let mut state = self.state.lock(); + // Check if the requested item is expired before promoting it. + if let Some(entry) = state.lru.peek(key.borrow()) { + if self.should_evict(state.lru.len(), entry, 0, u64::MAX) { + // Item is expired, remove it. + if let Some((k, eviction_item)) = state.lru.pop_entry(key.borrow()) { + let (data, futures) = state.remove(k.borrow(), &eviction_item, false); + let (mut items, mut removals) = self.evict_items(&mut *state); + items.push(data); + removals.extend(futures); + (None, items, removals) + } else { + let (items, removals) = self.evict_items(&mut *state); + (None, items, removals) + } + } else { + // Item is valid. Promote it in LRU so it's safe from eviction. + let data = state.lru.get_mut(key.borrow()).map(|entry| { + entry.seconds_since_anchor = + i32::try_from(self.anchor_time.elapsed().as_secs()).unwrap_or(i32::MAX); + entry.data.clone() + }); + let (items, removals) = self.evict_items(&mut *state); + (data, items, removals) + } } else { - false + let (items, removals) = self.evict_items(&mut *state); + (None, items, removals) } }; + // Unref items outside of lock — lock is guaranteed dropped here. + Self::unref_items(items_to_unref, removal_futures).await; + result + } - // Perform eviction if needed - if needs_eviction { - let (items_to_unref, removal_futures) = { - let mut state = self.state.lock(); - self.evict_items(&mut *state) - }; - // Unref items outside of lock - let mut callbacks: FuturesUnordered<_> = removal_futures.into_iter().collect(); - while callbacks.next().await.is_some() {} - let mut callbacks: FuturesUnordered<_> = - items_to_unref.iter().map(LenEntry::unref).collect(); - while callbacks.next().await.is_some() {} - } - - // Now get the item - let mut state = self.state.lock(); - let entry = state.lru.get_mut(key.borrow())?; - entry.seconds_since_anchor = - i32::try_from(self.anchor_time.elapsed().as_secs()).unwrap_or(i32::MAX); - Some(entry.data.clone()) + /// Helper to unref evicted items outside of lock. + async fn unref_items(items_to_unref: Vec, removal_futures: Vec) { + let mut callbacks: FuturesUnordered<_> = removal_futures.into_iter().collect(); + while callbacks.next().await.is_some() {} + let mut callbacks: FuturesUnordered<_> = + items_to_unref.iter().map(LenEntry::unref).collect(); + while callbacks.next().await.is_some() {} } /// Returns the replaced item if any. diff --git a/nativelink-util/src/instant_wrapper.rs b/nativelink-util/src/instant_wrapper.rs index 81247ec13..2b882d0b1 100644 --- a/nativelink-util/src/instant_wrapper.rs +++ b/nativelink-util/src/instant_wrapper.rs @@ -88,7 +88,7 @@ impl InstantWrapper for MockInstantWrapped { let baseline = self.0.elapsed(); loop { tokio::task::yield_now().await; - if self.0.elapsed() - baseline >= duration { + if self.0.elapsed().saturating_sub(baseline) >= duration { break; } } diff --git a/nativelink-worker/src/local_worker.rs b/nativelink-worker/src/local_worker.rs index 0b9ff40e2..ccf53a3a4 100644 --- a/nativelink-worker/src/local_worker.rs +++ b/nativelink-worker/src/local_worker.rs @@ -369,11 +369,34 @@ impl<'a, T: WorkerApiClientTrait + 'static, U: RunningActionsManager> LocalWorke .err_tip(|| "Error while calling execution_response")?; }, Err(e) => { - grpc_client.execution_response(ExecuteResult{ - instance_name, - operation_id, - result: Some(execute_result::Result::InternalError(e.into())), - }).await.err_tip(|| "Error calling execution_response with error")?; + let is_cas_blob_missing = e.code == Code::NotFound + && e.message_string().contains("not found in either fast or slow store"); + if is_cas_blob_missing { + warn!( + ?e, + "Missing CAS inputs during prepare_action, returning FAILED_PRECONDITION" + ); + let action_result = ActionResult { + error: Some(make_err!( + Code::FailedPrecondition, + "{}", + e.message_string() + )), + ..ActionResult::default() + }; + let action_stage = ActionStage::Completed(action_result); + grpc_client.execution_response(ExecuteResult{ + instance_name, + operation_id, + result: Some(execute_result::Result::ExecuteResponse(action_stage.into())), + }).await.err_tip(|| "Error calling execution_response with missing inputs")?; + } else { + grpc_client.execution_response(ExecuteResult{ + instance_name, + operation_id, + result: Some(execute_result::Result::InternalError(e.into())), + }).await.err_tip(|| "Error calling execution_response with error")?; + } }, } Ok(())