Sha256: be8b69c6b438edfb5390d941646d5c2f25e108f6520c5f59d62d7462d1fa8bdf
Contents?: true
Size: 1.33 KB
Versions: 1
Compression:
Stored size: 1.33 KB
Contents
# frozen_string_literal: true module Tobox module Plugins module EventGrouping def self.configure(conf) conf.config[:group_column] = :group_id end module FetcherMethods def initialize(_, configuration) super @group_column = configuration[:group_column] end private def fetch_event_id group = @pick_next_sql.for_update .skip_locked .limit(1) .select(@group_column) # get total from a group, to compare to the number of future locked rows. total_from_group = @ds.where(@group_column => group).count event_ids = @ds.where(@group_column => group) .order(Sequel.desc(:run_at, nulls: :first), :id) .for_update.skip_locked.select_map(:id) if event_ids.size != total_from_group # this happens if concurrent workers locked different rows from the same group, # or when new rows from a given group have been inserted after the lock has been # acquired event_ids = [] end # lock all, process 1 event_ids.first end end end register_plugin :event_grouping, EventGrouping end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
tobox-0.5.1 | lib/tobox/plugins/event_grouping.rb |