Sha256: 100cb67a4fd96a19235b77bb5717f723933048ab2071a5b8b5636d255a6c6dcf

Contents?: true

Size: 1.33 KB

Versions: 1

Compression:

Stored size: 1.33 KB

Contents

# frozen_string_literal: true

module Tobox
  module Plugins
    module EventGrouping
      def self.configure(conf)
        conf.config[:group_column] = :group_id
      end

      module FetcherMethods
        def initialize(_, configuration)
          super

          @group_column = configuration[:group_column]
        end

        private

        def fetch_event_ids
          group = @pick_next_sql.for_update
                                .skip_locked
                                .limit(1)
                                .select(@group_column)

          # get total from a group, to compare to the number of future locked rows.
          total_from_group = @ds.where(@group_column => group).count

          event_ids = @ds.where(@group_column => group)
                         .order(Sequel.desc(:run_at, nulls: :first), :id)
                         .for_update.skip_locked.select_map(:id)

          if event_ids.size != total_from_group
            # this happens if concurrent workers locked different rows from the same group,
            # or when new rows from a given group have been inserted after the lock has been
            # acquired
            event_ids = []
          end

          # lock all, process 1
          event_ids[0, 1]
        end
      end
    end

    # Registers the EventGrouping module under the :event_grouping name. `register_plugin`
    # is defined outside of this file, so how the name is later resolved is not visible here.
    register_plugin :event_grouping, EventGrouping
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
tobox-0.5.0 lib/tobox/plugins/event_grouping.rb