From a5ee8fc805382f0385a14c33c238687333e5dcfb Mon Sep 17 00:00:00 2001 From: KCaverly Date: Fri, 8 Sep 2023 12:35:15 -0400 Subject: [PATCH 1/5] initial outline for rate limiting status updates --- crates/search/src/project_search.rs | 16 +++- crates/semantic_index/src/embedding.rs | 75 +++++++++++++++++++ crates/semantic_index/src/semantic_index.rs | 17 +++-- .../src/semantic_index_tests.rs | 6 +- 4 files changed, 106 insertions(+), 8 deletions(-) diff --git a/crates/search/src/project_search.rs b/crates/search/src/project_search.rs index c52be64141..977ead8c9e 100644 --- a/crates/search/src/project_search.rs +++ b/crates/search/src/project_search.rs @@ -34,6 +34,7 @@ use std::{ ops::{Not, Range}, path::PathBuf, sync::Arc, + time::Duration, }; use util::ResultExt as _; use workspace::{ @@ -319,11 +320,22 @@ impl View for ProjectSearchView { let status = semantic.index_status; match status { SemanticIndexStatus::Indexed => Some("Indexing complete".to_string()), - SemanticIndexStatus::Indexing { remaining_files } => { + SemanticIndexStatus::Indexing { + remaining_files, + rate_limiting, + } => { if remaining_files == 0 { Some(format!("Indexing...")) } else { - Some(format!("Remaining files to index: {}", remaining_files)) + if rate_limiting > Duration::ZERO { + Some(format!( + "Remaining files to index (rate limit resets in {}s): {}", + rate_limiting.as_secs(), + remaining_files + )) + } else { + Some(format!("Remaining files to index: {}", remaining_files)) + } } } SemanticIndexStatus::NotIndexed => None, diff --git a/crates/semantic_index/src/embedding.rs b/crates/semantic_index/src/embedding.rs index 7228738525..6affac2556 100644 --- a/crates/semantic_index/src/embedding.rs +++ b/crates/semantic_index/src/embedding.rs @@ -7,7 +7,9 @@ use isahc::http::StatusCode; use isahc::prelude::Configurable; use isahc::{AsyncBody, Response}; use lazy_static::lazy_static; +use parking_lot::Mutex; use parse_duration::parse; +use postage::watch; use rusqlite::types::{FromSql, FromSqlResult, ToSqlOutput, ValueRef}; use rusqlite::ToSql; use serde::{Deserialize, Serialize}; @@ -82,6 +84,8 @@ impl ToSql for Embedding { pub struct OpenAIEmbeddings { pub client: Arc, pub executor: Arc, + rate_limit_count_rx: watch::Receiver<(Duration, usize)>, + rate_limit_count_tx: Arc>>, } #[derive(Serialize)] @@ -114,12 +118,16 @@ pub trait EmbeddingProvider: Sync + Send { async fn embed_batch(&self, spans: Vec) -> Result>; fn max_tokens_per_batch(&self) -> usize; fn truncate(&self, span: &str) -> (String, usize); + fn rate_limit_expiration(&self) -> Duration; } pub struct DummyEmbeddings {} #[async_trait] impl EmbeddingProvider for DummyEmbeddings { + fn rate_limit_expiration(&self) -> Duration { + Duration::ZERO + } async fn embed_batch(&self, spans: Vec) -> Result> { // 1024 is the OpenAI Embeddings size for ada models. // the model we will likely be starting with. @@ -149,6 +157,53 @@ impl EmbeddingProvider for DummyEmbeddings { const OPENAI_INPUT_LIMIT: usize = 8190; impl OpenAIEmbeddings { + pub fn new(client: Arc, executor: Arc) -> Self { + let (rate_limit_count_tx, rate_limit_count_rx) = watch::channel_with((Duration::ZERO, 0)); + let rate_limit_count_tx = Arc::new(Mutex::new(rate_limit_count_tx)); + + OpenAIEmbeddings { + client, + executor, + rate_limit_count_rx, + rate_limit_count_tx, + } + } + + fn resolve_rate_limit(&self) { + let (current_delay, delay_count) = *self.rate_limit_count_tx.lock().borrow(); + let updated_count = delay_count - 1; + let updated_duration = if updated_count == 0 { + Duration::ZERO + } else { + current_delay + }; + + log::trace!( + "resolving rate limit: Count: {:?} Duration: {:?}", + updated_count, + updated_duration + ); + + *self.rate_limit_count_tx.lock().borrow_mut() = (updated_duration, updated_count); + } + + fn update_rate_limit(&self, delay_duration: Duration, count_increase: usize) { + let (current_delay, delay_count) = *self.rate_limit_count_tx.lock().borrow(); + let updated_count = delay_count + count_increase; + let updated_duration = if current_delay < delay_duration { + delay_duration + } else { + current_delay + }; + + log::trace!( + "updating rate limit: Count: {:?} Duration: {:?}", + updated_count, + updated_duration + ); + + *self.rate_limit_count_tx.lock().borrow_mut() = (updated_duration, updated_count); + } async fn send_request( &self, api_key: &str, @@ -179,6 +234,10 @@ impl EmbeddingProvider for OpenAIEmbeddings { 50000 } + fn rate_limit_expiration(&self) -> Duration { + let (duration, _) = *self.rate_limit_count_rx.borrow(); + duration + } fn truncate(&self, span: &str) -> (String, usize) { let mut tokens = OPENAI_BPE_TOKENIZER.encode_with_special_tokens(span); let output = if tokens.len() > OPENAI_INPUT_LIMIT { @@ -203,6 +262,7 @@ impl EmbeddingProvider for OpenAIEmbeddings { .ok_or_else(|| anyhow!("no api key"))?; let mut request_number = 0; + let mut rate_limiting = false; let mut request_timeout: u64 = 15; let mut response: Response; while request_number < MAX_RETRIES { @@ -229,6 +289,12 @@ impl EmbeddingProvider for OpenAIEmbeddings { response.usage.total_tokens ); + // If we complete a request successfully that was previously rate_limited + // resolve the rate limit + if rate_limiting { + self.resolve_rate_limit() + } + return Ok(response .data .into_iter() @@ -254,6 +320,15 @@ impl EmbeddingProvider for OpenAIEmbeddings { } }; + // If we've previously rate limited, increment the duration but not the count + if rate_limiting { + self.update_rate_limit(delay_duration, 0); + } else { + self.update_rate_limit(delay_duration, 1); + } + + rate_limiting = true; + log::trace!( "openai rate limiting: waiting {:?} until lifted", &delay_duration diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs index 0e18c42049..8fba7de0f0 100644 --- a/crates/semantic_index/src/semantic_index.rs +++ b/crates/semantic_index/src/semantic_index.rs @@ -91,10 +91,7 @@ pub fn init( let semantic_index = SemanticIndex::new( fs, db_file_path, - Arc::new(OpenAIEmbeddings { - client: http_client, - executor: cx.background(), - }), + Arc::new(OpenAIEmbeddings::new(http_client, cx.background())), language_registry, cx.clone(), ) @@ -113,7 +110,10 @@ pub fn init( pub enum SemanticIndexStatus { NotIndexed, Indexed, - Indexing { remaining_files: usize }, + Indexing { + remaining_files: usize, + rate_limiting: Duration, + }, } pub struct SemanticIndex { @@ -132,6 +132,8 @@ struct ProjectState { pending_file_count_rx: watch::Receiver, pending_file_count_tx: Arc>>, pending_index: usize, + rate_limiting_count_rx: watch::Receiver, + rate_limiting_count_tx: Arc>>, _subscription: gpui::Subscription, _observe_pending_file_count: Task<()>, } @@ -223,11 +225,15 @@ impl ProjectState { fn new(subscription: gpui::Subscription, cx: &mut ModelContext) -> Self { let (pending_file_count_tx, pending_file_count_rx) = watch::channel_with(0); let pending_file_count_tx = Arc::new(Mutex::new(pending_file_count_tx)); + let (rate_limiting_count_tx, rate_limiting_count_rx) = watch::channel_with(0); + let rate_limiting_count_tx = Arc::new(Mutex::new(rate_limiting_count_tx)); Self { worktrees: Default::default(), pending_file_count_rx: pending_file_count_rx.clone(), pending_file_count_tx, pending_index: 0, + rate_limiting_count_rx: rate_limiting_count_rx.clone(), + rate_limiting_count_tx, _subscription: subscription, _observe_pending_file_count: cx.spawn_weak({ let mut pending_file_count_rx = pending_file_count_rx.clone(); @@ -293,6 +299,7 @@ impl SemanticIndex { } else { SemanticIndexStatus::Indexing { remaining_files: project_state.pending_file_count_rx.borrow().clone(), + rate_limiting: self.embedding_provider.rate_limit_expiration(), } } } else { diff --git a/crates/semantic_index/src/semantic_index_tests.rs b/crates/semantic_index/src/semantic_index_tests.rs index ffd8db8781..09c94b9a94 100644 --- a/crates/semantic_index/src/semantic_index_tests.rs +++ b/crates/semantic_index/src/semantic_index_tests.rs @@ -21,7 +21,7 @@ use std::{ atomic::{self, AtomicUsize}, Arc, }, - time::SystemTime, + time::{Duration, SystemTime}, }; use unindent::Unindent; use util::RandomCharIter; @@ -1275,6 +1275,10 @@ impl EmbeddingProvider for FakeEmbeddingProvider { 200 } + fn rate_limit_expiration(&self) -> Duration { + Duration::ZERO + } + async fn embed_batch(&self, spans: Vec) -> Result> { self.embedding_count .fetch_add(spans.len(), atomic::Ordering::SeqCst); From bf43f93197e9149ef7625ed7b61f368edca79e82 Mon Sep 17 00:00:00 2001 From: KCaverly Date: Fri, 8 Sep 2023 15:04:50 -0400 Subject: [PATCH 2/5] updated semantic_index reset status to leverage target reset system time as opposed to duration --- crates/search/src/project_search.rs | 22 +++--- crates/semantic_index/src/embedding.rs | 67 +++++++++---------- crates/semantic_index/src/semantic_index.rs | 10 +-- .../src/semantic_index_tests.rs | 6 +- 4 files changed, 50 insertions(+), 55 deletions(-) diff --git a/crates/search/src/project_search.rs b/crates/search/src/project_search.rs index 977ead8c9e..6b5ebd56d4 100644 --- a/crates/search/src/project_search.rs +++ b/crates/search/src/project_search.rs @@ -34,7 +34,7 @@ use std::{ ops::{Not, Range}, path::PathBuf, sync::Arc, - time::Duration, + time::SystemTime, }; use util::ResultExt as _; use workspace::{ @@ -322,17 +322,23 @@ impl View for ProjectSearchView { SemanticIndexStatus::Indexed => Some("Indexing complete".to_string()), SemanticIndexStatus::Indexing { remaining_files, - rate_limiting, + rate_limit_expiration_time, } => { if remaining_files == 0 { Some(format!("Indexing...")) } else { - if rate_limiting > Duration::ZERO { - Some(format!( - "Remaining files to index (rate limit resets in {}s): {}", - rate_limiting.as_secs(), - remaining_files - )) + if let Some(rate_limit_expiration_time) = rate_limit_expiration_time { + if let Ok(remaining_seconds) = + rate_limit_expiration_time.duration_since(SystemTime::now()) + { + Some(format!( + "Remaining files to index(rate limit resets in {}s): {}", + remaining_seconds.as_secs(), + remaining_files + )) + } else { + Some(format!("Remaining files to index: {}", remaining_files)) + } } else { Some(format!("Remaining files to index: {}", remaining_files)) } diff --git a/crates/semantic_index/src/embedding.rs b/crates/semantic_index/src/embedding.rs index 6affac2556..148b354794 100644 --- a/crates/semantic_index/src/embedding.rs +++ b/crates/semantic_index/src/embedding.rs @@ -14,8 +14,9 @@ use rusqlite::types::{FromSql, FromSqlResult, ToSqlOutput, ValueRef}; use rusqlite::ToSql; use serde::{Deserialize, Serialize}; use std::env; +use std::ops::Add; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, SystemTime}; use tiktoken_rs::{cl100k_base, CoreBPE}; use util::http::{HttpClient, Request}; @@ -84,8 +85,8 @@ impl ToSql for Embedding { pub struct OpenAIEmbeddings { pub client: Arc, pub executor: Arc, - rate_limit_count_rx: watch::Receiver<(Duration, usize)>, - rate_limit_count_tx: Arc>>, + rate_limit_count_rx: watch::Receiver<(Option, usize)>, + rate_limit_count_tx: Arc, usize)>>>, } #[derive(Serialize)] @@ -118,15 +119,15 @@ pub trait EmbeddingProvider: Sync + Send { async fn embed_batch(&self, spans: Vec) -> Result>; fn max_tokens_per_batch(&self) -> usize; fn truncate(&self, span: &str) -> (String, usize); - fn rate_limit_expiration(&self) -> Duration; + fn rate_limit_expiration(&self) -> Option; } pub struct DummyEmbeddings {} #[async_trait] impl EmbeddingProvider for DummyEmbeddings { - fn rate_limit_expiration(&self) -> Duration { - Duration::ZERO + fn rate_limit_expiration(&self) -> Option { + None } async fn embed_batch(&self, spans: Vec) -> Result> { // 1024 is the OpenAI Embeddings size for ada models. @@ -158,7 +159,7 @@ const OPENAI_INPUT_LIMIT: usize = 8190; impl OpenAIEmbeddings { pub fn new(client: Arc, executor: Arc) -> Self { - let (rate_limit_count_tx, rate_limit_count_rx) = watch::channel_with((Duration::ZERO, 0)); + let (rate_limit_count_tx, rate_limit_count_rx) = watch::channel_with((None, 0)); let rate_limit_count_tx = Arc::new(Mutex::new(rate_limit_count_tx)); OpenAIEmbeddings { @@ -170,39 +171,32 @@ impl OpenAIEmbeddings { } fn resolve_rate_limit(&self) { - let (current_delay, delay_count) = *self.rate_limit_count_tx.lock().borrow(); + let (reset_time, delay_count) = *self.rate_limit_count_tx.lock().borrow(); let updated_count = delay_count - 1; - let updated_duration = if updated_count == 0 { - Duration::ZERO - } else { - current_delay - }; + let updated_time = if updated_count == 0 { None } else { reset_time }; - log::trace!( - "resolving rate limit: Count: {:?} Duration: {:?}", - updated_count, - updated_duration - ); + log::trace!("resolving rate limit: Count: {:?}", updated_count); - *self.rate_limit_count_tx.lock().borrow_mut() = (updated_duration, updated_count); + *self.rate_limit_count_tx.lock().borrow_mut() = (updated_time, updated_count); } - fn update_rate_limit(&self, delay_duration: Duration, count_increase: usize) { - let (current_delay, delay_count) = *self.rate_limit_count_tx.lock().borrow(); - let updated_count = delay_count + count_increase; - let updated_duration = if current_delay < delay_duration { - delay_duration + fn update_rate_limit(&self, reset_time: SystemTime, count_increase: usize) { + let (original_time, original_count) = *self.rate_limit_count_tx.lock().borrow(); + let updated_count = original_count + count_increase; + + let updated_time = if let Some(original_time) = original_time { + if reset_time < original_time { + Some(reset_time) + } else { + Some(original_time) + } } else { - current_delay + Some(reset_time) }; - log::trace!( - "updating rate limit: Count: {:?} Duration: {:?}", - updated_count, - updated_duration - ); + log::trace!("updating rate limit: Count: {:?}", updated_count); - *self.rate_limit_count_tx.lock().borrow_mut() = (updated_duration, updated_count); + *self.rate_limit_count_tx.lock().borrow_mut() = (updated_time, updated_count); } async fn send_request( &self, @@ -234,9 +228,9 @@ impl EmbeddingProvider for OpenAIEmbeddings { 50000 } - fn rate_limit_expiration(&self) -> Duration { - let (duration, _) = *self.rate_limit_count_rx.borrow(); - duration + fn rate_limit_expiration(&self) -> Option { + let (expiration_time, _) = *self.rate_limit_count_rx.borrow(); + expiration_time } fn truncate(&self, span: &str) -> (String, usize) { let mut tokens = OPENAI_BPE_TOKENIZER.encode_with_special_tokens(span); @@ -321,10 +315,11 @@ impl EmbeddingProvider for OpenAIEmbeddings { }; // If we've previously rate limited, increment the duration but not the count + let reset_time = SystemTime::now().add(delay_duration); if rate_limiting { - self.update_rate_limit(delay_duration, 0); + self.update_rate_limit(reset_time, 0); } else { - self.update_rate_limit(delay_duration, 1); + self.update_rate_limit(reset_time, 1); } rate_limiting = true; diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs index 8fba7de0f0..b60d697b43 100644 --- a/crates/semantic_index/src/semantic_index.rs +++ b/crates/semantic_index/src/semantic_index.rs @@ -112,7 +112,7 @@ pub enum SemanticIndexStatus { Indexed, Indexing { remaining_files: usize, - rate_limiting: Duration, + rate_limit_expiration_time: Option, }, } @@ -132,8 +132,6 @@ struct ProjectState { pending_file_count_rx: watch::Receiver, pending_file_count_tx: Arc>>, pending_index: usize, - rate_limiting_count_rx: watch::Receiver, - rate_limiting_count_tx: Arc>>, _subscription: gpui::Subscription, _observe_pending_file_count: Task<()>, } @@ -225,15 +223,11 @@ impl ProjectState { fn new(subscription: gpui::Subscription, cx: &mut ModelContext) -> Self { let (pending_file_count_tx, pending_file_count_rx) = watch::channel_with(0); let pending_file_count_tx = Arc::new(Mutex::new(pending_file_count_tx)); - let (rate_limiting_count_tx, rate_limiting_count_rx) = watch::channel_with(0); - let rate_limiting_count_tx = Arc::new(Mutex::new(rate_limiting_count_tx)); Self { worktrees: Default::default(), pending_file_count_rx: pending_file_count_rx.clone(), pending_file_count_tx, pending_index: 0, - rate_limiting_count_rx: rate_limiting_count_rx.clone(), - rate_limiting_count_tx, _subscription: subscription, _observe_pending_file_count: cx.spawn_weak({ let mut pending_file_count_rx = pending_file_count_rx.clone(); @@ -299,7 +293,7 @@ impl SemanticIndex { } else { SemanticIndexStatus::Indexing { remaining_files: project_state.pending_file_count_rx.borrow().clone(), - rate_limiting: self.embedding_provider.rate_limit_expiration(), + rate_limit_expiration_time: self.embedding_provider.rate_limit_expiration(), } } } else { diff --git a/crates/semantic_index/src/semantic_index_tests.rs b/crates/semantic_index/src/semantic_index_tests.rs index 09c94b9a94..4bc95bec62 100644 --- a/crates/semantic_index/src/semantic_index_tests.rs +++ b/crates/semantic_index/src/semantic_index_tests.rs @@ -21,7 +21,7 @@ use std::{ atomic::{self, AtomicUsize}, Arc, }, - time::{Duration, SystemTime}, + time::SystemTime, }; use unindent::Unindent; use util::RandomCharIter; @@ -1275,8 +1275,8 @@ impl EmbeddingProvider for FakeEmbeddingProvider { 200 } - fn rate_limit_expiration(&self) -> Duration { - Duration::ZERO + fn rate_limit_expiration(&self) -> Option { + None } async fn embed_batch(&self, spans: Vec) -> Result> { From 37915ec4f2f5d03eedc5accd979c37f4c3da0121 Mon Sep 17 00:00:00 2001 From: KCaverly Date: Fri, 8 Sep 2023 16:53:16 -0400 Subject: [PATCH 3/5] updated notify to accomodate for updated countdown --- crates/search/src/project_search.rs | 2 +- crates/semantic_index/src/embedding.rs | 42 ++++++++++----------- crates/semantic_index/src/semantic_index.rs | 17 +++++++-- 3 files changed, 35 insertions(+), 26 deletions(-) diff --git a/crates/search/src/project_search.rs b/crates/search/src/project_search.rs index 6b5ebd56d4..5a1d5992a6 100644 --- a/crates/search/src/project_search.rs +++ b/crates/search/src/project_search.rs @@ -332,7 +332,7 @@ impl View for ProjectSearchView { rate_limit_expiration_time.duration_since(SystemTime::now()) { Some(format!( - "Remaining files to index(rate limit resets in {}s): {}", + "Remaining files to index (rate limit resets in {}s): {}", remaining_seconds.as_secs(), remaining_files )) diff --git a/crates/semantic_index/src/embedding.rs b/crates/semantic_index/src/embedding.rs index 148b354794..7bac809c97 100644 --- a/crates/semantic_index/src/embedding.rs +++ b/crates/semantic_index/src/embedding.rs @@ -85,8 +85,8 @@ impl ToSql for Embedding { pub struct OpenAIEmbeddings { pub client: Arc, pub executor: Arc, - rate_limit_count_rx: watch::Receiver<(Option, usize)>, - rate_limit_count_tx: Arc, usize)>>>, + rate_limit_count_rx: watch::Receiver>, + rate_limit_count_tx: Arc>>>, } #[derive(Serialize)] @@ -159,7 +159,7 @@ const OPENAI_INPUT_LIMIT: usize = 8190; impl OpenAIEmbeddings { pub fn new(client: Arc, executor: Arc) -> Self { - let (rate_limit_count_tx, rate_limit_count_rx) = watch::channel_with((None, 0)); + let (rate_limit_count_tx, rate_limit_count_rx) = watch::channel_with(None); let rate_limit_count_tx = Arc::new(Mutex::new(rate_limit_count_tx)); OpenAIEmbeddings { @@ -171,18 +171,22 @@ impl OpenAIEmbeddings { } fn resolve_rate_limit(&self) { - let (reset_time, delay_count) = *self.rate_limit_count_tx.lock().borrow(); - let updated_count = delay_count - 1; - let updated_time = if updated_count == 0 { None } else { reset_time }; + let reset_time = *self.rate_limit_count_tx.lock().borrow(); - log::trace!("resolving rate limit: Count: {:?}", updated_count); + if let Some(reset_time) = reset_time { + if SystemTime::now() >= reset_time { + *self.rate_limit_count_tx.lock().borrow_mut() = None + } + } - *self.rate_limit_count_tx.lock().borrow_mut() = (updated_time, updated_count); + log::trace!( + "resolving reset time: {:?}", + *self.rate_limit_count_tx.lock().borrow() + ); } - fn update_rate_limit(&self, reset_time: SystemTime, count_increase: usize) { - let (original_time, original_count) = *self.rate_limit_count_tx.lock().borrow(); - let updated_count = original_count + count_increase; + fn update_reset_time(&self, reset_time: SystemTime) { + let original_time = *self.rate_limit_count_tx.lock().borrow(); let updated_time = if let Some(original_time) = original_time { if reset_time < original_time { @@ -194,9 +198,9 @@ impl OpenAIEmbeddings { Some(reset_time) }; - log::trace!("updating rate limit: Count: {:?}", updated_count); + log::trace!("updating rate limit time: {:?}", updated_time); - *self.rate_limit_count_tx.lock().borrow_mut() = (updated_time, updated_count); + *self.rate_limit_count_tx.lock().borrow_mut() = updated_time; } async fn send_request( &self, @@ -229,8 +233,7 @@ impl EmbeddingProvider for OpenAIEmbeddings { } fn rate_limit_expiration(&self) -> Option { - let (expiration_time, _) = *self.rate_limit_count_rx.borrow(); - expiration_time + *self.rate_limit_count_rx.borrow() } fn truncate(&self, span: &str) -> (String, usize) { let mut tokens = OPENAI_BPE_TOKENIZER.encode_with_special_tokens(span); @@ -296,6 +299,7 @@ impl EmbeddingProvider for OpenAIEmbeddings { .collect()); } StatusCode::TOO_MANY_REQUESTS => { + rate_limiting = true; let mut body = String::new(); response.body_mut().read_to_string(&mut body).await?; @@ -316,13 +320,7 @@ impl EmbeddingProvider for OpenAIEmbeddings { // If we've previously rate limited, increment the duration but not the count let reset_time = SystemTime::now().add(delay_duration); - if rate_limiting { - self.update_rate_limit(reset_time, 0); - } else { - self.update_rate_limit(reset_time, 1); - } - - rate_limiting = true; + self.update_reset_time(reset_time); log::trace!( "openai rate limiting: waiting {:?} until lifted", diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs index b60d697b43..92b11f00d1 100644 --- a/crates/semantic_index/src/semantic_index.rs +++ b/crates/semantic_index/src/semantic_index.rs @@ -232,9 +232,20 @@ impl ProjectState { _observe_pending_file_count: cx.spawn_weak({ let mut pending_file_count_rx = pending_file_count_rx.clone(); |this, mut cx| async move { - while let Some(_) = pending_file_count_rx.next().await { - if let Some(this) = this.upgrade(&cx) { - this.update(&mut cx, |_, cx| cx.notify()); + loop { + let mut timer = cx.background().timer(Duration::from_millis(350)).fuse(); + let mut pending_file_count = pending_file_count_rx.next().fuse(); + futures::select_biased! { + _ = pending_file_count => { + if let Some(this) = this.upgrade(&cx) { + this.update(&mut cx, |_, cx| cx.notify()); + } + }, + _ = timer => { + if let Some(this) = this.upgrade(&cx) { + this.update(&mut cx, |_, cx| cx.notify()); + } + } } } } From 7df21f86ddcbcbdc6eea31f898d14c28ceb5e92c Mon Sep 17 00:00:00 2001 From: KCaverly Date: Mon, 11 Sep 2023 10:11:40 -0400 Subject: [PATCH 4/5] move cx notify observe for rate_limit_expiry into ProjectState in the semantic index Co-authored-by: Antonio --- crates/search/src/project_search.rs | 31 +++++++++++++++++---- crates/semantic_index/src/semantic_index.rs | 21 ++++---------- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/crates/search/src/project_search.rs b/crates/search/src/project_search.rs index 5a1d5992a6..b85d0b9b40 100644 --- a/crates/search/src/project_search.rs +++ b/crates/search/src/project_search.rs @@ -34,7 +34,7 @@ use std::{ ops::{Not, Range}, path::PathBuf, sync::Arc, - time::SystemTime, + time::{Duration, SystemTime}, }; use util::ResultExt as _; use workspace::{ @@ -131,6 +131,7 @@ pub struct ProjectSearchView { struct SemanticState { index_status: SemanticIndexStatus, + maintain_rate_limit: Option>, _subscription: Subscription, } @@ -322,14 +323,14 @@ impl View for ProjectSearchView { SemanticIndexStatus::Indexed => Some("Indexing complete".to_string()), SemanticIndexStatus::Indexing { remaining_files, - rate_limit_expiration_time, + rate_limit_expiry, } => { if remaining_files == 0 { Some(format!("Indexing...")) } else { - if let Some(rate_limit_expiration_time) = rate_limit_expiration_time { + if let Some(rate_limit_expiry) = rate_limit_expiry { if let Ok(remaining_seconds) = - rate_limit_expiration_time.duration_since(SystemTime::now()) + rate_limit_expiry.duration_since(SystemTime::now()) { Some(format!( "Remaining files to index (rate limit resets in {}s): {}", @@ -669,9 +670,10 @@ impl ProjectSearchView { self.semantic_state = Some(SemanticState { index_status: semantic_index.read(cx).status(&project), + maintain_rate_limit: None, _subscription: cx.observe(&semantic_index, Self::semantic_index_changed), }); - cx.notify(); + self.semantic_index_changed(semantic_index, cx); } } @@ -682,8 +684,25 @@ impl ProjectSearchView { ) { let project = self.model.read(cx).project.clone(); if let Some(semantic_state) = self.semantic_state.as_mut() { - semantic_state.index_status = semantic_index.read(cx).status(&project); cx.notify(); + semantic_state.index_status = semantic_index.read(cx).status(&project); + if let SemanticIndexStatus::Indexing { + rate_limit_expiry: Some(_), + .. + } = &semantic_state.index_status + { + if semantic_state.maintain_rate_limit.is_none() { + semantic_state.maintain_rate_limit = + Some(cx.spawn(|this, mut cx| async move { + loop { + cx.background().timer(Duration::from_secs(1)).await; + this.update(&mut cx, |_, cx| cx.notify()).log_err(); + } + })); + return; + } + } + semantic_state.maintain_rate_limit = None; } } diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs index 92b11f00d1..efcc1ba242 100644 --- a/crates/semantic_index/src/semantic_index.rs +++ b/crates/semantic_index/src/semantic_index.rs @@ -112,7 +112,7 @@ pub enum SemanticIndexStatus { Indexed, Indexing { remaining_files: usize, - rate_limit_expiration_time: Option, + rate_limit_expiry: Option, }, } @@ -232,20 +232,9 @@ impl ProjectState { _observe_pending_file_count: cx.spawn_weak({ let mut pending_file_count_rx = pending_file_count_rx.clone(); |this, mut cx| async move { - loop { - let mut timer = cx.background().timer(Duration::from_millis(350)).fuse(); - let mut pending_file_count = pending_file_count_rx.next().fuse(); - futures::select_biased! { - _ = pending_file_count => { - if let Some(this) = this.upgrade(&cx) { - this.update(&mut cx, |_, cx| cx.notify()); - } - }, - _ = timer => { - if let Some(this) = this.upgrade(&cx) { - this.update(&mut cx, |_, cx| cx.notify()); - } - } + while let Some(_) = pending_file_count_rx.next().await { + if let Some(this) = this.upgrade(&cx) { + this.update(&mut cx, |_, cx| cx.notify()); } } } @@ -304,7 +293,7 @@ impl SemanticIndex { } else { SemanticIndexStatus::Indexing { remaining_files: project_state.pending_file_count_rx.borrow().clone(), - rate_limit_expiration_time: self.embedding_provider.rate_limit_expiration(), + rate_limit_expiry: self.embedding_provider.rate_limit_expiration(), } } } else { From e678c7d9ee29cd7de6ab788c866c8a950b843306 Mon Sep 17 00:00:00 2001 From: KCaverly Date: Mon, 11 Sep 2023 10:26:14 -0400 Subject: [PATCH 5/5] swap SystemTime for Instant throughout rate_limit_expiry tracking --- crates/search/src/project_search.rs | 8 ++++---- crates/semantic_index/src/embedding.rs | 18 +++++++++--------- crates/semantic_index/src/semantic_index.rs | 2 +- .../semantic_index/src/semantic_index_tests.rs | 4 ++-- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/crates/search/src/project_search.rs b/crates/search/src/project_search.rs index b85d0b9b40..6ca4928803 100644 --- a/crates/search/src/project_search.rs +++ b/crates/search/src/project_search.rs @@ -34,7 +34,7 @@ use std::{ ops::{Not, Range}, path::PathBuf, sync::Arc, - time::{Duration, SystemTime}, + time::{Duration, Instant}, }; use util::ResultExt as _; use workspace::{ @@ -329,9 +329,9 @@ impl View for ProjectSearchView { Some(format!("Indexing...")) } else { if let Some(rate_limit_expiry) = rate_limit_expiry { - if let Ok(remaining_seconds) = - rate_limit_expiry.duration_since(SystemTime::now()) - { + let remaining_seconds = + rate_limit_expiry.duration_since(Instant::now()); + if remaining_seconds > Duration::from_secs(0) { Some(format!( "Remaining files to index (rate limit resets in {}s): {}", remaining_seconds.as_secs(), diff --git a/crates/semantic_index/src/embedding.rs b/crates/semantic_index/src/embedding.rs index 7bac809c97..42d90f0fdb 100644 --- a/crates/semantic_index/src/embedding.rs +++ b/crates/semantic_index/src/embedding.rs @@ -16,7 +16,7 @@ use serde::{Deserialize, Serialize}; use std::env; use std::ops::Add; use std::sync::Arc; -use std::time::{Duration, SystemTime}; +use std::time::{Duration, Instant}; use tiktoken_rs::{cl100k_base, CoreBPE}; use util::http::{HttpClient, Request}; @@ -85,8 +85,8 @@ impl ToSql for Embedding { pub struct OpenAIEmbeddings { pub client: Arc, pub executor: Arc, - rate_limit_count_rx: watch::Receiver>, - rate_limit_count_tx: Arc>>>, + rate_limit_count_rx: watch::Receiver>, + rate_limit_count_tx: Arc>>>, } #[derive(Serialize)] @@ -119,14 +119,14 @@ pub trait EmbeddingProvider: Sync + Send { async fn embed_batch(&self, spans: Vec) -> Result>; fn max_tokens_per_batch(&self) -> usize; fn truncate(&self, span: &str) -> (String, usize); - fn rate_limit_expiration(&self) -> Option; + fn rate_limit_expiration(&self) -> Option; } pub struct DummyEmbeddings {} #[async_trait] impl EmbeddingProvider for DummyEmbeddings { - fn rate_limit_expiration(&self) -> Option { + fn rate_limit_expiration(&self) -> Option { None } async fn embed_batch(&self, spans: Vec) -> Result> { @@ -174,7 +174,7 @@ impl OpenAIEmbeddings { let reset_time = *self.rate_limit_count_tx.lock().borrow(); if let Some(reset_time) = reset_time { - if SystemTime::now() >= reset_time { + if Instant::now() >= reset_time { *self.rate_limit_count_tx.lock().borrow_mut() = None } } @@ -185,7 +185,7 @@ impl OpenAIEmbeddings { ); } - fn update_reset_time(&self, reset_time: SystemTime) { + fn update_reset_time(&self, reset_time: Instant) { let original_time = *self.rate_limit_count_tx.lock().borrow(); let updated_time = if let Some(original_time) = original_time { @@ -232,7 +232,7 @@ impl EmbeddingProvider for OpenAIEmbeddings { 50000 } - fn rate_limit_expiration(&self) -> Option { + fn rate_limit_expiration(&self) -> Option { *self.rate_limit_count_rx.borrow() } fn truncate(&self, span: &str) -> (String, usize) { @@ -319,7 +319,7 @@ impl EmbeddingProvider for OpenAIEmbeddings { }; // If we've previously rate limited, increment the duration but not the count - let reset_time = SystemTime::now().add(delay_duration); + let reset_time = Instant::now().add(delay_duration); self.update_reset_time(reset_time); log::trace!( diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs index efcc1ba242..115bf5d7a8 100644 --- a/crates/semantic_index/src/semantic_index.rs +++ b/crates/semantic_index/src/semantic_index.rs @@ -112,7 +112,7 @@ pub enum SemanticIndexStatus { Indexed, Indexing { remaining_files: usize, - rate_limit_expiry: Option, + rate_limit_expiry: Option, }, } diff --git a/crates/semantic_index/src/semantic_index_tests.rs b/crates/semantic_index/src/semantic_index_tests.rs index 4bc95bec62..9035327b2e 100644 --- a/crates/semantic_index/src/semantic_index_tests.rs +++ b/crates/semantic_index/src/semantic_index_tests.rs @@ -21,7 +21,7 @@ use std::{ atomic::{self, AtomicUsize}, Arc, }, - time::SystemTime, + time::{Instant, SystemTime}, }; use unindent::Unindent; use util::RandomCharIter; @@ -1275,7 +1275,7 @@ impl EmbeddingProvider for FakeEmbeddingProvider { 200 } - fn rate_limit_expiration(&self) -> Option { + fn rate_limit_expiration(&self) -> Option { None }