lib/tobox/fetcher.rb in tobox-0.2.0 vs lib/tobox/fetcher.rb in tobox-0.3.0
- old
+ new
@@ -22,10 +22,13 @@
@group_column = configuration[:group_column]
@exponential_retry_factor = configuration[:exponential_retry_factor]
max_attempts = configuration[:max_attempts]
+ @inbox_table = configuration[:inbox_table]
+ @inbox_column = configuration[:inbox_column]
+
@ds = @db[@table]
run_at_conds = [
{ Sequel[@table][:run_at] => nil },
(Sequel.expr(Sequel[@table][:run_at]) < Sequel::CURRENT_TIMESTAMP)
@@ -78,18 +81,21 @@
events = @ds.where(id: event_ids).returning.delete
if blk
num_events = events.size
- events.each do |ev|
- ev[:metadata] = try_json_parse(ev[:metadata])
- handle_before_event(ev)
- yield(to_message(ev))
+ events.map! do |ev|
+ try_insert_inbox(ev) do
+ ev[:metadata] = try_json_parse(ev[:metadata])
+ handle_before_event(ev)
+ yield(to_message(ev))
+ ev
+ end
rescue StandardError => e
error = e
raise Sequel::Rollback
- end
+ end.compact!
else
events.map!(&method(:to_message))
end
end
end
@@ -144,9 +150,19 @@
return unless data
data = JSON.parse(data.to_s) unless data.respond_to?(:to_hash)
data
+ end
+
+ def try_insert_inbox(event)
+ return yield unless @inbox_table && @inbox_column
+
+ ret = @db[@inbox_table].insert_conflict.insert(@inbox_column => event[@inbox_column])
+
+ return unless ret
+
+ yield
end
def handle_before_event(event)
@logger.debug do
log_message("outbox event (type: \"#{event[:type]}\", attempts: #{event[:attempts]}) starting...")