lib/logstasher/active_job/log_subscriber.rb in logstasher-2.0.2 vs lib/logstasher/active_job/log_subscriber.rb in logstasher-2.1.0
- old
+ new
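
The substantive change in 2.1.0 is the require gate at the top of the file: instead of requiring 'active_job/logging' and rescuing LoadError, the file now checks the Active Job version and picks the subscriber base class to match the Rails 6.1 rename of ActiveJob::Logging::LogSubscriber to ActiveJob::LogSubscriber. For reference, a hypothetical equivalent of that gate written against the full version string (a sketch, not the gem's code), which sidesteps MAJOR/MINOR bookkeeping:

  require 'active_job/version'

  # Compare the whole version string so that, e.g., 7.0 (MINOR == 0) still
  # counts as newer than 6.1.
  if Gem::Version.new(ActiveJob::VERSION::STRING) >= Gem::Version.new('6.1')
    require 'active_job/log_subscriber'   # Rails >= 6.1
  else
    require 'active_job/logging'          # Rails < 6.1
  end
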
@@ -1,92 +1,98 @@
-begin
- # `rescue nil` didn't work for some Ruby versions
- require 'active_job/logging'
-rescue LoadError
+if ActiveJob::VERSION::MAJOR > 6 || (ActiveJob::VERSION::MAJOR == 6 && ActiveJob::VERSION::MINOR >= 1)
+ require 'active_job/log_subscriber'
+else
+ require 'active_job/logging'
end
-if defined?(::ActiveJob::Logging::LogSubscriber)
- module LogStasher
- module ActiveJob
- class LogSubscriber < ::ActiveJob::Logging::LogSubscriber
- def enqueue(event)
- process_event(event, 'enqueue')
- end
+module LogStasher
+ module ActiveJob
- def enqueue_at(event)
- process_event(event, 'enqueue_at')
- end
+ BASE_SUBSCRIBER = if defined?(::ActiveJob::LogSubscriber)
+ ::ActiveJob::LogSubscriber
+ else
+ ::ActiveJob::Logging::LogSubscriber
+ end
- def perform(event)
- process_event(event, 'perform')
+ class LogSubscriber < BASE_SUBSCRIBER
+ def enqueue(event)
+ process_event(event, 'enqueue')
+ end
- # Restore the previous request id, in case the job ran on this thread
- # (inline adapter or perform_now).
- LogStasher.request_context[:request_id] = Thread.current[:old_request_id]
- Thread.current[:old_request_id] = nil
- end
+ def enqueue_at(event)
+ process_event(event, 'enqueue_at')
+ end
- def perform_start(event)
- # Use the job_id as the request id so that any custom logging done for a job
- # shares a request id and carries the job id on each log line.
- #
- # The request id is not overridden at enqueue time, so enqueue events keep
- # the current request_id, often that of the web request doing the enqueuing.
- #
- # Hang onto the old request id so we can restore it once the job has run.
- Thread.current[:old_request_id] = LogStasher.request_context[:request_id]
- LogStasher.request_context[:request_id] = event.payload[:job].job_id
+ def perform(event)
+ process_event(event, 'perform')
- process_event(event, 'perform_start')
- end
+ # Restore the previous request id, in case the job ran on this thread
+ # (inline adapter or perform_now).
+ LogStasher.request_context[:request_id] = Thread.current[:old_request_id]
+ Thread.current[:old_request_id] = nil
+ end
- def logger
- LogStasher.logger
- end
+ def perform_start(event)
+ # Use the job_id as the request id so that any custom logging done for a job
+ # shares a request id and carries the job id on each log line.
+ #
+ # The request id is not overridden at enqueue time, so enqueue events keep
+ # the current request_id, often that of the web request doing the enqueuing.
+ #
+ # Hang onto the old request id so we can restore it once the job has run.
+ Thread.current[:old_request_id] = LogStasher.request_context[:request_id]
+ LogStasher.request_context[:request_id] = event.payload[:job].job_id
- private
+ process_event(event, 'perform_start')
+ end
- def process_event(event, type)
- data = extract_metadata(event)
- data.merge! extract_exception(event)
- data.merge! extract_scheduled_at(event) if type == 'enqueue_at'
- data.merge! extract_duration(event) if type == 'perform'
- data.merge! request_context
+ def logger
+ LogStasher.logger
+ end
- tags = ['job', type]
- tags.push('exception') if data[:exception]
- logger << LogStasher.build_logstash_event(data, tags).to_json + "\n"
- end
+ private
- def extract_metadata(event)
- {
- job_id: event.payload[:job].job_id,
- queue_name: queue_name(event),
- job_class: event.payload[:job].class.to_s,
- job_args: args_info(event.payload[:job])
- }
- end
+ def process_event(event, type)
+ data = extract_metadata(event)
+ data.merge! extract_exception(event)
+ data.merge! extract_scheduled_at(event) if type == 'enqueue_at'
+ data.merge! extract_duration(event) if type == 'perform'
+ data.merge! request_context
- def extract_duration(event)
- { duration: event.duration.to_f.round(2) }
- end
+ tags = ['job', type]
+ tags.push('exception') if data[:exception]
+ logger << LogStasher.build_logstash_event(data, tags).to_json + "\n"
+ end
- def extract_exception(event)
- event.payload.slice(:exception)
- end
+ def extract_metadata(event)
+ {
+ job_id: event.payload[:job].job_id,
+ queue_name: queue_name(event),
+ job_class: event.payload[:job].class.to_s,
+ job_args: args_info(event.payload[:job])
+ }
+ end
- def extract_scheduled_at(event)
- { scheduled_at: scheduled_at(event) }
- end
+ def extract_duration(event)
+ { duration: event.duration.to_f.round(2) }
+ end
- def request_context
- LogStasher.request_context
- end
+ def extract_exception(event)
+ event.payload.slice(:exception)
+ end
- # The default args_info builds a string; we need objects that serialize to JSON.
- def args_info(job)
- job.arguments.map { |arg| arg.try(:to_global_id).try(:to_s) || arg }
- end
+ def extract_scheduled_at(event)
+ { scheduled_at: scheduled_at(event) }
end
+
+ def request_context
+ LogStasher.request_context
+ end
+
+ # The default args_info builds a string; we need objects that serialize to JSON.
+ def args_info(job)
+ job.arguments.map { |arg| arg.try(:to_global_id).try(:to_s) || arg }
+ end
end
end
end
+
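
Taken together, perform_start and perform implement a save-and-restore of the request id around each job run. A minimal sketch of the observable behavior, assuming a Rails app with logstasher enabled and this subscriber attached to :active_job (HelloJob is a hypothetical job, not part of the gem):

  class HelloJob < ActiveJob::Base
    def perform
      # perform_start.active_job is instrumented synchronously on this thread
      # before the job body runs, so by now the request id is this job's job_id.
      puts LogStasher.request_context[:request_id]  # => the job's UUID
    end
  end

  LogStasher.request_context[:request_id] = 'web-request-123'
  HelloJob.perform_now

  # perform.active_job fires after the job body returns, and its handler above
  # restores the saved id, so an inline run leaves the context as it found it.
  puts LogStasher.request_context[:request_id]      # => "web-request-123"

Note that the restore relies on perform_start and perform pairing up on a single thread; there is no ensure block, so a handler that raises between the two events would leave the swapped id in place.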