ext/pf2/src/signal_scheduler.rs in pf2-0.4.0 vs ext/pf2/src/signal_scheduler.rs in pf2-0.5.0
- old
+ new
@@ -1,166 +1,98 @@
#![deny(unsafe_op_in_unsafe_fn)]
-mod configuration;
-mod timer_installer;
-
-use self::configuration::{Configuration, TimeMode};
-use self::timer_installer::TimerInstaller;
use crate::profile::Profile;
use crate::profile_serializer::ProfileSerializer;
+use crate::ruby_internal_apis::rb_thread_getcpuclockid;
use crate::sample::Sample;
+use crate::scheduler::Scheduler;
+use crate::session::configuration::{self, Configuration};
use core::panic;
-use std::collections::HashSet;
-use std::ffi::{c_int, c_void, CStr, CString};
+use std::ffi::{c_int, c_void, CString};
use std::mem::ManuallyDrop;
-use std::str::FromStr;
use std::sync::{Arc, RwLock};
-use std::thread;
-use std::time::Duration;
use std::{mem, ptr::null_mut};
use rb_sys::*;
use crate::util::*;
#[derive(Debug)]
pub struct SignalScheduler {
- configuration: Option<configuration::Configuration>,
- profile: Option<Arc<RwLock<Profile>>>,
+ configuration: Configuration,
+ profile: Arc<RwLock<Profile>>,
}
pub struct SignalHandlerArgs {
profile: Arc<RwLock<Profile>>,
context_ruby_thread: VALUE,
}
-impl SignalScheduler {
- fn new() -> Self {
- Self {
- configuration: None,
- profile: None,
+impl Scheduler for SignalScheduler {
+ fn start(&self) -> VALUE {
+ self.install_signal_handler();
+
+ if let configuration::Threads::Targeted(threads) = &self.configuration.target_ruby_threads {
+ for ruby_thread in threads.iter() {
+ self.install_timer_to_ruby_thread(*ruby_thread);
+ }
}
+
+ Qtrue.into()
}
- fn initialize(&mut self, argc: c_int, argv: *const VALUE, _rbself: VALUE) -> VALUE {
- // Parse arguments
- let kwargs: VALUE = Qnil.into();
- unsafe {
- rb_scan_args(argc, argv, cstr!(":"), &kwargs);
- };
- let mut kwargs_values: [VALUE; 4] = [Qnil.into(); 4];
- unsafe {
- rb_get_kwargs(
- kwargs,
- [
- rb_intern(cstr!("interval_ms")),
- rb_intern(cstr!("threads")),
- rb_intern(cstr!("time_mode")),
- rb_intern(cstr!("track_all_threads")),
- ]
- .as_mut_ptr(),
- 0,
- 4,
- kwargs_values.as_mut_ptr(),
- );
- };
- let interval: Duration = if kwargs_values[0] != Qundef as VALUE {
- let interval_ms = unsafe { rb_num2long(kwargs_values[0]) };
- Duration::from_millis(interval_ms.try_into().unwrap_or_else(|_| {
- eprintln!(
- "[Pf2] Warning: Specified interval ({}) is not valid. Using default value (49ms).",
- interval_ms
- );
- 49
- }))
- } else {
- Duration::from_millis(49)
- };
- let threads: VALUE = if kwargs_values[1] != Qundef as VALUE {
- kwargs_values[1]
- } else {
- unsafe { rb_funcall(rb_cThread, rb_intern(cstr!("list")), 0) }
- };
- let time_mode: configuration::TimeMode = if kwargs_values[2] != Qundef as VALUE {
- let specified_mode = unsafe {
- let mut str = rb_funcall(kwargs_values[2], rb_intern(cstr!("to_s")), 0);
- let ptr = rb_string_value_ptr(&mut str);
- CStr::from_ptr(ptr).to_str().unwrap()
- };
- configuration::TimeMode::from_str(specified_mode).unwrap_or_else(|_| {
- // Raise an ArgumentError
- unsafe {
- rb_raise(
- rb_eArgError,
- cstr!("Invalid time mode. Valid values are 'cpu' and 'wall'."),
- )
- }
- })
- } else {
- configuration::TimeMode::CpuTime
- };
- let track_all_threads: bool = if kwargs_values[3] != Qundef as VALUE {
- RTEST(kwargs_values[3])
- } else {
- false
- };
-
- let mut target_ruby_threads = HashSet::new();
- unsafe {
- for i in 0..RARRAY_LEN(threads) {
- let ruby_thread: VALUE = rb_ary_entry(threads, i);
- target_ruby_threads.insert(ruby_thread);
+ fn stop(&self) -> VALUE {
+ // Finalize
+ match self.profile.try_write() {
+ Ok(mut profile) => {
+ profile.flush_temporary_sample_buffer();
}
+ Err(_) => {
+ println!("[pf2 ERROR] stop: Failed to acquire profile lock.");
+ return Qfalse.into();
+ }
}
- self.configuration = Some(Configuration {
- interval,
- target_ruby_threads,
- time_mode,
- track_all_threads,
- });
+ let profile = self.profile.try_read().unwrap();
+ log::debug!("Number of samples: {}", profile.samples.len());
- Qnil.into()
+ let serialized = ProfileSerializer::serialize(&profile);
+ let serialized = CString::new(serialized).unwrap();
+ unsafe { rb_str_new_cstr(serialized.as_ptr()) }
}
- fn start(&mut self, _rbself: VALUE) -> VALUE {
- let profile = Arc::new(RwLock::new(Profile::new()));
- self.start_profile_buffer_flusher_thread(&profile);
- self.install_signal_handler();
-
- TimerInstaller::install_timer_to_ruby_threads(
- self.configuration.as_ref().unwrap().clone(), // FIXME: don't clone
- Arc::clone(&profile),
- );
-
- self.profile = Some(profile);
-
- Qtrue.into()
+ fn on_new_thread(&self, thread: VALUE) {
+ self.install_timer_to_ruby_thread(thread);
}
- fn stop(&mut self, _rbself: VALUE) -> VALUE {
- if let Some(profile) = &self.profile {
- // Finalize
- match profile.try_write() {
- Ok(mut profile) => {
- profile.flush_temporary_sample_buffer();
- }
- Err(_) => {
- println!("[pf2 ERROR] stop: Failed to acquire profile lock.");
- return Qfalse.into();
- }
+ fn dmark(&self) {
+ match self.profile.read() {
+ Ok(profile) => unsafe {
+ profile.dmark();
+ },
+ Err(_) => {
+ panic!("[pf2 FATAL] dmark: Failed to acquire profile lock.");
}
+ }
+ }
- let profile = profile.try_read().unwrap();
- log::debug!("Number of samples: {}", profile.samples.len());
+ fn dfree(&self) {
+ // No-op
+ }
- let serialized = ProfileSerializer::serialize(&profile);
- let serialized = CString::new(serialized).unwrap();
- unsafe { rb_str_new_cstr(serialized.as_ptr()) }
- } else {
- panic!("stop() called before start()");
+ fn dsize(&self) -> size_t {
+ // FIXME: Report something better
+ mem::size_of::<Self>() as size_t
+ }
+}
+
+impl SignalScheduler {
+ pub fn new(configuration: &Configuration, profile: Arc<RwLock<Profile>>) -> Self {
+ Self {
+ configuration: configuration.clone(),
+ profile,
}
}
// Install signal handler for profiling events to the current process.
fn install_signal_handler(&self) {
@@ -200,112 +132,64 @@
if profile.temporary_sample_buffer.push(sample).is_err() {
log::debug!("Temporary sample buffer full. Dropping sample.");
}
}
- fn start_profile_buffer_flusher_thread(&self, profile: &Arc<RwLock<Profile>>) {
- let profile = Arc::clone(profile);
- thread::spawn(move || loop {
- log::trace!("Flushing temporary sample buffer");
- match profile.try_write() {
- Ok(mut profile) => {
- profile.flush_temporary_sample_buffer();
- }
- Err(_) => {
- log::debug!("flusher: Failed to acquire profile lock");
- }
- }
- thread::sleep(Duration::from_millis(500));
+ fn install_timer_to_ruby_thread(&self, ruby_thread: VALUE) {
+ // NOTE: This Box never gets dropped
+ let signal_handler_args = Box::new(SignalHandlerArgs {
+ profile: Arc::clone(&self.profile),
+ context_ruby_thread: ruby_thread,
});
- }
- // Ruby Methods
+ // rb_funcall deadlocks when called within a THREAD_EVENT_STARTED hook
+ let kernel_thread_id: i32 = i32::try_from(unsafe {
+ rb_num2int(rb_funcall(
+ ruby_thread,
+ rb_intern(cstr!("native_thread_id")), // kernel thread ID
+ 0,
+ ))
+ })
+ .unwrap();
- pub unsafe extern "C" fn rb_initialize(
- argc: c_int,
- argv: *const VALUE,
- rbself: VALUE,
- ) -> VALUE {
- let mut collector = unsafe { Self::get_struct_from(rbself) };
- collector.initialize(argc, argv, rbself)
- }
+ // Create a signal event
+ let mut sigevent: libc::sigevent = unsafe { mem::zeroed() };
+ // Note: SIGEV_THREAD_ID is Linux-specific. In other platforms, we would need to
+ // "trampoline" the signal as any pthread can receive the signal.
+ sigevent.sigev_notify = libc::SIGEV_THREAD_ID;
+ sigevent.sigev_notify_thread_id = kernel_thread_id;
+ sigevent.sigev_signo = libc::SIGALRM;
+ // Pass required args to the signal handler
+ sigevent.sigev_value.sival_ptr = Box::into_raw(signal_handler_args) as *mut c_void;
- pub unsafe extern "C" fn rb_start(rbself: VALUE) -> VALUE {
- let mut collector = unsafe { Self::get_struct_from(rbself) };
- collector.start(rbself)
- }
-
- pub unsafe extern "C" fn rb_stop(rbself: VALUE) -> VALUE {
- let mut collector = unsafe { Self::get_struct_from(rbself) };
- collector.stop(rbself)
- }
-
- // Functions for TypedData
-
- // Extract the SignalScheduler struct from a Ruby object
- unsafe fn get_struct_from(obj: VALUE) -> ManuallyDrop<Box<Self>> {
- unsafe {
- let ptr = rb_check_typeddata(obj, &RBDATA);
- ManuallyDrop::new(Box::from_raw(ptr as *mut SignalScheduler))
+ // Create and configure timer to fire every _interval_ ms of CPU time
+ let mut timer: libc::timer_t = unsafe { mem::zeroed() };
+ let clockid = match self.configuration.time_mode {
+ configuration::TimeMode::CpuTime => unsafe { rb_thread_getcpuclockid(ruby_thread) },
+ configuration::TimeMode::WallTime => libc::CLOCK_MONOTONIC,
+ };
+ let err = unsafe { libc::timer_create(clockid, &mut sigevent, &mut timer) };
+ if err != 0 {
+ panic!("timer_create failed: {}", err);
}
- }
-
- #[allow(non_snake_case)]
- pub unsafe extern "C" fn rb_alloc(_rbself: VALUE) -> VALUE {
- let collector = Box::new(SignalScheduler::new());
- unsafe { Arc::increment_strong_count(&collector) };
-
- unsafe {
- let rb_mPf2: VALUE = rb_define_module(cstr!("Pf2"));
- let rb_cSignalScheduler =
- rb_define_class_under(rb_mPf2, cstr!("SignalScheduler"), rb_cObject);
-
- // "Wrap" the SignalScheduler struct into a Ruby object
- rb_data_typed_object_wrap(
- rb_cSignalScheduler,
- Box::into_raw(collector) as *mut c_void,
- &RBDATA,
- )
+ let itimerspec = Self::duration_to_itimerspec(&self.configuration.interval);
+ let err = unsafe { libc::timer_settime(timer, 0, &itimerspec, null_mut()) };
+ if err != 0 {
+ panic!("timer_settime failed: {}", err);
}
- }
- unsafe extern "C" fn dmark(ptr: *mut c_void) {
- unsafe {
- let collector = ManuallyDrop::new(Box::from_raw(ptr as *mut SignalScheduler));
- if let Some(profile) = &collector.profile {
- match profile.read() {
- Ok(profile) => {
- profile.dmark();
- }
- Err(_) => {
- panic!("[pf2 FATAL] dmark: Failed to acquire profile lock.");
- }
- }
- }
- }
+ log::debug!("timer registered for thread {}", ruby_thread);
}
- unsafe extern "C" fn dfree(ptr: *mut c_void) {
- unsafe {
- drop(Box::from_raw(ptr as *mut SignalScheduler));
- }
- }
+ fn duration_to_itimerspec(duration: &std::time::Duration) -> libc::itimerspec {
+ let nanos = duration.as_nanos();
+ let seconds_part: i64 = (nanos / 1_000_000_000).try_into().unwrap();
+ let nanos_part: i64 = (nanos % 1_000_000_000).try_into().unwrap();
- unsafe extern "C" fn dsize(_: *const c_void) -> size_t {
- // FIXME: Report something better
- mem::size_of::<SignalScheduler>() as size_t
+ let mut its: libc::itimerspec = unsafe { mem::zeroed() };
+ its.it_interval.tv_sec = seconds_part;
+ its.it_interval.tv_nsec = nanos_part;
+ its.it_value.tv_sec = seconds_part;
+ its.it_value.tv_nsec = nanos_part;
+ its
}
}
-
-static mut RBDATA: rb_data_type_t = rb_data_type_t {
- wrap_struct_name: cstr!("SignalScheduler"),
- function: rb_data_type_struct__bindgen_ty_1 {
- dmark: Some(SignalScheduler::dmark),
- dfree: Some(SignalScheduler::dfree),
- dsize: Some(SignalScheduler::dsize),
- dcompact: None,
- reserved: [null_mut(); 1],
- },
- parent: null_mut(),
- data: null_mut(),
- flags: 0,
-};