lib/dirty_pipeline/pg/queue.rb in dirty_pipeline-0.8.1 vs lib/dirty_pipeline/pg/queue.rb in dirty_pipeline-0.8.2
- old
+ new
@@ -45,13 +45,13 @@
DELETE FROM dp_event_queues WHERE key = $1;
SQL
def clear!
with_postgres do |c|
- c.transaction do
- c.exec(DELETE_ACTIVE, [active_event_key])
- c.exec(DELETE_EVENTS, [events_queue_key])
+ c.transaction do |tc|
+ tc.exec(DELETE_ACTIVE, [active_event_key])
+ tc.exec(DELETE_EVENTS, [events_queue_key])
end
end
end
SELECT_ALL_EVENTS = <<~SQL
@@ -69,13 +69,11 @@
INSERT INTO dp_event_queues (id, key, payload)
VALUES (-nextval('dp_event_queues_id_seq'), $1, $2);
SQL
def push(event)
with_postgres do |c|
- c.transaction do
- c.exec(PUSH_EVENT, [events_queue_key, pack(event)])
- end
+ c.exec(PUSH_EVENT, [events_queue_key, pack(event)])
end
self
end
alias :<< :push
@@ -83,13 +81,11 @@
UNSHIFT_EVENT = <<~SQL
INSERT INTO dp_event_queues (key, payload) VALUES ($1, $2);
SQL
def unshift(event)
with_postgres do |c|
- c.transaction do
- c.exec(UNSHIFT_EVENT, [events_queue_key, pack(event)])
- end
+ c.exec(UNSHIFT_EVENT, [events_queue_key, pack(event)])
end
self
end
SELECT_LAST_EVENT = <<~SQL
@@ -107,17 +103,17 @@
INSERT INTO dp_active_events (key, payload) VALUES ($1, $2)
ON CONFLICT (key) DO UPDATE SET payload = EXCLUDED.payload;
SQL
def pop
with_postgres do |c|
- c.transaction do
+ c.transaction do |tc|
event_id, raw_event =
- PG.multi(c.exec(SELECT_LAST_EVENT, [events_queue_key]))
+ PG.multi(tc.exec(SELECT_LAST_EVENT, [events_queue_key]))
if raw_event.nil?
- c.exec(DELETE_ACTIVE_EVENT, [active_event_key])
+ tc.exec(DELETE_ACTIVE_EVENT, [active_event_key])
else
- c.exec(DELETE_EVENT, [events_queue_key, event_id])
- c.exec(SET_EVENT_ACTIVE, [active_event_key, raw_event])
+ tc.exec(DELETE_EVENT, [events_queue_key, event_id])
+ tc.exec(SET_EVENT_ACTIVE, [active_event_key, raw_event])
end
unpack(raw_event)
end
end
end