ext/pf2/src/timer_thread_scheduler.rs in pf2-0.4.0 vs ext/pf2/src/timer_thread_scheduler.rs in pf2-0.5.0

- old
+ new

@@ -1,187 +1,137 @@ #![deny(unsafe_op_in_unsafe_fn)] -use std::ffi::{c_int, c_void, CStr, CString}; +use std::ffi::{c_void, CString}; use std::mem::ManuallyDrop; -use std::ptr::null_mut; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::thread; -use std::time::Duration; use rb_sys::*; use crate::profile::Profile; use crate::profile_serializer::ProfileSerializer; use crate::sample::Sample; +use crate::scheduler::Scheduler; +use crate::session::configuration::{self, Configuration}; use crate::util::*; #[derive(Clone, Debug)] pub struct TimerThreadScheduler { - ruby_threads: Arc<RwLock<Vec<VALUE>>>, - interval: Option<Arc<Duration>>, - profile: Option<Arc<RwLock<Profile>>>, + configuration: Arc<Configuration>, + profile: Arc<RwLock<Profile>>, stop_requested: Arc<AtomicBool>, } #[derive(Debug)] struct PostponedJobArgs { - ruby_threads: Arc<RwLock<Vec<VALUE>>>, + configuration: Arc<Configuration>, profile: Arc<RwLock<Profile>>, } -impl TimerThreadScheduler { - fn new() -> Self { - TimerThreadScheduler { - ruby_threads: Arc::new(RwLock::new(vec![])), - interval: None, - profile: None, - stop_requested: Arc::new(AtomicBool::new(false)), - } - } - - fn initialize(&mut self, argc: c_int, argv: *const VALUE, _rbself: VALUE) -> VALUE { - // Parse arguments - let kwargs: VALUE = Qnil.into(); - unsafe { - rb_scan_args(argc, argv, cstr!(":"), &kwargs); - }; - let mut kwargs_values: [VALUE; 3] = [Qnil.into(); 3]; - unsafe { - rb_get_kwargs( - kwargs, - [ - rb_intern(cstr!("interval_ms")), - rb_intern(cstr!("threads")), - rb_intern(cstr!("time_mode")), - ] - .as_mut_ptr(), - 0, - 3, - kwargs_values.as_mut_ptr(), - ); - }; - let interval: Duration = if kwargs_values[0] != Qundef as VALUE { - let interval_ms = unsafe { rb_num2long(kwargs_values[0]) }; - Duration::from_millis(interval_ms.try_into().unwrap_or_else(|_| { - eprintln!( - "[Pf2] Warning: Specified interval ({}) is not valid. Using default value (49ms).", - interval_ms - ); - 49 - })) - } else { - Duration::from_millis(49) - }; - let threads: VALUE = if kwargs_values[1] != Qundef as VALUE { - kwargs_values[1] - } else { - unsafe { rb_funcall(rb_cThread, rb_intern(cstr!("list")), 0) } - }; - if kwargs_values[2] != Qundef as VALUE { - let specified_mode = unsafe { - let mut str = rb_funcall(kwargs_values[2], rb_intern(cstr!("to_s")), 0); - let ptr = rb_string_value_ptr(&mut str); - CStr::from_ptr(ptr).to_str().unwrap() - }; - if specified_mode != "wall" { - // Raise an ArgumentError - unsafe { - rb_raise( - rb_eArgError, - cstr!("TimerThreadScheduler only supports :wall mode."), - ) - } - } - } - - let mut target_ruby_threads = Vec::new(); - unsafe { - for i in 0..RARRAY_LEN(threads) { - let ruby_thread: VALUE = rb_ary_entry(threads, i); - target_ruby_threads.push(ruby_thread); - } - } - - self.interval = Some(Arc::new(interval)); - self.ruby_threads = Arc::new(RwLock::new(target_ruby_threads.into_iter().collect())); - - Qnil.into() - } - - fn start(&mut self, _rbself: VALUE) -> VALUE { - // Create Profile - let profile = Arc::new(RwLock::new(Profile::new())); - self.start_profile_buffer_flusher_thread(&profile); - - // Start monitoring thread - let stop_requested = Arc::clone(&self.stop_requested); - let interval = Arc::clone(self.interval.as_ref().unwrap()); +impl Scheduler for TimerThreadScheduler { + fn start(&self) -> VALUE { + // Register the Postponed Job which does the actual work of collecting samples let postponed_job_args: Box<PostponedJobArgs> = Box::new(PostponedJobArgs { - ruby_threads: Arc::clone(&self.ruby_threads), - profile: Arc::clone(&profile), + configuration: Arc::clone(&self.configuration), + profile: Arc::clone(&self.profile), }); let postponed_job_handle: rb_postponed_job_handle_t = unsafe { rb_postponed_job_preregister( 0, Some(Self::postponed_job), Box::into_raw(postponed_job_args) as *mut c_void, // FIXME: leak ) }; + + // Start a timer thread that periodically triggers postponed jobs based on configuration + let configuration = Arc::clone(&self.configuration); + let stop_requested = Arc::clone(&self.stop_requested); thread::spawn(move || { - Self::thread_main_loop(stop_requested, interval, postponed_job_handle) + Self::thread_main_loop(configuration, stop_requested, postponed_job_handle) }); - self.profile = Some(profile); - Qtrue.into() } + fn stop(&self) -> VALUE { + // Stop the collector thread + self.stop_requested.store(true, Ordering::Relaxed); + + // Finalize + match self.profile.try_write() { + Ok(mut profile) => { + profile.flush_temporary_sample_buffer(); + } + Err(_) => { + println!("[pf2 ERROR] stop: Failed to acquire profile lock."); + return Qfalse.into(); + } + } + + let profile = self.profile.try_read().unwrap(); + log::debug!("Number of samples: {}", profile.samples.len()); + + let serialized = ProfileSerializer::serialize(&profile); + let serialized = CString::new(serialized).unwrap(); + unsafe { rb_str_new_cstr(serialized.as_ptr()) } + } + + fn on_new_thread(&self, _thread: VALUE) { + todo!(); + } + + fn dmark(&self) { + match self.profile.read() { + Ok(profile) => unsafe { + profile.dmark(); + }, + Err(_) => { + panic!("[pf2 FATAL] dmark: Failed to acquire profile lock."); + } + } + } + + fn dfree(&self) { + // No-op + } + + fn dsize(&self) -> size_t { + // FIXME: Report something better + std::mem::size_of::<TimerThreadScheduler>() as size_t + } +} + +impl TimerThreadScheduler { + pub fn new(configuration: &Configuration, profile: Arc<RwLock<Profile>>) -> Self { + Self { + configuration: Arc::new(configuration.clone()), + profile, + stop_requested: Arc::new(AtomicBool::new(false)), + } + + // cstr!("TimerThreadScheduler only supports :wall mode."), + } + fn thread_main_loop( + configuration: Arc<Configuration>, stop_requested: Arc<AtomicBool>, - interval: Arc<Duration>, postponed_job_handle: rb_postponed_job_handle_t, ) { loop { if stop_requested.fetch_and(true, Ordering::Relaxed) { break; } unsafe { + log::trace!("Triggering postponed job"); rb_postponed_job_trigger(postponed_job_handle); } - thread::sleep(*interval); + thread::sleep(configuration.interval); } } - fn stop(&self, _rbself: VALUE) -> VALUE { - // Stop the collector thread - self.stop_requested.store(true, Ordering::Relaxed); - - if let Some(profile) = &self.profile { - // Finalize - match profile.try_write() { - Ok(mut profile) => { - profile.flush_temporary_sample_buffer(); - } - Err(_) => { - println!("[pf2 ERROR] stop: Failed to acquire profile lock."); - return Qfalse.into(); - } - } - - let profile = profile.try_read().unwrap(); - log::debug!("Number of samples: {}", profile.samples.len()); - - let serialized = ProfileSerializer::serialize(&profile); - let serialized = CString::new(serialized).unwrap(); - unsafe { rb_str_new_cstr(serialized.as_ptr()) } - } else { - panic!("stop() called before start()"); - } - } - unsafe extern "C" fn postponed_job(ptr: *mut c_void) { unsafe { rb_gc_disable(); } let args = unsafe { ManuallyDrop::new(Box::from_raw(ptr as *mut PostponedJobArgs)) }; @@ -194,126 +144,28 @@ return; } }; // Collect stack information from specified Ruby Threads - let ruby_threads = args.ruby_threads.try_read().unwrap(); - for ruby_thread in ruby_threads.iter() { - // Check if the thread is still alive - if unsafe { rb_funcall(*ruby_thread, rb_intern(cstr!("status")), 0) } == Qfalse as u64 { - continue; - } + match &args.configuration.target_ruby_threads { + configuration::Threads::All => todo!(), + configuration::Threads::Targeted(threads) => { + for ruby_thread in threads.iter() { + // Check if the thread is still alive + if unsafe { rb_funcall(*ruby_thread, rb_intern(cstr!("status")), 0) } + == Qfalse as u64 + { + continue; + } - let sample = Sample::capture(*ruby_thread, &profile.backtrace_state); - if profile.temporary_sample_buffer.push(sample).is_err() { - log::debug!("Temporary sample buffer full. Dropping sample."); - } - } - unsafe { - rb_gc_enable(); - } - } - - fn start_profile_buffer_flusher_thread(&self, profile: &Arc<RwLock<Profile>>) { - let profile = Arc::clone(profile); - thread::spawn(move || loop { - log::trace!("Flushing temporary sample buffer"); - match profile.try_write() { - Ok(mut profile) => { - profile.flush_temporary_sample_buffer(); - } - Err(_) => { - log::debug!("flusher: Failed to acquire profile lock"); - } - } - thread::sleep(Duration::from_millis(500)); - }); - } - - // Ruby Methods - - pub unsafe extern "C" fn rb_initialize( - argc: c_int, - argv: *const VALUE, - rbself: VALUE, - ) -> VALUE { - let mut collector = Self::get_struct_from(rbself); - collector.initialize(argc, argv, rbself) - } - - // SampleCollector.start - pub unsafe extern "C" fn rb_start(rbself: VALUE) -> VALUE { - let mut collector = Self::get_struct_from(rbself); - collector.start(rbself) - } - - // SampleCollector.stop - pub unsafe extern "C" fn rb_stop(rbself: VALUE) -> VALUE { - let collector = Self::get_struct_from(rbself); - collector.stop(rbself) - } - - // Functions for TypedData - - fn get_struct_from(obj: VALUE) -> ManuallyDrop<Box<Self>> { - unsafe { - let ptr = rb_check_typeddata(obj, &RBDATA); - ManuallyDrop::new(Box::from_raw(ptr as *mut TimerThreadScheduler)) - } - } - - #[allow(non_snake_case)] - pub unsafe extern "C" fn rb_alloc(_rbself: VALUE) -> VALUE { - let collector = TimerThreadScheduler::new(); - - unsafe { - let rb_mPf2: VALUE = rb_define_module(cstr!("Pf2")); - let rb_cTimerThreadScheduler = - rb_define_class_under(rb_mPf2, cstr!("TimerThreadScheduler"), rb_cObject); - - rb_data_typed_object_wrap( - rb_cTimerThreadScheduler, - Box::into_raw(Box::new(collector)) as *mut _ as *mut c_void, - &RBDATA, - ) - } - } - - unsafe extern "C" fn dmark(ptr: *mut c_void) { - unsafe { - let collector = ManuallyDrop::new(Box::from_raw(ptr as *mut TimerThreadScheduler)); - if let Some(profile) = &collector.profile { - match profile.read() { - Ok(profile) => { - profile.dmark(); + let sample = Sample::capture(*ruby_thread, &profile.backtrace_state); + if profile.temporary_sample_buffer.push(sample).is_err() { + log::debug!("Temporary sample buffer full. Dropping sample."); } - Err(_) => { - panic!("[pf2 FATAL] dmark: Failed to acquire profile lock."); - } } } } - } - unsafe extern "C" fn dfree(ptr: *mut c_void) { unsafe { - drop(Box::from_raw(ptr as *mut TimerThreadScheduler)); + rb_gc_enable(); } } - unsafe extern "C" fn dsize(_: *const c_void) -> size_t { - // FIXME: Report something better - std::mem::size_of::<TimerThreadScheduler>() as size_t - } } - -static mut RBDATA: rb_data_type_t = rb_data_type_t { - wrap_struct_name: cstr!("TimerThreadScheduler"), - function: rb_data_type_struct__bindgen_ty_1 { - dmark: Some(TimerThreadScheduler::dmark), - dfree: Some(TimerThreadScheduler::dfree), - dsize: Some(TimerThreadScheduler::dsize), - dcompact: None, - reserved: [null_mut(); 1], - }, - parent: null_mut(), - data: null_mut(), - flags: 0, -};