lib/dirty_pipeline/pg/storage.rb in dirty_pipeline-0.8.1 vs lib/dirty_pipeline/pg/storage.rb in dirty_pipeline-0.8.2

- old
+ new

@@ -37,10 +37,14 @@ reset if @store.empty? @subject_key = "#{subject.class}:#{subject.id}" raise InvalidPipelineStorage, store unless valid_store? end + def with_postgres(&block) + DirtyPipeline.with_postgres(&block) + end + def reset! reset save! end @@ -55,29 +59,27 @@ DO UPDATE SET data = EXCLUDED.data, error = EXCLUDED.error; SQL def commit!(event) store["status"] = event.destination if event.destination store["state"].merge!(event.changes) unless event.changes.to_h.empty? - DirtyPipeline.with_postgres do |c| - data, error = {}, {} - data = event.data.to_h if event.data.respond_to?(:to_h) - error = event.error.to_h if event.error.respond_to?(:to_h) - c.transaction do - c.exec( - SAVE_EVENT, - [event.id, subject_key, JSON.dump(data), JSON.dump(error)] - ) - end + data, error = {}, {} + data = event.data.to_h if event.data.respond_to?(:to_h) + error = event.error.to_h if event.error.respond_to?(:to_h) + with_postgres do |c| + c.exec( + SAVE_EVENT, + [event.id, subject_key, JSON.dump(data), JSON.dump(error)] + ) end save! end FIND_EVENT = <<-SQL SELECT data, error FROM dp_events_store WHERE uuid = $1 AND context = $2; SQL def find_event(event_id) - DirtyPipeline.with_postgres do |c| + with_postgres do |c| found_event, found_error = PG.multi(c.exec(FIND_EVENT, [event_id, subject_key])) return unless found_event Event.new( data: JSON.parse(found_event), error: JSON.parse(found_error)