ext/pf2/src/timer_thread_scheduler.rs in pf2-0.2.0 vs ext/pf2/src/timer_thread_scheduler.rs in pf2-0.3.0
- old
+ new
@@ -1,8 +1,8 @@
#![deny(unsafe_op_in_unsafe_fn)]
-use std::ffi::{c_void, CString};
+use std::ffi::{c_int, c_void, CStr, CString};
use std::mem::ManuallyDrop;
use std::ptr::null_mut;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;
@@ -16,10 +16,11 @@
use crate::util::*;
#[derive(Clone, Debug)]
pub struct TimerThreadScheduler {
ruby_threads: Arc<RwLock<Vec<VALUE>>>,
+ interval: Option<Arc<Duration>>,
profile: Option<Arc<RwLock<Profile>>>,
stop_requested: Arc<AtomicBool>,
}
#[derive(Debug)]
@@ -30,30 +31,93 @@
impl TimerThreadScheduler {
fn new() -> Self {
TimerThreadScheduler {
ruby_threads: Arc::new(RwLock::new(vec![])),
+ interval: None,
profile: None,
stop_requested: Arc::new(AtomicBool::new(false)),
}
}
- fn start(&mut self, _rbself: VALUE, ruby_threads: VALUE) -> VALUE {
- // Register threads
- let stored_threads = &mut self.ruby_threads.try_write().unwrap();
+ fn initialize(&mut self, argc: c_int, argv: *const VALUE, _rbself: VALUE) -> VALUE {
+ // Parse arguments
+ let kwargs: VALUE = Qnil.into();
unsafe {
- for i in 0..RARRAY_LEN(ruby_threads) {
- stored_threads.push(rb_ary_entry(ruby_threads, i));
+ rb_scan_args(argc, argv, cstr!(":"), &kwargs);
+ };
+ let mut kwargs_values: [VALUE; 3] = [Qnil.into(); 3];
+ unsafe {
+ rb_get_kwargs(
+ kwargs,
+ [
+ rb_intern(cstr!("interval_ms")),
+ rb_intern(cstr!("threads")),
+ rb_intern(cstr!("time_mode")),
+ ]
+ .as_mut_ptr(),
+ 0,
+ 3,
+ kwargs_values.as_mut_ptr(),
+ );
+ };
+ let interval: Duration = if kwargs_values[0] != Qundef as VALUE {
+ let interval_ms = unsafe { rb_num2long(kwargs_values[0]) };
+ Duration::from_millis(interval_ms.try_into().unwrap_or_else(|_| {
+ eprintln!(
+ "[Pf2] Warning: Specified interval ({}) is not valid. Using default value (49ms).",
+ interval_ms
+ );
+ 49
+ }))
+ } else {
+ Duration::from_millis(49)
+ };
+ let threads: VALUE = if kwargs_values[1] != Qundef as VALUE {
+ kwargs_values[1]
+ } else {
+ unsafe { rb_funcall(rb_cThread, rb_intern(cstr!("list")), 0) }
+ };
+ if kwargs_values[2] != Qundef as VALUE {
+ let specified_mode = unsafe {
+ let mut str = rb_funcall(kwargs_values[2], rb_intern(cstr!("to_s")), 0);
+ let ptr = rb_string_value_ptr(&mut str);
+ CStr::from_ptr(ptr).to_str().unwrap()
+ };
+ if specified_mode != "wall" {
+ // Raise an ArgumentError
+ unsafe {
+ rb_raise(
+ rb_eArgError,
+ cstr!("TimerThreadScheduler only supports :wall mode."),
+ )
+ }
}
}
+ let mut target_ruby_threads = Vec::new();
+ unsafe {
+ for i in 0..RARRAY_LEN(threads) {
+ let ruby_thread: VALUE = rb_ary_entry(threads, i);
+ target_ruby_threads.push(ruby_thread);
+ }
+ }
+
+ self.interval = Some(Arc::new(interval));
+ self.ruby_threads = Arc::new(RwLock::new(target_ruby_threads.into_iter().collect()));
+
+ Qnil.into()
+ }
+
+ fn start(&mut self, _rbself: VALUE) -> VALUE {
// Create Profile
let profile = Arc::new(RwLock::new(Profile::new()));
self.start_profile_buffer_flusher_thread(&profile);
// Start monitoring thread
let stop_requested = Arc::clone(&self.stop_requested);
+ let interval = Arc::clone(self.interval.as_ref().unwrap());
let postponed_job_args: Box<PostponedJobArgs> = Box::new(PostponedJobArgs {
ruby_threads: Arc::clone(&self.ruby_threads),
profile: Arc::clone(&profile),
});
let postponed_job_handle: rb_postponed_job_handle_t = unsafe {
@@ -61,30 +125,33 @@
0,
Some(Self::postponed_job),
Box::into_raw(postponed_job_args) as *mut c_void, // FIXME: leak
)
};
- thread::spawn(move || Self::thread_main_loop(stop_requested, postponed_job_handle));
+ thread::spawn(move || {
+ Self::thread_main_loop(stop_requested, interval, postponed_job_handle)
+ });
self.profile = Some(profile);
Qtrue.into()
}
fn thread_main_loop(
stop_requested: Arc<AtomicBool>,
+ interval: Arc<Duration>,
postponed_job_handle: rb_postponed_job_handle_t,
) {
loop {
if stop_requested.fetch_and(true, Ordering::Relaxed) {
break;
}
unsafe {
rb_postponed_job_trigger(postponed_job_handle);
}
- // sleep for 50 ms
- thread::sleep(Duration::from_millis(50));
+
+ thread::sleep(*interval);
}
}
fn stop(&self, _rbself: VALUE) -> VALUE {
// Stop the collector thread
@@ -134,11 +201,11 @@
// Check if the thread is still alive
if unsafe { rb_funcall(*ruby_thread, rb_intern(cstr!("status")), 0) } == Qfalse as u64 {
continue;
}
- let sample = Sample::capture(*ruby_thread);
+ let sample = Sample::capture(*ruby_thread, &profile.backtrace_state);
if profile.temporary_sample_buffer.push(sample).is_err() {
log::debug!("Temporary sample buffer full. Dropping sample.");
}
}
unsafe {
@@ -162,13 +229,22 @@
});
}
// Ruby Methods
+ pub unsafe extern "C" fn rb_initialize(
+ argc: c_int,
+ argv: *const VALUE,
+ rbself: VALUE,
+ ) -> VALUE {
+ let mut collector = Self::get_struct_from(rbself);
+ collector.initialize(argc, argv, rbself)
+ }
+
// SampleCollector.start
- pub unsafe extern "C" fn rb_start(rbself: VALUE, ruby_threads: VALUE, _: VALUE) -> VALUE {
+ pub unsafe extern "C" fn rb_start(rbself: VALUE) -> VALUE {
let mut collector = Self::get_struct_from(rbself);
- collector.start(rbself, ruby_threads)
+ collector.start(rbself)
}
// SampleCollector.stop
pub unsafe extern "C" fn rb_stop(rbself: VALUE) -> VALUE {
let collector = Self::get_struct_from(rbself);