ext/pf2/src/signal_scheduler.rs in pf2-0.4.0 vs ext/pf2/src/signal_scheduler.rs in pf2-0.5.0

- old
+ new

@@ -1,166 +1,98 @@ #![deny(unsafe_op_in_unsafe_fn)] -mod configuration; -mod timer_installer; - -use self::configuration::{Configuration, TimeMode}; -use self::timer_installer::TimerInstaller; use crate::profile::Profile; use crate::profile_serializer::ProfileSerializer; +use crate::ruby_internal_apis::rb_thread_getcpuclockid; use crate::sample::Sample; +use crate::scheduler::Scheduler; +use crate::session::configuration::{self, Configuration}; use core::panic; -use std::collections::HashSet; -use std::ffi::{c_int, c_void, CStr, CString}; +use std::ffi::{c_int, c_void, CString}; use std::mem::ManuallyDrop; -use std::str::FromStr; use std::sync::{Arc, RwLock}; -use std::thread; -use std::time::Duration; use std::{mem, ptr::null_mut}; use rb_sys::*; use crate::util::*; #[derive(Debug)] pub struct SignalScheduler { - configuration: Option<configuration::Configuration>, - profile: Option<Arc<RwLock<Profile>>>, + configuration: Configuration, + profile: Arc<RwLock<Profile>>, } pub struct SignalHandlerArgs { profile: Arc<RwLock<Profile>>, context_ruby_thread: VALUE, } -impl SignalScheduler { - fn new() -> Self { - Self { - configuration: None, - profile: None, +impl Scheduler for SignalScheduler { + fn start(&self) -> VALUE { + self.install_signal_handler(); + + if let configuration::Threads::Targeted(threads) = &self.configuration.target_ruby_threads { + for ruby_thread in threads.iter() { + self.install_timer_to_ruby_thread(*ruby_thread); + } } + + Qtrue.into() } - fn initialize(&mut self, argc: c_int, argv: *const VALUE, _rbself: VALUE) -> VALUE { - // Parse arguments - let kwargs: VALUE = Qnil.into(); - unsafe { - rb_scan_args(argc, argv, cstr!(":"), &kwargs); - }; - let mut kwargs_values: [VALUE; 4] = [Qnil.into(); 4]; - unsafe { - rb_get_kwargs( - kwargs, - [ - rb_intern(cstr!("interval_ms")), - rb_intern(cstr!("threads")), - rb_intern(cstr!("time_mode")), - rb_intern(cstr!("track_all_threads")), - ] - .as_mut_ptr(), - 0, - 4, - kwargs_values.as_mut_ptr(), - ); - }; - let interval: Duration = if kwargs_values[0] != Qundef as VALUE { - let interval_ms = unsafe { rb_num2long(kwargs_values[0]) }; - Duration::from_millis(interval_ms.try_into().unwrap_or_else(|_| { - eprintln!( - "[Pf2] Warning: Specified interval ({}) is not valid. Using default value (49ms).", - interval_ms - ); - 49 - })) - } else { - Duration::from_millis(49) - }; - let threads: VALUE = if kwargs_values[1] != Qundef as VALUE { - kwargs_values[1] - } else { - unsafe { rb_funcall(rb_cThread, rb_intern(cstr!("list")), 0) } - }; - let time_mode: configuration::TimeMode = if kwargs_values[2] != Qundef as VALUE { - let specified_mode = unsafe { - let mut str = rb_funcall(kwargs_values[2], rb_intern(cstr!("to_s")), 0); - let ptr = rb_string_value_ptr(&mut str); - CStr::from_ptr(ptr).to_str().unwrap() - }; - configuration::TimeMode::from_str(specified_mode).unwrap_or_else(|_| { - // Raise an ArgumentError - unsafe { - rb_raise( - rb_eArgError, - cstr!("Invalid time mode. Valid values are 'cpu' and 'wall'."), - ) - } - }) - } else { - configuration::TimeMode::CpuTime - }; - let track_all_threads: bool = if kwargs_values[3] != Qundef as VALUE { - RTEST(kwargs_values[3]) - } else { - false - }; - - let mut target_ruby_threads = HashSet::new(); - unsafe { - for i in 0..RARRAY_LEN(threads) { - let ruby_thread: VALUE = rb_ary_entry(threads, i); - target_ruby_threads.insert(ruby_thread); + fn stop(&self) -> VALUE { + // Finalize + match self.profile.try_write() { + Ok(mut profile) => { + profile.flush_temporary_sample_buffer(); } + Err(_) => { + println!("[pf2 ERROR] stop: Failed to acquire profile lock."); + return Qfalse.into(); + } } - self.configuration = Some(Configuration { - interval, - target_ruby_threads, - time_mode, - track_all_threads, - }); + let profile = self.profile.try_read().unwrap(); + log::debug!("Number of samples: {}", profile.samples.len()); - Qnil.into() + let serialized = ProfileSerializer::serialize(&profile); + let serialized = CString::new(serialized).unwrap(); + unsafe { rb_str_new_cstr(serialized.as_ptr()) } } - fn start(&mut self, _rbself: VALUE) -> VALUE { - let profile = Arc::new(RwLock::new(Profile::new())); - self.start_profile_buffer_flusher_thread(&profile); - self.install_signal_handler(); - - TimerInstaller::install_timer_to_ruby_threads( - self.configuration.as_ref().unwrap().clone(), // FIXME: don't clone - Arc::clone(&profile), - ); - - self.profile = Some(profile); - - Qtrue.into() + fn on_new_thread(&self, thread: VALUE) { + self.install_timer_to_ruby_thread(thread); } - fn stop(&mut self, _rbself: VALUE) -> VALUE { - if let Some(profile) = &self.profile { - // Finalize - match profile.try_write() { - Ok(mut profile) => { - profile.flush_temporary_sample_buffer(); - } - Err(_) => { - println!("[pf2 ERROR] stop: Failed to acquire profile lock."); - return Qfalse.into(); - } + fn dmark(&self) { + match self.profile.read() { + Ok(profile) => unsafe { + profile.dmark(); + }, + Err(_) => { + panic!("[pf2 FATAL] dmark: Failed to acquire profile lock."); } + } + } - let profile = profile.try_read().unwrap(); - log::debug!("Number of samples: {}", profile.samples.len()); + fn dfree(&self) { + // No-op + } - let serialized = ProfileSerializer::serialize(&profile); - let serialized = CString::new(serialized).unwrap(); - unsafe { rb_str_new_cstr(serialized.as_ptr()) } - } else { - panic!("stop() called before start()"); + fn dsize(&self) -> size_t { + // FIXME: Report something better + mem::size_of::<Self>() as size_t + } +} + +impl SignalScheduler { + pub fn new(configuration: &Configuration, profile: Arc<RwLock<Profile>>) -> Self { + Self { + configuration: configuration.clone(), + profile, } } // Install signal handler for profiling events to the current process. fn install_signal_handler(&self) { @@ -200,112 +132,64 @@ if profile.temporary_sample_buffer.push(sample).is_err() { log::debug!("Temporary sample buffer full. Dropping sample."); } } - fn start_profile_buffer_flusher_thread(&self, profile: &Arc<RwLock<Profile>>) { - let profile = Arc::clone(profile); - thread::spawn(move || loop { - log::trace!("Flushing temporary sample buffer"); - match profile.try_write() { - Ok(mut profile) => { - profile.flush_temporary_sample_buffer(); - } - Err(_) => { - log::debug!("flusher: Failed to acquire profile lock"); - } - } - thread::sleep(Duration::from_millis(500)); + fn install_timer_to_ruby_thread(&self, ruby_thread: VALUE) { + // NOTE: This Box never gets dropped + let signal_handler_args = Box::new(SignalHandlerArgs { + profile: Arc::clone(&self.profile), + context_ruby_thread: ruby_thread, }); - } - // Ruby Methods + // rb_funcall deadlocks when called within a THREAD_EVENT_STARTED hook + let kernel_thread_id: i32 = i32::try_from(unsafe { + rb_num2int(rb_funcall( + ruby_thread, + rb_intern(cstr!("native_thread_id")), // kernel thread ID + 0, + )) + }) + .unwrap(); - pub unsafe extern "C" fn rb_initialize( - argc: c_int, - argv: *const VALUE, - rbself: VALUE, - ) -> VALUE { - let mut collector = unsafe { Self::get_struct_from(rbself) }; - collector.initialize(argc, argv, rbself) - } + // Create a signal event + let mut sigevent: libc::sigevent = unsafe { mem::zeroed() }; + // Note: SIGEV_THREAD_ID is Linux-specific. In other platforms, we would need to + // "trampoline" the signal as any pthread can receive the signal. + sigevent.sigev_notify = libc::SIGEV_THREAD_ID; + sigevent.sigev_notify_thread_id = kernel_thread_id; + sigevent.sigev_signo = libc::SIGALRM; + // Pass required args to the signal handler + sigevent.sigev_value.sival_ptr = Box::into_raw(signal_handler_args) as *mut c_void; - pub unsafe extern "C" fn rb_start(rbself: VALUE) -> VALUE { - let mut collector = unsafe { Self::get_struct_from(rbself) }; - collector.start(rbself) - } - - pub unsafe extern "C" fn rb_stop(rbself: VALUE) -> VALUE { - let mut collector = unsafe { Self::get_struct_from(rbself) }; - collector.stop(rbself) - } - - // Functions for TypedData - - // Extract the SignalScheduler struct from a Ruby object - unsafe fn get_struct_from(obj: VALUE) -> ManuallyDrop<Box<Self>> { - unsafe { - let ptr = rb_check_typeddata(obj, &RBDATA); - ManuallyDrop::new(Box::from_raw(ptr as *mut SignalScheduler)) + // Create and configure timer to fire every _interval_ ms of CPU time + let mut timer: libc::timer_t = unsafe { mem::zeroed() }; + let clockid = match self.configuration.time_mode { + configuration::TimeMode::CpuTime => unsafe { rb_thread_getcpuclockid(ruby_thread) }, + configuration::TimeMode::WallTime => libc::CLOCK_MONOTONIC, + }; + let err = unsafe { libc::timer_create(clockid, &mut sigevent, &mut timer) }; + if err != 0 { + panic!("timer_create failed: {}", err); } - } - - #[allow(non_snake_case)] - pub unsafe extern "C" fn rb_alloc(_rbself: VALUE) -> VALUE { - let collector = Box::new(SignalScheduler::new()); - unsafe { Arc::increment_strong_count(&collector) }; - - unsafe { - let rb_mPf2: VALUE = rb_define_module(cstr!("Pf2")); - let rb_cSignalScheduler = - rb_define_class_under(rb_mPf2, cstr!("SignalScheduler"), rb_cObject); - - // "Wrap" the SignalScheduler struct into a Ruby object - rb_data_typed_object_wrap( - rb_cSignalScheduler, - Box::into_raw(collector) as *mut c_void, - &RBDATA, - ) + let itimerspec = Self::duration_to_itimerspec(&self.configuration.interval); + let err = unsafe { libc::timer_settime(timer, 0, &itimerspec, null_mut()) }; + if err != 0 { + panic!("timer_settime failed: {}", err); } - } - unsafe extern "C" fn dmark(ptr: *mut c_void) { - unsafe { - let collector = ManuallyDrop::new(Box::from_raw(ptr as *mut SignalScheduler)); - if let Some(profile) = &collector.profile { - match profile.read() { - Ok(profile) => { - profile.dmark(); - } - Err(_) => { - panic!("[pf2 FATAL] dmark: Failed to acquire profile lock."); - } - } - } - } + log::debug!("timer registered for thread {}", ruby_thread); } - unsafe extern "C" fn dfree(ptr: *mut c_void) { - unsafe { - drop(Box::from_raw(ptr as *mut SignalScheduler)); - } - } + fn duration_to_itimerspec(duration: &std::time::Duration) -> libc::itimerspec { + let nanos = duration.as_nanos(); + let seconds_part: i64 = (nanos / 1_000_000_000).try_into().unwrap(); + let nanos_part: i64 = (nanos % 1_000_000_000).try_into().unwrap(); - unsafe extern "C" fn dsize(_: *const c_void) -> size_t { - // FIXME: Report something better - mem::size_of::<SignalScheduler>() as size_t + let mut its: libc::itimerspec = unsafe { mem::zeroed() }; + its.it_interval.tv_sec = seconds_part; + its.it_interval.tv_nsec = nanos_part; + its.it_value.tv_sec = seconds_part; + its.it_value.tv_nsec = nanos_part; + its } } - -static mut RBDATA: rb_data_type_t = rb_data_type_t { - wrap_struct_name: cstr!("SignalScheduler"), - function: rb_data_type_struct__bindgen_ty_1 { - dmark: Some(SignalScheduler::dmark), - dfree: Some(SignalScheduler::dfree), - dsize: Some(SignalScheduler::dsize), - dcompact: None, - reserved: [null_mut(); 1], - }, - parent: null_mut(), - data: null_mut(), - flags: 0, -};