diff --git a/Cargo.toml b/Cargo.toml index d626b1aaf67c5..d1b8e0b547d30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -456,7 +456,8 @@ byteorder = "1.5.0" windows-service = "0.8.0" [target.'cfg(unix)'.dependencies] -nix = { version = "0.31", default-features = false, features = ["socket", "signal", "fs"] } +libc.workspace = true +nix = { version = "0.31", default-features = false, features = ["socket", "signal", "fs", "resource"] } [target.'cfg(target_os = "linux")'.dependencies] netlink-packet-utils = "0.5.2" diff --git a/lib/file-source/src/file_server.rs b/lib/file-source/src/file_server.rs index 79f7273e3746c..814cc77c0eb67 100644 --- a/lib/file-source/src/file_server.rs +++ b/lib/file-source/src/file_server.rs @@ -58,6 +58,7 @@ where pub remove_after: Option<Duration>, pub emitter: E, pub rotate_wait: Duration, + pub max_open_files: Option<usize>, } /// `FileServer` as Source @@ -185,6 +186,21 @@ where for (_file_id, watcher) in &mut fp_map { watcher.set_file_findable(false); // assume not findable until found } + + // Pre-build eviction candidates sorted by last_read_success (oldest first). + // This avoids O(n) scans per eviction when many new files appear at once. 
+ let eviction_candidates: Vec<FileFingerprint> = if self.max_open_files.is_some() { + let mut candidates: Vec<_> = fp_map + .iter() + .map(|(&fid, w)| (w.last_read_success(), fid)) + .collect(); + candidates.sort_unstable(); + candidates.into_iter().map(|(_, fid)| fid).collect() + } else { + Vec::new() + }; + let mut eviction_idx = 0; + for path in self.paths_provider.paths().into_iter() { if let Some(file_id) = self .fingerprinter @@ -230,6 +246,30 @@ where } } else { // untracked file fingerprint + // Check max_open_files limit before adding new file + if let Some(max) = self.max_open_files + && fp_map.len() >= max + { + while eviction_idx < eviction_candidates.len() { + let evict_id = eviction_candidates[eviction_idx]; + eviction_idx += 1; + // Skip candidates already removed from fp_map + if let Some(watcher) = fp_map.swap_remove(&evict_id) { + info!( + message = "Evicting least recently read file due to max_open_files limit.", + evicted_path = ?watcher.path, + new_path = ?path, + max_open_files = max, + ); + self.emitter.emit_file_unwatched( + &watcher.path, + watcher.reached_eof(), + ); + checkpoints.set_dead(evict_id); + break; + } + } + } self.watch_new_file(path, file_id, &mut fp_map, &checkpoints, false) .await; self.emitter.emit_files_open(fp_map.len()); diff --git a/src/cli.rs b/src/cli.rs index 2282d22f3689c..ba07995226442 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -287,10 +287,173 @@ impl RootOpts { } } + + #[cfg(unix)] + raise_file_descriptor_limit(); + crate::metrics::init_global().expect("metrics initialization failed"); } } + +/// Raise the soft file descriptor limit (RLIMIT_NOFILE) as high as the OS allows. +/// +/// Many systems default the soft limit to 1024 (Linux) or 256 (macOS), which is too low +/// for Vector when it monitors large numbers of log files. Raising it prevents +/// "Too many open files (os error 24)" errors without requiring manual sysadmin intervention. +/// +/// On Linux, the soft limit is raised to the hard limit (typically 65536+). 
+/// On macOS, the hard limit can be RLIM_INFINITY, so we first try the hard limit, +/// then fall back to the kernel-enforced `kern.maxfilesperproc` (typically 10240). +#[cfg(unix)] +fn raise_file_descriptor_limit() { + use nix::sys::resource::{Resource, getrlimit, setrlimit}; + use tracing::{info, warn}; + + let (soft, hard) = match getrlimit(Resource::RLIMIT_NOFILE) { + Ok(limits) => limits, + Err(err) => { + warn!(message = "Failed to get file descriptor limit.", %err); + return; + } + }; + + if soft >= hard { + return; // Already at maximum + } + + // Try setting soft limit to hard limit (works on Linux, may fail on macOS) + if setrlimit(Resource::RLIMIT_NOFILE, hard, hard).is_ok() { + info!( + message = "Raised file descriptor limit.", + from = soft, + to = hard, + ); + return; + } + + // On macOS, the hard limit can be RLIM_INFINITY which setrlimit rejects. + // Fall back to the kernel-enforced kern.maxfilesperproc. + #[cfg(target_os = "macos")] + { + if let Some(maxfiles) = macos_maxfilesperproc() + && maxfiles > soft + && setrlimit(Resource::RLIMIT_NOFILE, maxfiles, hard).is_ok() + { + info!( + message = "Raised file descriptor limit.", + from = soft, + to = maxfiles, + ); + return; + } + } + + warn!( + message = "Failed to raise file descriptor limit.", + current = soft, + attempted = hard, + ); +} + +/// Query the macOS kernel limit on per-process open files. +#[cfg(target_os = "macos")] +fn macos_maxfilesperproc() -> Option<libc::rlim_t> { + let mut maxfiles: libc::c_int = 0; + let mut len = std::mem::size_of::<libc::c_int>() as libc::size_t; + // Safety: sysctlbyname with a valid null-terminated name and correctly sized output buffer. + // No safe wrapper exists for this macOS-specific call. 
+ let ret = unsafe { + libc::sysctlbyname( + c"kern.maxfilesperproc".as_ptr(), + &mut maxfiles as *mut libc::c_int as *mut libc::c_void, + &mut len, + std::ptr::null_mut(), + 0, + ) + }; + if ret == 0 && maxfiles > 0 { + Some(maxfiles as libc::rlim_t) + } else { + None + } +} + +#[cfg(test)] +mod tests { + #[test] + #[cfg(unix)] + fn test_raise_file_descriptor_limit() { + use nix::sys::resource::{Resource, getrlimit, setrlimit}; + + // Save original limits + let (original_soft, hard) = getrlimit(Resource::RLIMIT_NOFILE).unwrap(); + + // Lower the soft limit to simulate a constrained environment + let lowered = std::cmp::min(original_soft, 256); + if lowered < hard { + setrlimit(Resource::RLIMIT_NOFILE, lowered, hard).unwrap(); + + // Verify it was lowered + let (soft_before, _) = getrlimit(Resource::RLIMIT_NOFILE).unwrap(); + assert_eq!(soft_before, lowered); + + // Call the function under test + super::raise_file_descriptor_limit(); + + // Verify the soft limit was raised above the lowered value + let (soft_after, _) = getrlimit(Resource::RLIMIT_NOFILE).unwrap(); + assert!( + soft_after > lowered, + "Expected soft limit to be raised above {lowered}, got {soft_after}" + ); + + // Restore original limits + setrlimit(Resource::RLIMIT_NOFILE, original_soft, hard).unwrap(); + } + } + + #[test] + #[cfg(unix)] + fn test_raise_file_descriptor_limit_already_at_max() { + use nix::sys::resource::{Resource, getrlimit, setrlimit}; + + // Save original limits + let (original_soft, hard) = getrlimit(Resource::RLIMIT_NOFILE).unwrap(); + + // Set soft = hard so there's nothing to raise + if setrlimit(Resource::RLIMIT_NOFILE, hard, hard).is_err() { + #[cfg(target_os = "macos")] + if let Some(maxfiles) = super::macos_maxfilesperproc() { + let _ = setrlimit(Resource::RLIMIT_NOFILE, maxfiles, hard); + } + } + + let (soft_before, _) = getrlimit(Resource::RLIMIT_NOFILE).unwrap(); + + // Call the function — should be a no-op + super::raise_file_descriptor_limit(); + + let (soft_after, _) 
= getrlimit(Resource::RLIMIT_NOFILE).unwrap(); + assert_eq!(soft_before, soft_after); + + // Restore original limits + setrlimit(Resource::RLIMIT_NOFILE, original_soft, hard).unwrap(); + } + + #[test] + #[cfg(target_os = "macos")] + fn test_macos_maxfilesperproc_returns_positive() { + let result = super::macos_maxfilesperproc(); + assert!( + result.is_some(), + "macos_maxfilesperproc() should return Some on macOS" + ); + assert!( + result.unwrap() > 0, + "kern.maxfilesperproc should be positive" + ); + } +} + #[derive(Parser, Debug)] #[command(rename_all = "kebab-case")] pub enum SubCommand { diff --git a/src/sources/file.rs b/src/sources/file.rs index 62a95db362f10..98426b9eb8b13 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -25,7 +25,7 @@ use vector_lib::{ }; use vrl::value::Kind; -use super::util::{EncodingConfig, MultilineConfig}; +use super::util::{EncodingConfig, MultilineConfig, default_max_open_files}; use crate::{ SourceSender, config::{ @@ -243,6 +243,22 @@ pub struct FileConfig { #[configurable(metadata(docs::type_unit = "seconds"))] #[serde(default = "default_rotate_wait", rename = "rotate_wait_secs")] pub rotate_wait: Duration, + + /// The maximum number of files that can be open simultaneously. + /// + /// When this limit is reached and a new file is discovered, the least recently + /// read file is closed to make room. The closed file's checkpoint is preserved, + /// so reading resumes from where it left off when the file is re-discovered. + /// + /// This helps prevent "Too many open files" (os error 24) errors when monitoring + /// directories with a large number of log files. + /// + /// If not set, there is no limit on the number of open files. 
+ #[serde(default)] + #[configurable(metadata(docs::examples = 512))] + #[configurable(metadata(docs::examples = 1024))] + #[configurable(metadata(docs::human_name = "Maximum Open Files"))] + pub max_open_files: Option<usize>, } fn default_max_line_bytes() -> usize { @@ -383,6 +399,7 @@ impl Default for FileConfig { log_namespace: None, internal_metrics: Default::default(), rotate_wait: default_rotate_wait(), + max_open_files: None, } } } @@ -549,6 +566,7 @@ pub fn file_source( remove_after: config.remove_after_secs.map(Duration::from_secs), emitter, rotate_wait: config.rotate_wait, + max_open_files: config.max_open_files.or_else(default_max_open_files), }; let event_metadata = EventMetadata { @@ -2522,4 +2540,130 @@ mod tests { .map(|log| log.get_message().unwrap().clone()) .collect() } + + #[tokio::test] + async fn test_max_open_files_eviction() { + let dir = tempdir().unwrap(); + let config = file::FileConfig { + include: vec![dir.path().join("*")], + max_open_files: Some(2), + ..test_default_file_config(&dir) + }; + + let path1 = dir.path().join("file1"); + let path2 = dir.path().join("file2"); + let path3 = dir.path().join("file3"); + + let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async { + // Create and write to 2 files (at the limit) + let mut file1 = File::create(&path1).unwrap(); + writeln!(&mut file1, "file1 first").unwrap(); + file1.flush().unwrap(); + sleep_500_millis().await; + + let mut file2 = File::create(&path2).unwrap(); + writeln!(&mut file2, "file2 first").unwrap(); + file2.flush().unwrap(); + sleep_500_millis().await; + + // Write more to file2 so file1 becomes the least recently read + writeln!(&mut file2, "file2 second").unwrap(); + file2.flush().unwrap(); + sleep_500_millis().await; + + // Create a 3rd file — should evict file1 (least recently read) + let mut file3 = File::create(&path3).unwrap(); + writeln!(&mut file3, "file3 first").unwrap(); + file3.flush().unwrap(); + sleep_500_millis().await; + + // Write more 
to file1 — it should be re-opened from checkpoint + writeln!(&mut file1, "file1 second").unwrap(); + file1.flush().unwrap(); + sleep_500_millis().await; + }) + .await; + + let lines = extract_messages_string(received); + + // All lines from all files should be received — no data loss + assert!( + lines.contains(&"file1 first".to_string()), + "missing 'file1 first' in {lines:?}" + ); + assert!( + lines.contains(&"file2 first".to_string()), + "missing 'file2 first' in {lines:?}" + ); + assert!( + lines.contains(&"file2 second".to_string()), + "missing 'file2 second' in {lines:?}" + ); + assert!( + lines.contains(&"file3 first".to_string()), + "missing 'file3 first' in {lines:?}" + ); + assert!( + lines.contains(&"file1 second".to_string()), + "missing 'file1 second' in {lines:?}" + ); + } + + #[tokio::test] + async fn test_max_open_files_no_eviction_under_limit() { + let dir = tempdir().unwrap(); + let config = file::FileConfig { + include: vec![dir.path().join("*")], + max_open_files: Some(5), + ..test_default_file_config(&dir) + }; + + let path1 = dir.path().join("file1"); + let path2 = dir.path().join("file2"); + let path3 = dir.path().join("file3"); + + let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async { + let mut file1 = File::create(&path1).unwrap(); + let mut file2 = File::create(&path2).unwrap(); + let mut file3 = File::create(&path3).unwrap(); + + writeln!(&mut file1, "hello from 1").unwrap(); + writeln!(&mut file2, "hello from 2").unwrap(); + writeln!(&mut file3, "hello from 3").unwrap(); + + file1.flush().unwrap(); + file2.flush().unwrap(); + file3.flush().unwrap(); + + sleep_500_millis().await; + }) + .await; + + // Under the limit — all files read normally, same as without max_open_files + let lines = extract_messages_string(received); + assert_eq!(lines.len(), 3); + assert!(lines.contains(&"hello from 1".to_string())); + assert!(lines.contains(&"hello from 2".to_string())); + assert!(lines.contains(&"hello from 
3".to_string())); + } + + #[test] + fn test_max_open_files_config_parsing() { + let config: FileConfig = toml::from_str( + r#" + include = [ "/var/log/**/*.log" ] + max_open_files = 512 + "#, + ) + .unwrap(); + assert_eq!(config.max_open_files, Some(512)); + + let config: FileConfig = toml::from_str( + r#" + include = [ "/var/log/**/*.log" ] + "#, + ) + .unwrap(); + assert_eq!(config.max_open_files, None); + } } diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs index 905eed8288b23..8101bb551599b 100644 --- a/src/sources/kubernetes_logs/mod.rs +++ b/src/sources/kubernetes_logs/mod.rs @@ -870,6 +870,8 @@ impl Source { }, // A handle to the current tokio runtime rotate_wait, + // Auto-derive max open files from OS file descriptor limit. + max_open_files: crate::sources::util::default_max_open_files(), }; let (file_source_tx, file_source_rx) = futures::channel::mpsc::channel::>(2); diff --git a/src/sources/util/mod.rs b/src/sources/util/mod.rs index 2ba61bdaf911d..bb80e801d61e7 100644 --- a/src/sources/util/mod.rs +++ b/src/sources/util/mod.rs @@ -76,6 +76,34 @@ pub use self::http::decompress_body; ))] pub use self::message_decoding::decode_message; +/// Derive the default `max_open_files` limit from the current OS file descriptor limit. +/// +/// Returns 80% of the current soft RLIMIT_NOFILE, leaving headroom for sinks, +/// network connections, fingerprinting, and other non-file-source FD usage. +/// Returns `None` on non-Unix platforms or if querying the limit fails. 
+#[cfg(unix)] +pub fn default_max_open_files() -> Option<usize> { + use nix::sys::resource::{Resource, getrlimit}; + + if let Ok((soft, _hard)) = getrlimit(Resource::RLIMIT_NOFILE) { + let limit = (soft as usize).saturating_mul(80) / 100; + if limit > 0 { + tracing::info!( + message = "Auto-configured max_open_files from OS file descriptor limit.", + rlimit_nofile = soft, + max_open_files = limit, + ); + return Some(limit); + } + } + None +} + +#[cfg(not(unix))] +pub fn default_max_open_files() -> Option<usize> { + None +} + /// Extract a tag and it's value from input string delimited by a colon character. /// /// Note: the behavior of StatsD if more than one colon is found (which would presumably @@ -98,3 +126,27 @@ pub fn extract_tag_key_and_value<S: AsRef<str>>( None => (tag_chunk.to_string(), TagValue::Bare), } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + #[cfg(unix)] + fn test_default_max_open_files_returns_eighty_percent() { + use nix::sys::resource::{Resource, getrlimit}; + + let result = default_max_open_files(); + let (soft, _hard) = getrlimit(Resource::RLIMIT_NOFILE).unwrap(); + let expected = (soft as usize).saturating_mul(80) / 100; + + assert_eq!(result, Some(expected)); + } + + #[test] + #[cfg(unix)] + fn test_default_max_open_files_is_some() { + // On any Unix system with a positive FD limit, this should return Some + assert!(default_max_open_files().is_some()); + } +}