lib/logstasher/active_job/log_subscriber.rb in logstasher-2.0.2 vs lib/logstasher/active_job/log_subscriber.rb in logstasher-2.1.0

- old
+ new

@@ -1,92 +1,98 @@ -begin - # `rescue nil` didn't work for some Ruby versions - require 'active_job/logging' -rescue LoadError +if ActiveJob::VERSION::MAJOR >= 6 && ActiveJob::VERSION::MINOR >= 1 + require 'active_job/log_subscriber' +else + require 'active_job/logging' end -if defined?(::ActiveJob::Logging::LogSubscriber) - module LogStasher - module ActiveJob - class LogSubscriber < ::ActiveJob::Logging::LogSubscriber - def enqueue(event) - process_event(event, 'enqueue') - end +module LogStasher + module ActiveJob - def enqueue_at(event) - process_event(event, 'enqueue_at') - end + BASE_SUBSCRIBER = if defined?(::ActiveJob::LogSubscriber) + ::ActiveJob::LogSubscriber + else + ::ActiveJob::Logging::LogSubscriber + end - def perform(event) - process_event(event, 'perform') + class LogSubscriber < BASE_SUBSCRIBER + def enqueue(event) + process_event(event, 'enqueue') + end - # Revert the request id back, in the event that the inline adapter is being used or a - # perform_now was used. - LogStasher.request_context[:request_id] = Thread.current[:old_request_id] - Thread.current[:old_request_id] = nil - end + def enqueue_at(event) + process_event(event, 'enqueue_at') + end - def perform_start(event) - # Use the job_id as the request id, so that any custom logging done for a job - # shares a request id, and has the job id in each log line. - # - # It's not being set when the job is enqueued, so enqueuing a job will have it's default - # request_id. In a lot of cases, it will be because of a web request. - # - # Hang onto the old request id, so we can revert after the job is done being performed. - Thread.current[:old_request_id] = LogStasher.request_context[:request_id] - LogStasher.request_context[:request_id] = event.payload[:job].job_id + def perform(event) + process_event(event, 'perform') - process_event(event, 'perform_start') - end + # Revert the request id back, in the event that the inline adapter is being used or a + # perform_now was used. + LogStasher.request_context[:request_id] = Thread.current[:old_request_id] + Thread.current[:old_request_id] = nil + end - def logger - LogStasher.logger - end + def perform_start(event) + # Use the job_id as the request id, so that any custom logging done for a job + # shares a request id, and has the job id in each log line. + # + # It's not being set when the job is enqueued, so enqueuing a job will have it's default + # request_id. In a lot of cases, it will be because of a web request. + # + # Hang onto the old request id, so we can revert after the job is done being performed. + Thread.current[:old_request_id] = LogStasher.request_context[:request_id] + LogStasher.request_context[:request_id] = event.payload[:job].job_id - private + process_event(event, 'perform_start') + end - def process_event(event, type) - data = extract_metadata(event) - data.merge! extract_exception(event) - data.merge! extract_scheduled_at(event) if type == 'enqueue_at' - data.merge! extract_duration(event) if type == 'perform' - data.merge! request_context + def logger + LogStasher.logger + end - tags = ['job', type] - tags.push('exception') if data[:exception] - logger << LogStasher.build_logstash_event(data, tags).to_json + "\n" - end + private - def extract_metadata(event) - { - job_id: event.payload[:job].job_id, - queue_name: queue_name(event), - job_class: event.payload[:job].class.to_s, - job_args: args_info(event.payload[:job]) - } - end + def process_event(event, type) + data = extract_metadata(event) + data.merge! extract_exception(event) + data.merge! extract_scheduled_at(event) if type == 'enqueue_at' + data.merge! extract_duration(event) if type == 'perform' + data.merge! request_context - def extract_duration(event) - { duration: event.duration.to_f.round(2) } - end + tags = ['job', type] + tags.push('exception') if data[:exception] + logger << LogStasher.build_logstash_event(data, tags).to_json + "\n" + end - def extract_exception(event) - event.payload.slice(:exception) - end + def extract_metadata(event) + { + job_id: event.payload[:job].job_id, + queue_name: queue_name(event), + job_class: event.payload[:job].class.to_s, + job_args: args_info(event.payload[:job]) + } + end - def extract_scheduled_at(event) - { scheduled_at: scheduled_at(event) } - end + def extract_duration(event) + { duration: event.duration.to_f.round(2) } + end - def request_context - LogStasher.request_context - end + def extract_exception(event) + event.payload.slice(:exception) + end - # The default args_info makes a string. We need objects to turn into JSON. - def args_info(job) - job.arguments.map { |arg| arg.try(:to_global_id).try(:to_s) || arg } - end + def extract_scheduled_at(event) + { scheduled_at: scheduled_at(event) } end + + def request_context + LogStasher.request_context + end + + # The default args_info makes a string. We need objects to turn into JSON. + def args_info(job) + job.arguments.map { |arg| arg.try(:to_global_id).try(:to_s) || arg } + end end end end +