lib/opentelemetry/instrumentation/que/patches/que_job.rb in opentelemetry-instrumentation-que-0.5.0 vs lib/opentelemetry/instrumentation/que/patches/que_job.rb in opentelemetry-instrumentation-que-0.5.1

- old
+ new

@@ -41,30 +41,42 @@ if otel_config[:propagation_style] != :none tags ||= [] OpenTelemetry.propagation.inject(tags, setter: TagSetter) end - job_options = job_options.merge(tags: tags) - job = super(*args, job_options: job_options, **arg_opts) + # In Que version 2.1.0 `bulk_enqueue` was introduced and in order + # for it to work, we must pass `job_options` to `bulk_enqueue` instead of enqueue. + if gem_version >= Gem::Version.new('2.1.0') && Thread.current[:que_jobs_to_bulk_insert] + Thread.current[:que_jobs_to_bulk_insert][:job_options] = job_options.merge(tags: tags) + job = super(*args, **arg_opts) + job_attrs = Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs].last + else + job = super(*args, job_options: job_options.merge(tags: tags), **arg_opts) + job_attrs = job.que_attrs + end - span.name = "#{job.que_attrs[:job_class]} send" - span.add_attributes(QueJob.job_attributes(job)) + span.name = "#{job_attrs[:job_class]} send" + span.add_attributes(QueJob.job_attributes(job_attrs)) job end end + + def gem_version + @gem_version ||= Gem.loaded_specs['que'].version + end end - def self.job_attributes(job) + def self.job_attributes(job_attrs) attributes = { 'messaging.system' => 'que', 'messaging.destination_kind' => 'queue', 'messaging.operation' => 'send', - 'messaging.destination' => job.que_attrs[:queue] || 'default', - 'messaging.que.job_class' => job.que_attrs[:job_class], - 'messaging.que.priority' => job.que_attrs[:priority] || 100 + 'messaging.destination' => job_attrs[:queue] || 'default', + 'messaging.que.job_class' => job_attrs[:job_class], + 'messaging.que.priority' => job_attrs[:priority] || 100 } - attributes['messaging.message_id'] = job.que_attrs[:id] if job.que_attrs[:id] + attributes['messaging.message_id'] = job_attrs[:id] if job_attrs[:id] attributes end end end end