lib/tobox/fetcher.rb in tobox-0.2.0 vs lib/tobox/fetcher.rb in tobox-0.3.0

- old
+ new

@@ -22,10 +22,13 @@ @group_column = configuration[:group_column] @exponential_retry_factor = configuration[:exponential_retry_factor] max_attempts = configuration[:max_attempts] + @inbox_table = configuration[:inbox_table] + @inbox_column = configuration[:inbox_column] + @ds = @db[@table] run_at_conds = [ { Sequel[@table][:run_at] => nil }, (Sequel.expr(Sequel[@table][:run_at]) < Sequel::CURRENT_TIMESTAMP) @@ -78,18 +81,21 @@ events = @ds.where(id: event_ids).returning.delete if blk num_events = events.size - events.each do |ev| - ev[:metadata] = try_json_parse(ev[:metadata]) - handle_before_event(ev) - yield(to_message(ev)) + events.map! do |ev| + try_insert_inbox(ev) do + ev[:metadata] = try_json_parse(ev[:metadata]) + handle_before_event(ev) + yield(to_message(ev)) + ev + end rescue StandardError => e error = e raise Sequel::Rollback - end + end.compact! else events.map!(&method(:to_message)) end end end @@ -144,9 +150,19 @@ return unless data data = JSON.parse(data.to_s) unless data.respond_to?(:to_hash) data + end + + def try_insert_inbox(event) + return yield unless @inbox_table && @inbox_column + + ret = @db[@inbox_table].insert_conflict.insert(@inbox_column => event[@inbox_column]) + + return unless ret + + yield end def handle_before_event(event) @logger.debug do log_message("outbox event (type: \"#{event[:type]}\", attempts: #{event[:attempts]}) starting...")