Sha256: 20176a74941e7d0d2df0e9bbab5820520251f49e48e3280a2bf46abcc8d418f8

Contents?: true

Size: 1.13 KB

Versions: 1

Compression:

Stored size: 1.13 KB

Contents

module Importo
  # Sidekiq job that processes a single data row of an import.
  #
  # The row is handed to the import's importer via +process_data_row+. Regular
  # attempts run with +last_attempt: false+; once Sidekiq has exhausted its
  # retries, the row is processed one final time with +last_attempt: true+.
  # After every run the job checks whether the import can be completed and, if
  # so, invokes ImportJobCallback#on_complete.
  class ImportJob
    include Sidekiq::Job
    sidekiq_options retry: 5
    # queue_as :integration
    queue_as Importo.config.queue_name

    # Retries are exhausted: run the row once more, flagged as the last attempt.
    # Arguments and batch id are read back from the raw Sidekiq job hash.
    sidekiq_retries_exhausted do |msg, _e|
      attributes, index, import_id = msg["args"]

      execute_row(attributes, index, import_id, true, msg["bid"])
    end

    # Custom retry delay: an Importo::RetryError supplies its own delay. For any
    # other exception this block returns nil.
    # NOTE(review): presumably Sidekiq then falls back to its default backoff —
    # confirm against the Sidekiq version in use.
    sidekiq_retry_in do |_count, exception, _jobhash|
      case exception
      when Importo::RetryError
        exception.delay
      end
    end

    # Sidekiq entry point for a regular (non-final) attempt.
    #
    # @param attributes [String, Hash] row data; a String is parsed as JSON
    # @param index [Object] passed through to the importer (presumably the row position)
    # @param import_id [Object] id used to look up the Import
    def perform(attributes, index, import_id)
      # `bid` is not defined in this file; it is expected to be provided by the
      # batch integration mixed into the job.
      self.class.execute_row(attributes, index, import_id, false, bid)
    end

    # Processes one row and completes the import when nothing is left to do.
    #
    # @param attributes [String, Hash] row data; a String is parsed as JSON and
    #   its keys are deeply symbolized, any other value is passed on untouched
    # @param index [Object] passed through to the importer
    # @param import_id [Object] id used to look up the Import
    # @param last_attempt [Boolean] forwarded to the importer as +last_attempt:+
    # @param bid [Object] batch id used to look up the batch
    # @return [Object, nil] result of the completion callback, or nil when the
    #   import is not completed by this call
    def self.execute_row(attributes, index, import_id, last_attempt, bid)
      if attributes.is_a?(String)
        # create_additions: false — the payload may originate from user-supplied
        # import data, so a "json_class" key must never be allowed to make the
        # JSON library instantiate classes. All other JSON.load defaults are kept.
        attributes = JSON.load(attributes, nil, create_additions: false).deep_symbolize_keys
      end

      import = Import.find(import_id)
      import.importer.process_data_row(attributes, index, last_attempt: last_attempt)

      batch = Importo::SidekiqBatchAdapter.find(bid)

      if !import.completed? && import.can_complete? && batch.finished?
        ImportJobCallback.new.on_complete(import_id: import_id)
      end
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
importo-3.0.15 app/jobs/importo/import_job.rb