diff --git a/src/builder.rs b/src/builder.rs index 3d12ee103..9c7edee4a 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -76,9 +76,9 @@ use crate::peer_store::PeerStore; use crate::runtime::{Runtime, RuntimeSpawner}; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ - AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreRef, DynStoreWrapper, - GossipSync, Graph, KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, - PendingPaymentStore, SyncAndAsyncKVStore, + AsyncPersister, BatchingStore, ChainMonitor, ChannelManager, DynStore, DynStoreRef, + DynStoreWrapper, GossipSync, Graph, KeysManager, MessageRouter, OnionMessenger, PaymentStore, + PeerManager, PendingPaymentStore, SyncAndAsyncKVStore, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; @@ -86,6 +86,7 @@ use crate::{Node, NodeMetrics}; const LSPS_HARDENED_CHILD_INDEX: u32 = 577; const PERSISTER_MAX_PENDING_UPDATES: u64 = 100; +const STORE_READ_BATCH_SIZE: usize = 50; #[derive(Debug, Clone)] enum ChainDataSourceConfig { @@ -1265,14 +1266,18 @@ fn build_with_store_internal( let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger))); let fee_estimator = Arc::new(OnchainFeeEstimator::new()); - let kv_store_ref = Arc::clone(&kv_store); + // Wrap the store with concurrency limiting for parallel initialization reads. 
+ let batch_store: Arc<DynStore> = + Arc::new(DynStoreWrapper(BatchingStore::new(Arc::clone(&kv_store), STORE_READ_BATCH_SIZE))); + + let batch_store_ref = Arc::clone(&batch_store); let logger_ref = Arc::clone(&logger); let (payment_store_res, node_metris_res, pending_payment_store_res) = runtime.block_on(async move { tokio::join!( - read_payments(&*kv_store_ref, Arc::clone(&logger_ref)), - read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)), - read_pending_payments(&*kv_store_ref, Arc::clone(&logger_ref)) + read_payments(&*batch_store_ref, Arc::clone(&logger_ref)), + read_node_metrics(&*batch_store_ref, Arc::clone(&logger_ref)), + read_pending_payments(&*batch_store_ref, Arc::clone(&logger_ref)) ) }); @@ -1515,12 +1520,12 @@ fn build_with_store_internal( )); // Read ChannelMonitors and the NetworkGraph - let kv_store_ref = Arc::clone(&kv_store); + let batch_store_ref = Arc::clone(&batch_store); let logger_ref = Arc::clone(&logger); let (monitor_read_res, network_graph_res) = runtime.block_on(async { tokio::join!( monitor_reader.read_all_channel_monitors_with_updates_parallel(), - read_network_graph(&*kv_store_ref, logger_ref), + read_network_graph(&*batch_store_ref, logger_ref), ) }); @@ -1566,7 +1571,10 @@ fn build_with_store_internal( }, }; - // Read various smaller LDK and ldk-node objects from the store + // Read various smaller LDK and ldk-node objects from the store. + // Functions that take &DynStore (borrow-only) use batch_store for throttled reads. + // Functions that take Arc<DynStore> (persist into runtime objects) use the original kv_store. 
+ let batch_store_ref = Arc::clone(&batch_store); let kv_store_ref = Arc::clone(&kv_store); let logger_ref = Arc::clone(&logger); let network_graph_ref = Arc::clone(&network_graph); @@ -1587,10 +1595,10 @@ fn build_with_store_internal( peer_info_res, ) = runtime.block_on(async move { tokio::join!( - read_scorer(&*kv_store_ref, network_graph_ref, Arc::clone(&logger_ref)), - read_external_pathfinding_scores_from_cache(&*kv_store_ref, Arc::clone(&logger_ref)), + read_scorer(&*batch_store_ref, network_graph_ref, Arc::clone(&logger_ref)), + read_external_pathfinding_scores_from_cache(&*batch_store_ref, Arc::clone(&logger_ref)), KVStore::read( - &*kv_store_ref, + &*batch_store_ref, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, diff --git a/src/io/utils.rs b/src/io/utils.rs index eef71ec0b..78ab45a43 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -221,81 +221,62 @@ where }) } -/// Read previously persisted payments information from the store. -pub(crate) async fn read_payments<L: Deref>( - kv_store: &DynStore, logger: L, -) -> Result<Vec<PaymentDetails>, std::io::Error> +/// Read all objects of type `T` from the given namespace, spawning reads in parallel. +/// +/// Concurrency is expected to be limited externally (e.g., via [`BatchingStore`]). 
+/// +/// [`BatchingStore`]: crate::types::BatchingStore +pub(crate) async fn read_all_objects<T, L>( + kv_store: &DynStore, primary_namespace: &str, secondary_namespace: &str, logger: L, +) -> Result<Vec<T>, std::io::Error> where + T: Readable, + L: Deref, L::Target: LdkLogger, { - let mut res = Vec::new(); - - let mut stored_keys = KVStore::list( - &*kv_store, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - ) - .await?; - - const BATCH_SIZE: usize = 50; + let keys = KVStore::list(&*kv_store, primary_namespace, secondary_namespace).await?; let mut set = tokio::task::JoinSet::new(); - - // Fill JoinSet with tasks if possible - while set.len() < BATCH_SIZE && !stored_keys.is_empty() { - if let Some(next_key) = stored_keys.pop() { - let fut = KVStore::read( - &*kv_store, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &next_key, - ); - set.spawn(fut); - debug_assert!(set.len() <= BATCH_SIZE); - } + for key in keys { + set.spawn(KVStore::read(kv_store, primary_namespace, secondary_namespace, &key)); } - while let Some(read_res) = set.join_next().await { - // Exit early if we get an IO error. - let reader = read_res + let mut results = Vec::with_capacity(set.len()); + while let Some(res) = set.join_next().await { + let bytes = res .map_err(|e| { - log_error!(logger, "Failed to read PaymentDetails: {}", e); - set.abort_all(); + log_error!(logger, "Failed to join read task: {}", e); e })? .map_err(|e| { - log_error!(logger, "Failed to read PaymentDetails: {}", e); - set.abort_all(); + log_error!(logger, "Failed to read object: {}", e); e })?; - - // Refill set for every finished future, if we still have something to do. 
- if let Some(next_key) = stored_keys.pop() { - let fut = KVStore::read( - &*kv_store, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &next_key, - ); - set.spawn(fut); - debug_assert!(set.len() <= BATCH_SIZE); - } - - // Handle result. - let payment = PaymentDetails::read(&mut &*reader).map_err(|e| { - log_error!(logger, "Failed to deserialize PaymentDetails: {}", e); + results.push(T::read(&mut &*bytes).map_err(|e| { + log_error!(logger, "Failed to deserialize object: {}", e); std::io::Error::new( std::io::ErrorKind::InvalidData, - "Failed to deserialize PaymentDetails", + format!("Failed to deserialize: {}", e), ) - })?; - res.push(payment); + })?); } + Ok(results) +} - debug_assert!(set.is_empty()); - debug_assert!(stored_keys.is_empty()); - - Ok(res) +/// Read previously persisted payments information from the store. +pub(crate) async fn read_payments<L: Deref>( + kv_store: &DynStore, logger: L, +) -> Result<Vec<PaymentDetails>, std::io::Error> +where + L::Target: LdkLogger, +{ + read_all_objects( + kv_store, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + logger, + ) + .await +} /// Read `OutputSweeper` state from the store. 
@@ -632,74 +613,13 @@ pub(crate) async fn read_pending_payments<L: Deref>( where L::Target: LdkLogger, { - let mut res = Vec::new(); - - let mut stored_keys = KVStore::list( - &*kv_store, + read_all_objects( + kv_store, PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + logger, ) - .await?; - - const BATCH_SIZE: usize = 50; - - let mut set = tokio::task::JoinSet::new(); - - // Fill JoinSet with tasks if possible - while set.len() < BATCH_SIZE && !stored_keys.is_empty() { - if let Some(next_key) = stored_keys.pop() { - let fut = KVStore::read( - &*kv_store, - PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &next_key, - ); - set.spawn(fut); - debug_assert!(set.len() <= BATCH_SIZE); - } - } - - while let Some(read_res) = set.join_next().await { - // Exit early if we get an IO error. - let reader = read_res - .map_err(|e| { - log_error!(logger, "Failed to read PendingPaymentDetails: {}", e); - set.abort_all(); - e - })? - .map_err(|e| { - log_error!(logger, "Failed to read PendingPaymentDetails: {}", e); - set.abort_all(); - e - })?; - - // Refill set for every finished future, if we still have something to do. - if let Some(next_key) = stored_keys.pop() { - let fut = KVStore::read( - &*kv_store, - PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &next_key, - ); - set.spawn(fut); - debug_assert!(set.len() <= BATCH_SIZE); - } - - // Handle result. 
- let pending_payment = PendingPaymentDetails::read(&mut &*reader).map_err(|e| { - log_error!(logger, "Failed to deserialize PendingPaymentDetails: {}", e); - std::io::Error::new( - std::io::ErrorKind::InvalidData, - "Failed to deserialize PendingPaymentDetails", - ) - })?; - res.push(pending_payment); - } - - debug_assert!(set.is_empty()); - debug_assert!(stored_keys.is_empty()); - - Ok(res) + .await } #[cfg(test)] diff --git a/src/types.rs b/src/types.rs index 3424d2779..5ab8f1aee 100644 --- a/src/types.rs +++ b/src/types.rs @@ -218,6 +218,113 @@ impl DynStoreTrait for DynStoreWrapper } } +/// A [`KVStore`] wrapper that limits the number of concurrent async I/O operations using a +/// semaphore. Sync methods pass through without throttling. +/// +/// This is used during node initialization to cap the number of inflight reads across all +/// parallel readers to a single configurable limit. +pub(crate) struct BatchingStore { + inner: Arc<DynStore>, + semaphore: Arc<tokio::sync::Semaphore>, +} + +impl BatchingStore { + pub(crate) fn new(inner: Arc<DynStore>, max_concurrent: usize) -> Self { + Self { inner, semaphore: Arc::new(tokio::sync::Semaphore::new(max_concurrent)) } + } +} + +impl KVStore for BatchingStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> impl Future<Output = Result<Vec<u8>, bitcoin::io::Error>> + Send + 'static { + let inner = Arc::clone(&self.inner); + let semaphore = Arc::clone(&self.semaphore); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + async move { + let _permit = semaphore.acquire_owned().await.map_err(|e| { + bitcoin::io::Error::new(bitcoin::io::ErrorKind::Other, format!("{}", e)) + })?; + inner.read_async(&primary_namespace, &secondary_namespace, &key).await + } + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>, + ) -> impl Future<Output = Result<(), bitcoin::io::Error>> + Send + 'static { + let inner = Arc::clone(&self.inner); + let semaphore = 
Arc::clone(&self.semaphore); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + async move { + let _permit = semaphore.acquire_owned().await.map_err(|e| { + bitcoin::io::Error::new(bitcoin::io::ErrorKind::Other, format!("{}", e)) + })?; + inner.write_async(&primary_namespace, &secondary_namespace, &key, buf).await + } + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> impl Future<Output = Result<(), bitcoin::io::Error>> + Send + 'static { + let inner = Arc::clone(&self.inner); + let semaphore = Arc::clone(&self.semaphore); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + async move { + let _permit = semaphore.acquire_owned().await.map_err(|e| { + bitcoin::io::Error::new(bitcoin::io::ErrorKind::Other, format!("{}", e)) + })?; + inner.remove_async(&primary_namespace, &secondary_namespace, &key, lazy).await + } + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> impl Future<Output = Result<Vec<String>, bitcoin::io::Error>> + Send + 'static { + let inner = Arc::clone(&self.inner); + let semaphore = Arc::clone(&self.semaphore); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + async move { + let _permit = semaphore.acquire_owned().await.map_err(|e| { + bitcoin::io::Error::new(bitcoin::io::ErrorKind::Other, format!("{}", e)) + })?; + inner.list_async(&primary_namespace, &secondary_namespace).await + } + } +} + +impl KVStoreSync for BatchingStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Result<Vec<u8>, bitcoin::io::Error> { + DynStoreTrait::read(&*self.inner, primary_namespace, secondary_namespace, key) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>, + ) -> Result<(), bitcoin::io::Error> { + 
DynStoreTrait::write(&*self.inner, primary_namespace, secondary_namespace, key, buf) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Result<(), bitcoin::io::Error> { + DynStoreTrait::remove(&*self.inner, primary_namespace, secondary_namespace, key, lazy) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result<Vec<String>, bitcoin::io::Error> { + DynStoreTrait::list(&*self.inner, primary_namespace, secondary_namespace) + } +} + pub(crate) type AsyncPersister = MonitorUpdatingPersisterAsync< DynStoreRef, RuntimeSpawner,