use std::{ sync::{Mutex, OnceLock}, time::Instant, }; use opentelemetry::global; use tracing::warn; // 进程指标只描述 api-server 自身,不携带请求、用户或作品维度,避免 OTLP 指标高基数膨胀。 pub(crate) fn register_process_metrics() { static REGISTERED: OnceLock<()> = OnceLock::new(); REGISTERED.get_or_init(register_process_metrics_once); } fn register_process_metrics_once() { let meter = global::meter("genarrative-api"); meter .i64_observable_up_down_counter("process.memory.usage") .with_unit("By") .with_description("api-server process physical memory usage") .with_callback(|observer| { let Some(snapshot) = ProcessMetricsSnapshot::collect() else { return; }; observer.observe(to_i64(snapshot.rss_bytes), &[]); }) .build(); meter .i64_observable_up_down_counter("process.memory.virtual") .with_unit("By") .with_description("api-server committed virtual memory") .with_callback(|observer| { let Some(snapshot) = ProcessMetricsSnapshot::collect() else { return; }; if let Some(virtual_bytes) = snapshot.virtual_bytes { observer.observe(to_i64(virtual_bytes), &[]); } }) .build(); meter .i64_observable_up_down_counter("genarrative.process.memory.private") .with_unit("By") .with_description("api-server private memory for local diagnostics") .with_callback(|observer| { let Some(snapshot) = ProcessMetricsSnapshot::collect() else { return; }; if let Some(private_bytes) = snapshot.private_bytes { observer.observe(to_i64(private_bytes), &[]); } }) .build(); meter .f64_observable_counter("process.cpu.time") .with_unit("s") .with_description("api-server total user plus system CPU time") .with_callback(|observer| { let Some(snapshot) = ProcessMetricsSnapshot::collect() else { return; }; if let Some(cpu_time_seconds) = snapshot.cpu_time_seconds { observer.observe(cpu_time_seconds, &[]); } }) .build(); meter .f64_observable_gauge("genarrative.process.cpu.usage_percent") .with_unit("%") .with_description("api-server process CPU usage between metric collections") .with_callback(|observer| { let Some(snapshot) = ProcessMetricsSnapshot::collect() else { return; }; if let Some(cpu_time_seconds) = snapshot.cpu_time_seconds { if let Some(usage_percent) = process_cpu_usage_percent(cpu_time_seconds, Instant::now()) { observer.observe(usage_percent, &[]); } } }) .build(); meter .i64_observable_up_down_counter("process.thread.count") .with_unit("{thread}") .with_description("api-server process thread count") .with_callback(|observer| { let Some(snapshot) = ProcessMetricsSnapshot::collect() else { return; }; observer.observe(to_i64(snapshot.thread_count), &[]); }) .build(); meter .i64_observable_up_down_counter("process.windows.handle.count") .with_unit("{handle}") .with_description("api-server process handle count on Windows") .with_callback(|observer| { let Some(snapshot) = ProcessMetricsSnapshot::collect() else { return; }; if let Some(handle_count) = snapshot.windows_handle_count { observer.observe(to_i64(handle_count), &[]); } }) .build(); meter .i64_observable_up_down_counter("process.unix.file_descriptor.count") .with_unit("{file_descriptor}") .with_description("api-server process file descriptor count on Unix") .with_callback(|observer| { let Some(snapshot) = ProcessMetricsSnapshot::collect() else { return; }; if let Some(fd_count) = snapshot.unix_fd_count { observer.observe(to_i64(fd_count), &[]); } }) .build(); } fn to_i64(value: u64) -> i64 { value.min(i64::MAX as u64) as i64 } #[derive(Debug, Clone, Copy, PartialEq)] struct ProcessMetricsSnapshot { rss_bytes: u64, private_bytes: Option, virtual_bytes: Option, cpu_time_seconds: Option, thread_count: u64, windows_handle_count: Option, unix_fd_count: Option, } impl ProcessMetricsSnapshot { fn collect() -> Option { collect_process_metrics() .inspect_err(|error| { warn!(%error, "采集 api-server 进程指标失败"); }) .ok() } } #[derive(Debug, Clone, Copy)] struct CpuUsageSample { cpu_time_seconds: f64, observed_at: Instant, } fn process_cpu_usage_percent(cpu_time_seconds: f64, observed_at: Instant) -> Option { static LAST_SAMPLE: OnceLock>> = OnceLock::new(); let mut last_sample = LAST_SAMPLE.get_or_init(|| Mutex::new(None)).lock().ok()?; let previous = *last_sample; *last_sample = Some(CpuUsageSample { cpu_time_seconds, observed_at, }); let previous = previous?; let wall_delta_seconds = observed_at .checked_duration_since(previous.observed_at)? .as_secs_f64(); cpu_usage_ratio_between_samples( previous.cpu_time_seconds, cpu_time_seconds, 0.0, wall_delta_seconds, ) .map(|ratio| ratio * 100.0) } fn cpu_usage_ratio_between_samples( previous_cpu_seconds: f64, current_cpu_seconds: f64, previous_wall_seconds: f64, current_wall_seconds: f64, ) -> Option { let cpu_delta_seconds = current_cpu_seconds - previous_cpu_seconds; let wall_delta_seconds = current_wall_seconds - previous_wall_seconds; if cpu_delta_seconds < 0.0 || wall_delta_seconds <= 0.0 { return None; } Some(cpu_delta_seconds / wall_delta_seconds) } #[cfg(windows)] fn collect_process_metrics() -> Result { use windows_sys::Win32::System::{ ProcessStatus::{GetProcessMemoryInfo, PROCESS_MEMORY_COUNTERS_EX}, Threading::{GetCurrentProcess, GetCurrentProcessId, GetProcessHandleCount}, }; let handle = unsafe { GetCurrentProcess() }; let mut counters = PROCESS_MEMORY_COUNTERS_EX { cb: std::mem::size_of::() as u32, ..Default::default() }; let ok = unsafe { GetProcessMemoryInfo(handle, std::ptr::addr_of_mut!(counters).cast(), counters.cb) }; if ok == 0 { return Err("GetProcessMemoryInfo returned false".to_string()); } let mut handle_count = 0_u32; let handle_count = if unsafe { GetProcessHandleCount(handle, &mut handle_count) } == 0 { None } else { Some(u64::from(handle_count)) }; let cpu_time_seconds = windows_process_cpu_time_seconds(handle); Ok(ProcessMetricsSnapshot { rss_bytes: counters.WorkingSetSize as u64, private_bytes: Some(counters.PrivateUsage as u64), virtual_bytes: Some(counters.PrivateUsage as u64), cpu_time_seconds, thread_count: u64::from(unsafe { GetCurrentProcessId() }.thread_count()?), windows_handle_count: handle_count, unix_fd_count: None, }) } #[cfg(windows)] fn windows_process_cpu_time_seconds(handle: windows_sys::Win32::Foundation::HANDLE) -> Option { use windows_sys::Win32::{Foundation::FILETIME, System::Threading::GetProcessTimes}; let mut creation_time = FILETIME::default(); let mut exit_time = FILETIME::default(); let mut kernel_time = FILETIME::default(); let mut user_time = FILETIME::default(); let ok = unsafe { GetProcessTimes( handle, &mut creation_time, &mut exit_time, &mut kernel_time, &mut user_time, ) }; if ok == 0 { return None; } let total_100ns = filetime_100ns(kernel_time) + filetime_100ns(user_time); Some(total_100ns as f64 / 10_000_000.0) } #[cfg(windows)] fn filetime_100ns(filetime: windows_sys::Win32::Foundation::FILETIME) -> u64 { ((filetime.dwHighDateTime as u64) << 32) | u64::from(filetime.dwLowDateTime) } #[cfg(windows)] trait WindowsProcessThreadCount { fn thread_count(self) -> Result; } #[cfg(windows)] impl WindowsProcessThreadCount for u32 { fn thread_count(self) -> Result { use windows_sys::Win32::{ Foundation::{CloseHandle, INVALID_HANDLE_VALUE}, System::Diagnostics::ToolHelp::{ CreateToolhelp32Snapshot, PROCESSENTRY32, Process32First, Process32Next, TH32CS_SNAPPROCESS, }, }; let snapshot = unsafe { CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0) }; if snapshot == INVALID_HANDLE_VALUE { return Err("CreateToolhelp32Snapshot returned INVALID_HANDLE_VALUE".to_string()); } let mut entry = PROCESSENTRY32 { dwSize: std::mem::size_of::() as u32, ..Default::default() }; let mut found = None; let mut ok = unsafe { Process32First(snapshot, &mut entry) }; while ok != 0 { if entry.th32ProcessID == self { found = Some(entry.cntThreads); break; } ok = unsafe { Process32Next(snapshot, &mut entry) }; } unsafe { CloseHandle(snapshot); } found.ok_or_else(|| format!("process {self} not found in ToolHelp snapshot")) } } #[cfg(target_os = "linux")] fn collect_process_metrics() -> Result { let status = std::fs::read_to_string("/proc/self/status") .map_err(|error| format!("read /proc/self/status failed: {error}"))?; let statm = std::fs::read_to_string("/proc/self/statm") .map_err(|error| format!("read /proc/self/statm failed: {error}"))?; let stat = std::fs::read_to_string("/proc/self/stat") .map_err(|error| format!("read /proc/self/stat failed: {error}"))?; let page_size = linux_page_size_bytes()?; let rss_bytes = parse_status_kb(&status, "VmRSS:") .map(|value| value * 1024) .or_else(|| parse_statm_pages(&statm, 1).map(|value| value * page_size)) .ok_or_else(|| "missing VmRSS/statm resident field".to_string())?; let virtual_bytes = parse_status_kb(&status, "VmSize:") .map(|value| value * 1024) .or_else(|| parse_statm_pages(&statm, 0).map(|value| value * page_size)) .ok_or_else(|| "missing VmSize/statm size field".to_string())?; let private_bytes = parse_status_kb(&status, "VmData:").map(|value| value * 1024); let cpu_time_seconds = linux_cpu_time_seconds(&stat)?; let thread_count = parse_status_u64(&status, "Threads:").ok_or_else(|| "missing Threads field".to_string())?; Ok(ProcessMetricsSnapshot { rss_bytes, private_bytes, virtual_bytes: Some(virtual_bytes), cpu_time_seconds: Some(cpu_time_seconds), thread_count, windows_handle_count: None, unix_fd_count: linux_fd_count(), }) } #[cfg(target_os = "linux")] fn linux_cpu_time_seconds(stat: &str) -> Result { let cpu_ticks = parse_linux_proc_stat_cpu_ticks(stat) .ok_or_else(|| "missing /proc/self/stat utime/stime fields".to_string())?; let ticks_per_second = linux_clock_ticks_per_second()?; Ok(cpu_ticks as f64 / ticks_per_second as f64) } #[cfg(target_os = "linux")] fn linux_clock_ticks_per_second() -> Result { static CLOCK_TICKS_PER_SECOND: OnceLock> = OnceLock::new(); CLOCK_TICKS_PER_SECOND .get_or_init(|| { let output = std::process::Command::new("getconf") .arg("CLK_TCK") .output() .map_err(|error| format!("getconf CLK_TCK failed: {error}"))?; if !output.status.success() { return Err(format!("getconf CLK_TCK exited with {}", output.status)); } let text = String::from_utf8(output.stdout) .map_err(|error| format!("getconf CLK_TCK output is not utf8: {error}"))?; text.trim() .parse::() .map_err(|error| format!("parse CLK_TCK failed: {error}")) }) .clone() } #[cfg(target_os = "linux")] fn parse_linux_proc_stat_cpu_ticks(stat: &str) -> Option { let fields_after_comm = stat.rsplit_once(") ")?.1; let mut fields = fields_after_comm.split_whitespace(); let utime = fields.nth(11)?.parse::().ok()?; let stime = fields.next()?.parse::().ok()?; Some(utime + stime) } #[cfg(target_os = "linux")] fn linux_page_size_bytes() -> Result { let output = std::process::Command::new("getconf") .arg("PAGESIZE") .output() .map_err(|error| format!("getconf PAGESIZE failed: {error}"))?; if !output.status.success() { return Err(format!("getconf PAGESIZE exited with {}", output.status)); } let text = String::from_utf8(output.stdout) .map_err(|error| format!("getconf PAGESIZE output is not utf8: {error}"))?; text.trim() .parse::() .map_err(|error| format!("parse PAGESIZE failed: {error}")) } #[cfg(target_os = "linux")] fn linux_fd_count() -> Option { let entries = std::fs::read_dir("/proc/self/fd").ok()?; Some(entries.filter_map(Result::ok).count() as u64) } #[cfg(target_os = "linux")] fn parse_status_kb(status: &str, key: &str) -> Option { parse_status_u64(status, key) } #[cfg(target_os = "linux")] fn parse_status_u64(status: &str, key: &str) -> Option { status.lines().find_map(|line| { let rest = line.strip_prefix(key)?.trim(); rest.split_whitespace().next()?.parse::().ok() }) } #[cfg(target_os = "linux")] fn parse_statm_pages(statm: &str, index: usize) -> Option { statm.split_whitespace().nth(index)?.parse::().ok() } #[cfg(not(any(windows, target_os = "linux")))] fn collect_process_metrics() -> Result { Err("process metrics are only implemented for Windows and Linux".to_string()) } #[cfg(test)] mod tests { use super::cpu_usage_ratio_between_samples; #[cfg(target_os = "linux")] use super::{ parse_linux_proc_stat_cpu_ticks, parse_statm_pages, parse_status_kb, parse_status_u64, }; #[cfg(target_os = "linux")] #[test] fn parses_linux_proc_status_memory_fields() { let status = "Name:\tapi-server\nVmSize:\t 123456 kB\nVmRSS:\t 7890 kB\nVmData:\t 3456 kB\nThreads:\t37\n"; assert_eq!(parse_status_kb(status, "VmRSS:"), Some(7890)); assert_eq!(parse_status_kb(status, "VmSize:"), Some(123456)); assert_eq!(parse_status_kb(status, "VmData:"), Some(3456)); assert_eq!(parse_status_u64(status, "Threads:"), Some(37)); } #[cfg(target_os = "linux")] #[test] fn parses_linux_statm_pages() { assert_eq!(parse_statm_pages("100 20 0 0 0 0 0", 0), Some(100)); assert_eq!(parse_statm_pages("100 20 0 0 0 0 0", 1), Some(20)); assert_eq!(parse_statm_pages("100 20", 7), None); } #[cfg(target_os = "linux")] #[test] fn parses_linux_proc_stat_cpu_ticks_with_space_in_process_name() { let stat = "123 (api server) S 1 2 3 4 5 6 7 8 9 10 120 30 0 0 20 0 18 0 12345"; assert_eq!(parse_linux_proc_stat_cpu_ticks(stat), Some(150)); } #[test] fn cpu_usage_ratio_uses_cpu_time_delta_over_wall_time() { assert_eq!( cpu_usage_ratio_between_samples(10.0, 12.5, 100.0, 101.0), Some(2.5) ); assert_eq!( cpu_usage_ratio_between_samples(10.0, 9.0, 100.0, 101.0), None ); assert_eq!( cpu_usage_ratio_between_samples(10.0, 11.0, 100.0, 100.0), None ); } }