lib/dirty_pipeline/pg/storage.rb in dirty_pipeline-0.8.1 vs lib/dirty_pipeline/pg/storage.rb in dirty_pipeline-0.8.2
- old
+ new
@@ -37,10 +37,14 @@
reset if @store.empty?
@subject_key = "#{subject.class}:#{subject.id}"
raise InvalidPipelineStorage, store unless valid_store?
end
+ def with_postgres(&block)
+ DirtyPipeline.with_postgres(&block)
+ end
+
def reset!
reset
save!
end
@@ -55,29 +59,27 @@
DO UPDATE SET data = EXCLUDED.data, error = EXCLUDED.error;
SQL
def commit!(event)
store["status"] = event.destination if event.destination
store["state"].merge!(event.changes) unless event.changes.to_h.empty?
- DirtyPipeline.with_postgres do |c|
- data, error = {}, {}
- data = event.data.to_h if event.data.respond_to?(:to_h)
- error = event.error.to_h if event.error.respond_to?(:to_h)
- c.transaction do
- c.exec(
- SAVE_EVENT,
- [event.id, subject_key, JSON.dump(data), JSON.dump(error)]
- )
- end
+ data, error = {}, {}
+ data = event.data.to_h if event.data.respond_to?(:to_h)
+ error = event.error.to_h if event.error.respond_to?(:to_h)
+ with_postgres do |c|
+ c.exec(
+ SAVE_EVENT,
+ [event.id, subject_key, JSON.dump(data), JSON.dump(error)]
+ )
end
save!
end
FIND_EVENT = <<-SQL
SELECT data, error FROM dp_events_store
WHERE uuid = $1 AND context = $2;
SQL
def find_event(event_id)
- DirtyPipeline.with_postgres do |c|
+ with_postgres do |c|
found_event, found_error =
PG.multi(c.exec(FIND_EVENT, [event_id, subject_key]))
return unless found_event
Event.new(
data: JSON.parse(found_event), error: JSON.parse(found_error)