Sha256: 776eba358743300b4d4479b4893527ef2893e5719f278e49c94a32b6c8bb2c55

Contents?: true

Size: 1.33 KB

Versions: 3

Compression:

Stored size: 1.33 KB

Contents

# frozen_string_literal: true

module Tobox
  module Plugins
    module EventGrouping
      # Stores :group_id under the :group_column key of the configuration's
      # settings, replacing whatever value was there before.
      #
      # @param conf [#config] object whose +config+ responds to +[]=+
      # @return [Symbol] :group_id, the value that was just stored
      def self.configure(conf)
        settings = conf.config
        settings[:group_column] = :group_id
      end

      # Fetcher extension that picks events up one whole group at a time,
      # where a group is the set of rows sharing the same value in the
      # configured group column.
      module FetcherMethods
        # Runs the next +initialize+ in the ancestor chain with the same
        # arguments, then remembers which column identifies a group.
        #
        # @param _ not used here; forwarded untouched by the bare +super+
        # @param configuration [#[]] read at the +:group_column+ key
        def initialize(_, configuration)
          super

          @group_column = configuration[:group_column]
        end

        private

        # Selects the group of the next pickable event and returns the ids of
        # that group's events, provided every row of the group could be locked.
        #
        # Relies on +@pick_next_sql+, +@ds+ and +@batch_size+, which are not
        # assigned in this module (presumably set by the +initialize+ reached
        # through +super+ -- confirm against the base fetcher).
        #
        # @return [Array] at most +@batch_size+ event ids; empty when the group
        #   could not be locked in full
        def fetch_event_ids
          # group column of the first row that can be locked
          # (presumably a lazy dataset embedded as a subquery -- confirm).
          next_group = @pick_next_sql
                       .for_update
                       .skip_locked
                       .limit(1)
                       .select(@group_column)
          same_group = { @group_column => next_group }

          # size of the whole group, to compare against the number of rows
          # that actually get locked below.
          expected_count = @ds.where(same_group).count

          lockable_ids = @ds.where(same_group)
                            .order(Sequel.desc(:run_at, nulls: :first), :id)
                            .for_update
                            .skip_locked
                            .select_map(:id)

          # a mismatch means concurrent workers locked different rows of the
          # same group, or rows were added to the group after the lock was
          # acquired; in both cases nothing from the group is handed out.
          claimed_ids = lockable_ids.size == expected_count ? lockable_ids : []

          # every row of the group is locked; only a batch of ids is returned.
          claimed_ids.first(@batch_size)
        end
      end
    end

    # Passes the EventGrouping module to register_plugin under the
    # :event_grouping name. register_plugin is defined outside this file
    # (presumably on Tobox::Plugins -- TODO confirm).
    register_plugin :event_grouping, EventGrouping
  end
end

Version data entries

3 entries across 3 versions & 1 rubygem

Version Path
tobox-0.6.1 lib/tobox/plugins/event_grouping.rb
tobox-0.6.0 lib/tobox/plugins/event_grouping.rb
tobox-0.5.2 lib/tobox/plugins/event_grouping.rb