ext/pf2/src/timer_thread_scheduler.rs in pf2-0.2.0 vs ext/pf2/src/timer_thread_scheduler.rs in pf2-0.3.0

- old
+ new

@@ -1,8 +1,8 @@ #![deny(unsafe_op_in_unsafe_fn)] -use std::ffi::{c_void, CString}; +use std::ffi::{c_int, c_void, CStr, CString}; use std::mem::ManuallyDrop; use std::ptr::null_mut; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::thread; @@ -16,10 +16,11 @@ use crate::util::*; #[derive(Clone, Debug)] pub struct TimerThreadScheduler { ruby_threads: Arc<RwLock<Vec<VALUE>>>, + interval: Option<Arc<Duration>>, profile: Option<Arc<RwLock<Profile>>>, stop_requested: Arc<AtomicBool>, } #[derive(Debug)] @@ -30,30 +31,93 @@ impl TimerThreadScheduler { fn new() -> Self { TimerThreadScheduler { ruby_threads: Arc::new(RwLock::new(vec![])), + interval: None, profile: None, stop_requested: Arc::new(AtomicBool::new(false)), } } - fn start(&mut self, _rbself: VALUE, ruby_threads: VALUE) -> VALUE { - // Register threads - let stored_threads = &mut self.ruby_threads.try_write().unwrap(); + fn initialize(&mut self, argc: c_int, argv: *const VALUE, _rbself: VALUE) -> VALUE { + // Parse arguments + let kwargs: VALUE = Qnil.into(); unsafe { - for i in 0..RARRAY_LEN(ruby_threads) { - stored_threads.push(rb_ary_entry(ruby_threads, i)); + rb_scan_args(argc, argv, cstr!(":"), &kwargs); + }; + let mut kwargs_values: [VALUE; 3] = [Qnil.into(); 3]; + unsafe { + rb_get_kwargs( + kwargs, + [ + rb_intern(cstr!("interval_ms")), + rb_intern(cstr!("threads")), + rb_intern(cstr!("time_mode")), + ] + .as_mut_ptr(), + 0, + 3, + kwargs_values.as_mut_ptr(), + ); + }; + let interval: Duration = if kwargs_values[0] != Qundef as VALUE { + let interval_ms = unsafe { rb_num2long(kwargs_values[0]) }; + Duration::from_millis(interval_ms.try_into().unwrap_or_else(|_| { + eprintln!( + "[Pf2] Warning: Specified interval ({}) is not valid. Using default value (49ms).", + interval_ms + ); + 49 + })) + } else { + Duration::from_millis(49) + }; + let threads: VALUE = if kwargs_values[1] != Qundef as VALUE { + kwargs_values[1] + } else { + unsafe { rb_funcall(rb_cThread, rb_intern(cstr!("list")), 0) } + }; + if kwargs_values[2] != Qundef as VALUE { + let specified_mode = unsafe { + let mut str = rb_funcall(kwargs_values[2], rb_intern(cstr!("to_s")), 0); + let ptr = rb_string_value_ptr(&mut str); + CStr::from_ptr(ptr).to_str().unwrap() + }; + if specified_mode != "wall" { + // Raise an ArgumentError + unsafe { + rb_raise( + rb_eArgError, + cstr!("TimerThreadScheduler only supports :wall mode."), + ) + } } } + let mut target_ruby_threads = Vec::new(); + unsafe { + for i in 0..RARRAY_LEN(threads) { + let ruby_thread: VALUE = rb_ary_entry(threads, i); + target_ruby_threads.push(ruby_thread); + } + } + + self.interval = Some(Arc::new(interval)); + self.ruby_threads = Arc::new(RwLock::new(target_ruby_threads.into_iter().collect())); + + Qnil.into() + } + + fn start(&mut self, _rbself: VALUE) -> VALUE { // Create Profile let profile = Arc::new(RwLock::new(Profile::new())); self.start_profile_buffer_flusher_thread(&profile); // Start monitoring thread let stop_requested = Arc::clone(&self.stop_requested); + let interval = Arc::clone(self.interval.as_ref().unwrap()); let postponed_job_args: Box<PostponedJobArgs> = Box::new(PostponedJobArgs { ruby_threads: Arc::clone(&self.ruby_threads), profile: Arc::clone(&profile), }); let postponed_job_handle: rb_postponed_job_handle_t = unsafe { @@ -61,30 +125,33 @@ 0, Some(Self::postponed_job), Box::into_raw(postponed_job_args) as *mut c_void, // FIXME: leak ) }; - thread::spawn(move || Self::thread_main_loop(stop_requested, postponed_job_handle)); + thread::spawn(move || { + Self::thread_main_loop(stop_requested, interval, postponed_job_handle) + }); self.profile = Some(profile); Qtrue.into() } fn thread_main_loop( stop_requested: Arc<AtomicBool>, + interval: Arc<Duration>, postponed_job_handle: rb_postponed_job_handle_t, ) { loop { if stop_requested.fetch_and(true, Ordering::Relaxed) { break; } unsafe { rb_postponed_job_trigger(postponed_job_handle); } - // sleep for 50 ms - thread::sleep(Duration::from_millis(50)); + + thread::sleep(*interval); } } fn stop(&self, _rbself: VALUE) -> VALUE { // Stop the collector thread @@ -134,11 +201,11 @@ // Check if the thread is still alive if unsafe { rb_funcall(*ruby_thread, rb_intern(cstr!("status")), 0) } == Qfalse as u64 { continue; } - let sample = Sample::capture(*ruby_thread); + let sample = Sample::capture(*ruby_thread, &profile.backtrace_state); if profile.temporary_sample_buffer.push(sample).is_err() { log::debug!("Temporary sample buffer full. Dropping sample."); } } unsafe { @@ -162,13 +229,22 @@ }); } // Ruby Methods + pub unsafe extern "C" fn rb_initialize( + argc: c_int, + argv: *const VALUE, + rbself: VALUE, + ) -> VALUE { + let mut collector = Self::get_struct_from(rbself); + collector.initialize(argc, argv, rbself) + } + // SampleCollector.start - pub unsafe extern "C" fn rb_start(rbself: VALUE, ruby_threads: VALUE, _: VALUE) -> VALUE { + pub unsafe extern "C" fn rb_start(rbself: VALUE) -> VALUE { let mut collector = Self::get_struct_from(rbself); - collector.start(rbself, ruby_threads) + collector.start(rbself) } // SampleCollector.stop pub unsafe extern "C" fn rb_stop(rbself: VALUE) -> VALUE { let collector = Self::get_struct_from(rbself);