# frozen_string_literal: true module Tobox module Plugins module EventGrouping def self.configure(conf) conf.config[:group_column] = :group_id end module FetcherMethods def initialize(_, configuration) super @group_column = configuration[:group_column] end private def fetch_event_ids group = @pick_next_sql.for_update .skip_locked .limit(1) .select(@group_column) # get total from a group, to compare to the number of future locked rows. total_from_group = @ds.where(@group_column => group).count event_ids = @ds.where(@group_column => group) .order(Sequel.desc(:run_at, nulls: :first), :id) .for_update.skip_locked.select_map(:id) if event_ids.size != total_from_group # this happens if concurrent workers locked different rows from the same group, # or when new rows from a given group have been inserted after the lock has been # acquired event_ids = [] end # lock all event_ids.first(@batch_size) end end end register_plugin :event_grouping, EventGrouping end end