Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 2 additions & 12 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use crate::io::sqlite_store::SqliteStore;
use crate::io::utils::{
read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph,
read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_pending_payments,
read_scorer, write_node_metrics,
read_scorer,
};
use crate::io::vss_store::VssStoreBuilder;
use crate::io::{
Expand Down Expand Up @@ -1770,21 +1770,11 @@ fn build_with_store_internal(
Arc::clone(&logger),
));

// Reset the RGS sync timestamp in case we somehow switch gossip sources
{
let mut locked_node_metrics = node_metrics.write().expect("lock");
locked_node_metrics.latest_rgs_snapshot_timestamp = None;
write_node_metrics(&*locked_node_metrics, &*kv_store, Arc::clone(&logger))
.map_err(|e| {
log_error!(logger, "Failed writing to store: {}", e);
BuildError::WriteFailed
})?;
}
p2p_source
},
GossipSourceConfig::RapidGossipSync(rgs_server) => {
let latest_sync_timestamp =
node_metrics.read().expect("lock").latest_rgs_snapshot_timestamp.unwrap_or(0);
network_graph.get_last_rapid_gossip_sync_timestamp().unwrap_or(0);
Arc::new(GossipSource::new_rgs(
rgs_server.clone(),
latest_sync_timestamp,
Expand Down
40 changes: 20 additions & 20 deletions src/chain/bitcoind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::fee_estimator::{
apply_post_estimation_adjustments, get_all_conf_targets, get_num_block_defaults_for_target,
ConfirmationTarget, OnchainFeeEstimator,
};
use crate::io::utils::write_node_metrics;
use crate::io::utils::update_and_persist_node_metrics;
use crate::logger::{log_bytes, log_debug, log_error, log_info, log_trace, LdkLogger, Logger};
use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
use crate::{Error, NodeMetrics};
Expand Down Expand Up @@ -203,15 +203,18 @@ impl BitcoindChainSource {
*self.latest_chain_tip.write().expect("lock") = Some(chain_tip);
let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
let mut locked_node_metrics = self.node_metrics.write().expect("lock");
locked_node_metrics.latest_lightning_wallet_sync_timestamp =
unix_time_secs_opt;
locked_node_metrics.latest_onchain_wallet_sync_timestamp =
unix_time_secs_opt;
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)
.unwrap_or_else(|e| {
log_error!(self.logger, "Failed to persist node metrics: {}", e);
});
update_and_persist_node_metrics(
&self.node_metrics,
&*self.kv_store,
&*self.logger,
|m| {
m.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt;
m.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt;
},
)
.unwrap_or_else(|e| {
log_error!(self.logger, "Failed to persist node metrics: {}", e);
});
}
break;
},
Expand Down Expand Up @@ -454,11 +457,10 @@ impl BitcoindChainSource {

let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
let mut locked_node_metrics = self.node_metrics.write().expect("lock");
locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt;
locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt;

write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?;
update_and_persist_node_metrics(&self.node_metrics, &*self.kv_store, &*self.logger, |m| {
m.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt;
m.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt;
})?;

Ok(())
}
Expand Down Expand Up @@ -568,11 +570,9 @@ impl BitcoindChainSource {

let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
{
let mut locked_node_metrics = self.node_metrics.write().expect("lock");
locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt;
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?;
}
update_and_persist_node_metrics(&self.node_metrics, &*self.kv_store, &*self.logger, |m| {
m.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt
})?;

Ok(())
}
Expand Down
37 changes: 16 additions & 21 deletions src/chain/electrum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::fee_estimator::{
apply_post_estimation_adjustments, get_all_conf_targets, get_num_block_defaults_for_target,
ConfirmationTarget, OnchainFeeEstimator,
};
use crate::io::utils::write_node_metrics;
use crate::io::utils::update_and_persist_node_metrics;
use crate::logger::{log_bytes, log_debug, log_error, log_trace, LdkLogger, Logger};
use crate::runtime::Runtime;
use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
Expand Down Expand Up @@ -141,16 +141,12 @@ impl ElectrumChainSource {
);
let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
{
let mut locked_node_metrics = self.node_metrics.write().expect("lock");
locked_node_metrics.latest_onchain_wallet_sync_timestamp =
unix_time_secs_opt;
write_node_metrics(
&*locked_node_metrics,
&*self.kv_store,
&*self.logger,
)?;
}
update_and_persist_node_metrics(
&self.node_metrics,
&*self.kv_store,
&*self.logger,
|m| m.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt,
)?;
Ok(())
},
Err(e) => Err(e),
Expand Down Expand Up @@ -238,11 +234,12 @@ impl ElectrumChainSource {
if let Ok(_) = res {
let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
{
let mut locked_node_metrics = self.node_metrics.write().expect("lock");
locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt;
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?;
}
update_and_persist_node_metrics(
&self.node_metrics,
&*self.kv_store,
&*self.logger,
|m| m.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt,
)?;
}

res
Expand Down Expand Up @@ -271,11 +268,9 @@ impl ElectrumChainSource {

let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
{
let mut locked_node_metrics = self.node_metrics.write().expect("lock");
locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt;
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?;
}
update_and_persist_node_metrics(&self.node_metrics, &*self.kv_store, &*self.logger, |m| {
m.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt
})?;

Ok(())
}
Expand Down
39 changes: 17 additions & 22 deletions src/chain/esplora.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::fee_estimator::{
apply_post_estimation_adjustments, get_all_conf_targets, get_num_block_defaults_for_target,
OnchainFeeEstimator,
};
use crate::io::utils::write_node_metrics;
use crate::io::utils::update_and_persist_node_metrics;
use crate::logger::{log_bytes, log_debug, log_error, log_trace, LdkLogger, Logger};
use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
use crate::{Error, NodeMetrics};
Expand Down Expand Up @@ -122,16 +122,13 @@ impl EsploraChainSource {
.duration_since(UNIX_EPOCH)
.ok()
.map(|d| d.as_secs());
{
let mut locked_node_metrics = self.node_metrics.write().expect("lock");
locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt;
write_node_metrics(
&*locked_node_metrics,
&*self.kv_store,
&*self.logger
)?;
}
Ok(())
update_and_persist_node_metrics(
&self.node_metrics,
&*self.kv_store,
&*self.logger,
|m| m.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt,
)?;
Ok(())
},
Err(e) => Err(e),
},
Expand Down Expand Up @@ -263,12 +260,12 @@ impl EsploraChainSource {

let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
{
let mut locked_node_metrics = self.node_metrics.write().expect("lock");
locked_node_metrics.latest_lightning_wallet_sync_timestamp =
unix_time_secs_opt;
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?;
}
update_and_persist_node_metrics(
&self.node_metrics,
&*self.kv_store,
&*self.logger,
|m| m.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt,
)?;
Ok(())
},
Err(e) => {
Expand Down Expand Up @@ -348,11 +345,9 @@ impl EsploraChainSource {
);
let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
{
let mut locked_node_metrics = self.node_metrics.write().expect("lock");
locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt;
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?;
}
update_and_persist_node_metrics(&self.node_metrics, &*self.kv_store, &*self.logger, |m| {
m.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt
})?;

Ok(())
}
Expand Down
15 changes: 11 additions & 4 deletions src/io/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::ops::Deref;
#[cfg(unix)]
use std::os::unix::fs::OpenOptionsExt;
use std::path::Path;
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use bdk_chain::indexer::keychain_txout::ChangeSet as BdkIndexerChangeSet;
use bdk_chain::local_chain::ChangeSet as BdkLocalChainChangeSet;
Expand Down Expand Up @@ -346,13 +346,20 @@ where
})
}

pub(crate) fn write_node_metrics<L: Deref>(
node_metrics: &NodeMetrics, kv_store: &DynStore, logger: L,
/// Take a write lock on `node_metrics`, apply `update`, and persist the result to `kv_store`.
///
/// The write lock is held across the KV-store write, preserving the invariant that readers only
/// observe the mutation once it has been durably persisted (or the persist has failed).
pub(crate) fn update_and_persist_node_metrics<L: Deref>(
node_metrics: &RwLock<NodeMetrics>, kv_store: &DynStore, logger: L,
update: impl FnOnce(&mut NodeMetrics),
) -> Result<(), Error>
where
L::Target: LdkLogger,
{
let data = node_metrics.encode();
let mut locked_node_metrics = node_metrics.write().expect("lock");
update(&mut *locked_node_metrics);
let data = locked_node_metrics.encode();
KVStoreSync::write(
&*kv_store,
NODE_METRICS_PRIMARY_NAMESPACE,
Expand Down
Loading
Loading