lib/opentelemetry/instrumentation/que/patches/que_job.rb in opentelemetry-instrumentation-que-0.5.0 vs lib/opentelemetry/instrumentation/que/patches/que_job.rb in opentelemetry-instrumentation-que-0.5.1
- old
+ new
@@ -41,30 +41,42 @@
if otel_config[:propagation_style] != :none
tags ||= []
OpenTelemetry.propagation.inject(tags, setter: TagSetter)
end
- job_options = job_options.merge(tags: tags)
- job = super(*args, job_options: job_options, **arg_opts)
+ # In Que version 2.1.0 `bulk_enqueue` was introduced and in order
+ # for it to work, we must pass `job_options` to `bulk_enqueue` instead of enqueue.
+ if gem_version >= Gem::Version.new('2.1.0') && Thread.current[:que_jobs_to_bulk_insert]
+ Thread.current[:que_jobs_to_bulk_insert][:job_options] = job_options.merge(tags: tags)
+ job = super(*args, **arg_opts)
+ job_attrs = Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs].last
+ else
+ job = super(*args, job_options: job_options.merge(tags: tags), **arg_opts)
+ job_attrs = job.que_attrs
+ end
- span.name = "#{job.que_attrs[:job_class]} send"
- span.add_attributes(QueJob.job_attributes(job))
+ span.name = "#{job_attrs[:job_class]} send"
+ span.add_attributes(QueJob.job_attributes(job_attrs))
job
end
end
+
+ def gem_version
+ @gem_version ||= Gem.loaded_specs['que'].version
+ end
end
- def self.job_attributes(job)
+ def self.job_attributes(job_attrs)
attributes = {
'messaging.system' => 'que',
'messaging.destination_kind' => 'queue',
'messaging.operation' => 'send',
- 'messaging.destination' => job.que_attrs[:queue] || 'default',
- 'messaging.que.job_class' => job.que_attrs[:job_class],
- 'messaging.que.priority' => job.que_attrs[:priority] || 100
+ 'messaging.destination' => job_attrs[:queue] || 'default',
+ 'messaging.que.job_class' => job_attrs[:job_class],
+ 'messaging.que.priority' => job_attrs[:priority] || 100
}
- attributes['messaging.message_id'] = job.que_attrs[:id] if job.que_attrs[:id]
+ attributes['messaging.message_id'] = job_attrs[:id] if job_attrs[:id]
attributes
end
end
end
end