lib/logstasher/active_job/log_subscriber.rb in logstasher-1.4.0 vs lib/logstasher/active_job/log_subscriber.rb in logstasher-2.0.0
- old
+ new
@@ -2,91 +2,91 @@
# `rescue nil` didn't work for some Ruby versions
require 'active_job/logging'
rescue LoadError
end
-module LogStasher
- module ActiveJob
- class LogSubscriber < ::ActiveJob::Logging::LogSubscriber
+if defined?(::ActiveJob::Logging::LogSubscriber)
+ module LogStasher
+ module ActiveJob
+ class LogSubscriber < ::ActiveJob::Logging::LogSubscriber
+ def enqueue(event)
+ process_event(event, 'enqueue')
+ end
- def enqueue(event)
- process_event(event, 'enqueue')
- end
+ def enqueue_at(event)
+ process_event(event, 'enqueue_at')
+ end
- def enqueue_at(event)
- process_event(event, 'enqueue_at')
- end
+ def perform(event)
+ process_event(event, 'perform')
- def perform(event)
- process_event(event, 'perform')
+ # Revert the request id back, in the event that the inline adapter is being used or a
+ # perform_now was used.
+ LogStasher.request_context[:request_id] = Thread.current[:old_request_id]
+ Thread.current[:old_request_id] = nil
+ end
- # Revert the request id back, in the event that the inline adapter is being used or a
- # perform_now was used.
- LogStasher.request_context[:request_id] = Thread.current[:old_request_id]
- Thread.current[:old_request_id] = nil
- end
+ def perform_start(event)
+ # Use the job_id as the request id, so that any custom logging done for a job
+ # shares a request id, and has the job id in each log line.
+ #
+ # It's not being set when the job is enqueued, so enqueuing a job will have it's default
+ # request_id. In a lot of cases, it will be because of a web request.
+ #
+ # Hang onto the old request id, so we can revert after the job is done being performed.
+ Thread.current[:old_request_id] = LogStasher.request_context[:request_id]
+ LogStasher.request_context[:request_id] = event.payload[:job].job_id
- def perform_start(event)
- # Use the job_id as the request id, so that any custom logging done for a job
- # shares a request id, and has the job id in each log line.
- #
- # It's not being set when the job is enqueued, so enqueuing a job will have it's default
- # request_id. In a lot of cases, it will be because of a web request.
- #
- # Hang onto the old request id, so we can revert after the job is done being performed.
- Thread.current[:old_request_id] = LogStasher.request_context[:request_id]
- LogStasher.request_context[:request_id] = event.payload[:job].job_id
+ process_event(event, 'perform_start')
+ end
- process_event(event, 'perform_start')
- end
+ def logger
+ LogStasher.logger
+ end
- def logger
- LogStasher.logger
- end
+ private
- private
+ def process_event(event, type)
+ data = extract_metadata(event)
+ data.merge! extract_exception(event)
+ data.merge! extract_scheduled_at(event) if type == 'enqueue_at'
+ data.merge! extract_duration(event) if type == 'perform'
+ data.merge! request_context
- def process_event(event, type)
- data = extract_metadata(event)
- data.merge! extract_exception(event)
- data.merge! extract_scheduled_at(event) if type == 'enqueue_at'
- data.merge! extract_duration(event) if type == 'perform'
- data.merge! request_context
+ tags = ['job', type]
+ tags.push('exception') if data[:exception]
+ logger << LogStasher.build_logstash_event(data, tags).to_json + "\n"
+ end
- tags = ['job', type]
- tags.push('exception') if data[:exception]
- logger << LogStasher.build_logstash_event(data, tags).to_json + "\n"
- end
+ def extract_metadata(event)
+ {
+ job_id: event.payload[:job].job_id,
+ queue_name: queue_name(event),
+ job_class: event.payload[:job].class.to_s,
+ job_args: args_info(event.payload[:job])
+ }
+ end
- def extract_metadata(event)
- {
- :job_id => event.payload[:job].job_id,
- :queue_name => queue_name(event),
- :job_class => event.payload[:job].class.to_s,
- :job_args => args_info(event.payload[:job]),
- }
- end
+ def extract_duration(event)
+ { duration: event.duration.to_f.round(2) }
+ end
- def extract_duration(event)
- { :duration => event.duration.to_f.round(2) }
- end
+ def extract_exception(event)
+ event.payload.slice(:exception)
+ end
- def extract_exception(event)
- event.payload.slice(:exception)
- end
+ def extract_scheduled_at(event)
+ { scheduled_at: scheduled_at(event) }
+ end
- def extract_scheduled_at(event)
- { :scheduled_at => scheduled_at(event) }
- end
+ def request_context
+ LogStasher.request_context
+ end
- def request_context
- LogStasher.request_context
+ # The default args_info makes a string. We need objects to turn into JSON.
+ def args_info(job)
+ job.arguments.map { |arg| arg.try(:to_global_id).try(:to_s) || arg }
+ end
end
-
- # The default args_info makes a string. We need objects to turn into JSON.
- def args_info(job)
- job.arguments.map { |arg| arg.try(:to_global_id).try(:to_s) || arg }
- end
-
end
end
-end if defined?(::ActiveJob::Logging::LogSubscriber)
+end