From db44cce9ad360109a03df4ff8cc29290aaf56c49 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 10 Apr 2026 12:13:09 +0200 Subject: [PATCH] Add logging tests and log stream name helpers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add `progress_log_name`, `summary_log_name`, and `reachability_log_name` helper functions in `logging.rs` that return the log stream name for a given timestamp type - Use the helpers everywhere instead of manual `format!` strings - Add integration tests for progress and summary logging, covering both flat dataflows and nested (iterative/Product) scopes, with content assertions on the logged events - Fix flaky capture doc test (hardcoded port 8000 → port 0) Co-Authored-By: Claude Opus 4.6 (1M context) --- .../dataflow/operators/core/capture/mod.rs | 4 +- timely/src/dataflow/scope.rs | 5 +- timely/src/logging.rs | 21 ++ timely/src/progress/subgraph.rs | 5 +- timely/src/worker.rs | 5 +- timely/tests/logging.rs | 233 ++++++++++++++++++ 6 files changed, 262 insertions(+), 11 deletions(-) create mode 100644 timely/tests/logging.rs diff --git a/timely/src/dataflow/operators/core/capture/mod.rs b/timely/src/dataflow/operators/core/capture/mod.rs index c76033a80..f2d583e64 100644 --- a/timely/src/dataflow/operators/core/capture/mod.rs +++ b/timely/src/dataflow/operators/core/capture/mod.rs @@ -61,8 +61,8 @@ //! # #[cfg(not(miri))] //! # fn main() { //! timely::execute(timely::Config::thread(), |worker| { -//! let list = TcpListener::bind("127.0.0.1:8000").unwrap(); -//! let send = TcpStream::connect("127.0.0.1:8000").unwrap(); +//! let list = TcpListener::bind("127.0.0.1:0").unwrap(); +//! let send = TcpStream::connect(list.local_addr().unwrap()).unwrap(); //! let recv = list.incoming().next().unwrap().unwrap(); //! //! recv.set_nonblocking(true).unwrap(); diff --git a/timely/src/dataflow/scope.rs b/timely/src/dataflow/scope.rs index 56ca01e50..6f09ee802 100644 --- a/timely/src/dataflow/scope.rs +++ b/timely/src/dataflow/scope.rs @@ -97,9 +97,8 @@ impl Scope { let path = slot.addr(); let identifier = slot.identifier(); - let type_name = std::any::type_name::(); - let progress_logging = parent.logger_for(&format!("timely/progress/{type_name}")); - let summary_logging = parent.logger_for(&format!("timely/summary/{type_name}")); + let progress_logging = parent.logger_for(&crate::logging::progress_log_name::()); + let summary_logging = parent.logger_for(&crate::logging::summary_log_name::()); let child = Scope { subgraph: Rc::new(RefCell::new(SubgraphBuilder::new_from( diff --git a/timely/src/logging.rs b/timely/src/logging.rs index 0b2fb499f..b9e99899e 100644 --- a/timely/src/logging.rs +++ b/timely/src/logging.rs @@ -15,6 +15,27 @@ pub type TimelySummaryEventBuilder = CapacityContainerBuilder = crate::logging_core::Logger>; +/// Returns the log stream name for progress events of timestamp type `T`. +/// +/// For example, `progress_log_name::()` returns `"timely/progress/usize"`. +pub fn progress_log_name() -> String { + format!("timely/progress/{}", std::any::type_name::()) +} + +/// Returns the log stream name for operator summary events of timestamp type `T`. +/// +/// For example, `summary_log_name::()` returns `"timely/summary/usize"`. +pub fn summary_log_name() -> String { + format!("timely/summary/{}", std::any::type_name::()) +} + +/// Returns the log stream name for reachability events of timestamp type `T`. +/// +/// For example, `reachability_log_name::()` returns `"timely/reachability/usize"`. +pub fn reachability_log_name() -> String { + format!("timely/reachability/{}", std::any::type_name::()) +} + use std::time::Duration; use columnar::Columnar; use serde::{Deserialize, Serialize}; diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 9769ebaf5..bd05af0a4 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -179,11 +179,10 @@ where } // The `None` argument is optional logging infrastructure. - let type_name = std::any::type_name::(); let reachability_logging = - worker.logger_for(&format!("timely/reachability/{type_name}")) + worker.logger_for(&crate::logging::reachability_log_name::()) .map(|logger| reachability::logging::TrackerLogger::new(self.identifier, logger)); - let progress_logging = worker.logger_for(&format!("timely/progress/{type_name}")); + let progress_logging = worker.logger_for(&crate::logging::progress_log_name::()); let (tracker, scope_summary) = builder.build(reachability_logging); let progcaster = Progcaster::new(worker, Rc::clone(&self.path), self.identifier, self.logging.clone(), progress_logging); diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 19e00f602..9078779dd 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -652,9 +652,8 @@ impl Worker { let addr = vec![dataflow_index].into(); let identifier = self.new_identifier(); - let type_name = std::any::type_name::(); - let progress_logging = self.logger_for(&format!("timely/progress/{}", type_name)); - let summary_logging = self.logger_for(&format!("timely/summary/{}", type_name)); + let progress_logging = self.logger_for(&crate::logging::progress_log_name::()); + let summary_logging = self.logger_for(&crate::logging::summary_log_name::()); let subscope = SubgraphBuilder::new_from(addr, identifier, logging.clone(), summary_logging, name); let subscope = Rc::new(RefCell::new(subscope)); diff --git a/timely/tests/logging.rs b/timely/tests/logging.rs new file mode 100644 index 000000000..270b2dcdb --- /dev/null +++ b/timely/tests/logging.rs @@ -0,0 +1,233 @@ +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use timely::Config; +use timely::dataflow::InputHandle; +use timely::dataflow::operators::{Input, Exchange, Probe, Enter, Leave}; +use timely::logging::{self, TimelyProgressEventBuilder, TimelyProgressEvent, TimelySummaryEventBuilder, OperatesSummaryEvent}; +use timely::order::Product; + +/// Collects log events into a shared vector during `timely::execute`. +fn collect_events( +) -> (Arc>>, impl Fn(&Duration, &mut Option>) + Clone) { + let events: Arc>> = Arc::new(Mutex::new(Vec::new())); + let events2 = Arc::clone(&events); + let callback = move |_time: &Duration, data: &mut Option>| { + if let Some(data) = data { + events2.lock().unwrap().extend(data.drain(..)); + } + }; + (events, callback) +} + +// ------------------------------------------------------------------ +// Flat dataflow: Input -> Exchange -> Probe +// ------------------------------------------------------------------ + +#[test] +fn progress_logging() { + let (events, callback) = collect_events::>(); + + timely::execute(Config::thread(), move |worker| { + worker.log_register().unwrap().insert::, _>( + &logging::progress_log_name::(), callback.clone(), + ); + + let mut input = InputHandle::new(); + let probe = timely::dataflow::ProbeHandle::new(); + + worker.dataflow::(|scope| { + scope.input_from(&mut input) + .container::>() + .exchange(|&x: &usize| x as u64) + .probe_with(&probe); + }); + + input.send(0usize); + input.advance_to(1); + while probe.less_than(input.time()) { + worker.step(); + } + }) + .unwrap(); + + let events = events.lock().unwrap(); + + // Both send and receive progress events must be present. + let sends: Vec<_> = events.iter().filter(|(_, e)| e.is_send).collect(); + let recvs: Vec<_> = events.iter().filter(|(_, e)| !e.is_send).collect(); + assert!(!sends.is_empty(), "Expected at least one progress send event"); + assert!(!recvs.is_empty(), "Expected at least one progress recv event"); + + // Every send should be mirrored by a receive with the same seq_no. + for (_, send) in &sends { + let matched = recvs.iter().any(|(_, r)| r.seq_no == send.seq_no && r.channel == send.channel); + assert!(matched, "Send event seq_no={} channel={} has no matching recv", send.seq_no, send.channel); + } + + // The input advances from 0 to 1, so we expect internal capability changes + // that mention timestamp 0 (release) and/or 1 (acquire). + let has_internal = events.iter().any(|(_, e)| !e.internal.is_empty()); + assert!(has_internal, "Expected at least one event with internal capability changes"); +} + +#[test] +fn summary_logging() { + let (events, callback) = collect_events::>(); + + timely::execute(Config::thread(), move |worker| { + worker.log_register().unwrap().insert::, _>( + &logging::summary_log_name::(), callback.clone(), + ); + + let mut input = InputHandle::new(); + worker.dataflow::(|scope| { + scope.input_from(&mut input) + .container::>() + .exchange(|&x: &usize| x as u64) + .probe_with(&timely::dataflow::ProbeHandle::new()); + }); + + input.advance_to(1); + worker.step(); + }) + .unwrap(); + + let events = events.lock().unwrap(); + + // The dataflow has Input (0 inputs), Exchange (1 input -> 1 output), and Probe + // (1 input -> 1 output). We should see summary events for each. + // Operators with inputs have non-empty summaries (one PortConnectivity per input). + let nonempty: Vec<_> = events.iter().filter(|(_, e)| !e.summary.is_empty()).collect(); + assert!(nonempty.len() >= 2, "Expected at least 2 operators with non-empty summaries (Exchange, Probe), got {}", nonempty.len()); + + // Each non-empty summary should have exactly 1 entry (one input port). + for (_, e) in &nonempty { + assert_eq!(e.summary.len(), 1, "Operator {} should have 1 input, got {}", e.id, e.summary.len()); + } + + // Every summary entry should map input -> output 0 with the identity summary. + for (_, e) in &nonempty { + for port_conn in &e.summary { + let ports: Vec<_> = port_conn.iter_ports().collect(); + assert!(!ports.is_empty(), "Operator {} has a PortConnectivity with no output mapping", e.id); + for (output, antichain) in &ports { + assert_eq!(*output, 0, "Expected output port 0, got {}", output); + // The default (identity) summary for usize is 0. + assert!(antichain.elements().contains(&0usize), + "Expected identity summary (0) for operator {}, got {:?}", e.id, antichain); + } + } + } + + // All operator ids should be distinct. + let mut ids: Vec<_> = events.iter().map(|(_, e)| e.id).collect(); + ids.sort(); + ids.dedup(); + assert_eq!(ids.len(), events.len(), "Duplicate operator ids in summary events"); +} + +// ------------------------------------------------------------------ +// Nested (iterative) dataflow: Input -> [Enter -> Exchange -> Leave] -> Probe +// ------------------------------------------------------------------ + +#[test] +fn progress_logging_iterative() { + type Inner = Product; + + let (outer_events, outer_cb) = collect_events::>(); + let (inner_events, inner_cb) = collect_events::>(); + + timely::execute(Config::thread(), move |worker| { + worker.log_register().unwrap().insert::, _>( + &logging::progress_log_name::(), outer_cb.clone(), + ); + worker.log_register().unwrap().insert::, _>( + &logging::progress_log_name::(), inner_cb.clone(), + ); + + let mut input = InputHandle::new(); + let probe = timely::dataflow::ProbeHandle::new(); + + worker.dataflow::(|scope| { + let stream = scope.input_from(&mut input).container::>(); + scope.iterative::(|inner| { + stream.enter(inner) + .exchange(|&x: &usize| x as u64) + .leave(scope) + }) + .probe_with(&probe); + }); + + input.send(0usize); + input.advance_to(1); + while probe.less_than(input.time()) { + worker.step(); + } + }) + .unwrap(); + + let outer = outer_events.lock().unwrap(); + let inner = inner_events.lock().unwrap(); + assert!(!outer.is_empty(), "Expected outer scope progress events"); + assert!(!inner.is_empty(), "Expected inner scope progress events"); + + // Inner progress events should carry Product timestamps. + // The internal updates should reference Product<0,0> being released. + let has_product_ts = inner.iter().any(|(_, e)| + e.internal.iter().any(|(_, _, t, _)| *t == Product::new(0, 0)) + ); + assert!(has_product_ts, "Expected inner progress events with Product<0,0> timestamps"); +} + +#[test] +fn summary_logging_iterative() { + type Inner = Product; + type InnerSummary = ::Summary; + + let (outer_events, outer_cb) = collect_events::>(); + let (inner_events, inner_cb) = collect_events::>(); + + timely::execute(Config::thread(), move |worker| { + worker.log_register().unwrap().insert::, _>( + &logging::summary_log_name::(), outer_cb.clone(), + ); + worker.log_register().unwrap().insert::, _>( + &logging::summary_log_name::(), inner_cb.clone(), + ); + + let mut input = InputHandle::new(); + worker.dataflow::(|scope| { + let stream = scope.input_from(&mut input).container::>(); + scope.iterative::(|inner| { + stream.enter(inner) + .exchange(|&x: &usize| x as u64) + .leave(scope) + }); + }); + + input.advance_to(1); + worker.step(); + }) + .unwrap(); + + let outer = outer_events.lock().unwrap(); + let inner = inner_events.lock().unwrap(); + assert!(!outer.is_empty(), "Expected outer scope summary events"); + assert!(!inner.is_empty(), "Expected inner scope summary events"); + + // The outer scope should see the iterative subgraph as a single operator + // with a non-empty summary (it has inputs and outputs). + let outer_nonempty: Vec<_> = outer.iter().filter(|(_, e)| !e.summary.is_empty()).collect(); + assert!(!outer_nonempty.is_empty(), "Expected the subgraph operator to have a non-empty outer summary"); + + // The inner scope should have summary events for the Exchange operator at minimum. + let inner_nonempty: Vec<_> = inner.iter().filter(|(_, e)| !e.summary.is_empty()).collect(); + assert!(!inner_nonempty.is_empty(), "Expected inner operators with non-empty summaries"); + + // All inner operator ids should be distinct. + let mut ids: Vec<_> = inner.iter().map(|(_, e)| e.id).collect(); + ids.sort(); + ids.dedup(); + assert_eq!(ids.len(), inner.len(), "Duplicate operator ids in inner summary events"); +}