lib/dirty_pipeline/pg/queue.rb in dirty_pipeline-0.8.1 vs lib/dirty_pipeline/pg/queue.rb in dirty_pipeline-0.8.2

- old
+ new

@@ -45,13 +45,13 @@ DELETE FROM dp_event_queues WHERE key = $1; SQL def clear! with_postgres do |c| - c.transaction do - c.exec(DELETE_ACTIVE, [active_event_key]) - c.exec(DELETE_EVENTS, [events_queue_key]) + c.transaction do |tc| + tc.exec(DELETE_ACTIVE, [active_event_key]) + tc.exec(DELETE_EVENTS, [events_queue_key]) end end end SELECT_ALL_EVENTS = <<~SQL @@ -69,13 +69,11 @@ INSERT INTO dp_event_queues (id, key, payload) VALUES (-nextval('dp_event_queues_id_seq'), $1, $2); SQL def push(event) with_postgres do |c| - c.transaction do - c.exec(PUSH_EVENT, [events_queue_key, pack(event)]) - end + c.exec(PUSH_EVENT, [events_queue_key, pack(event)]) end self end alias :<< :push @@ -83,13 +81,11 @@ UNSHIFT_EVENT = <<~SQL INSERT INTO dp_event_queues (key, payload) VALUES ($1, $2); SQL def unshift(event) with_postgres do |c| - c.transaction do - c.exec(UNSHIFT_EVENT, [events_queue_key, pack(event)]) - end + c.exec(UNSHIFT_EVENT, [events_queue_key, pack(event)]) end self end SELECT_LAST_EVENT = <<~SQL @@ -107,17 +103,17 @@ INSERT INTO dp_active_events (key, payload) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET payload = EXCLUDED.payload; SQL def pop with_postgres do |c| - c.transaction do + c.transaction do |tc| event_id, raw_event = - PG.multi(c.exec(SELECT_LAST_EVENT, [events_queue_key])) + PG.multi(tc.exec(SELECT_LAST_EVENT, [events_queue_key])) if raw_event.nil? - c.exec(DELETE_ACTIVE_EVENT, [active_event_key]) + tc.exec(DELETE_ACTIVE_EVENT, [active_event_key]) else - c.exec(DELETE_EVENT, [events_queue_key, event_id]) - c.exec(SET_EVENT_ACTIVE, [active_event_key, raw_event]) + tc.exec(DELETE_EVENT, [events_queue_key, event_id]) + tc.exec(SET_EVENT_ACTIVE, [active_event_key, raw_event]) end unpack(raw_event) end end end