Sha256: 24e76d8b461551416019adf6d3d822166c598aa79a31a2b9e1da9572589e15d2

Contents?: true

Size: 1.38 KB

Versions: 1

Compression:

Stored size: 1.38 KB

Contents

# frozen_string_literal: true

module Tobox
  module Plugins
    module Progress
      def self.configure(conf)
        conf.config[:visibility_timeout] = 30
      end

      module FetcherMethods
        private

        def do_fetch_events
          # mark events as invisible by using run_at as a visibility timeout
          mark_as_fetched_params = {
            run_at: Sequel.date_add(
              Sequel::CURRENT_TIMESTAMP,
              seconds: @configuration[:visibility_timeout]
            ),
            attempts: Sequel[@table][:attempts] + 1,
            last_error: nil
          }

          if @ds.supports_returning?(:update)
            @ds.where(id: fetch_event_ids).returning.update(mark_as_fetched_params)
          else
            event_ids = fetch_event_ids.select_map(:id)
            events_ds = @ds.where(id: event_ids)
            events_ds.update(mark_as_fetched_params)
            events_ds.first(@batch_size)
          end
        end

        def calculate_event_retry_interval(attempts)
          super(attempts - 1)
        end

        def set_event_retry_attempts(event, update_params)
          update_params.delete(:attempts)
          super
        end

        def events_tr
          yield
        end

        def event_id_tr(&block)
          @db.transaction(savepoint: false, &block)
        end
      end
    end
    register_plugin :progress, Progress
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
tobox-0.6.1 lib/tobox/plugins/progress.rb