Sha256: 776eba358743300b4d4479b4893527ef2893e5719f278e49c94a32b6c8bb2c55
Contents?: true
Size: 1.33 KB
Versions: 3
Compression:
Stored size: 1.33 KB
Contents
# frozen_string_literal: true module Tobox module Plugins module EventGrouping def self.configure(conf) conf.config[:group_column] = :group_id end module FetcherMethods def initialize(_, configuration) super @group_column = configuration[:group_column] end private def fetch_event_ids group = @pick_next_sql.for_update .skip_locked .limit(1) .select(@group_column) # get total from a group, to compare to the number of future locked rows. total_from_group = @ds.where(@group_column => group).count event_ids = @ds.where(@group_column => group) .order(Sequel.desc(:run_at, nulls: :first), :id) .for_update.skip_locked.select_map(:id) if event_ids.size != total_from_group # this happens if concurrent workers locked different rows from the same group, # or when new rows from a given group have been inserted after the lock has been # acquired event_ids = [] end # lock all event_ids.first(@batch_size) end end end register_plugin :event_grouping, EventGrouping end end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
tobox-0.6.1 | lib/tobox/plugins/event_grouping.rb |
tobox-0.6.0 | lib/tobox/plugins/event_grouping.rb |
tobox-0.5.2 | lib/tobox/plugins/event_grouping.rb |