lib/dirty_pipeline/pg/railway.rb in dirty_pipeline-0.8.1 vs lib/dirty_pipeline/pg/railway.rb in dirty_pipeline-0.8.2
- old
+ new
@@ -35,22 +35,26 @@
[operation, create_queue(operation)]
end
]
end
+ def with_postgres(&block)
+ DirtyPipeline.with_postgres(&block)
+ end
+
DELETE_OPERATION = <<~SQL
DELETE FROM dp_active_operations WHERE key = $1;
SQL
DELETE_TRANSACTION = <<~SQL
DELETE FROM dp_active_transactions WHERE key = $1;
SQL
def clear!
@queues.values.each(&:clear!)
- DirtyPipeline.with_postgres do |c|
- c.transaction do
- c.exec DELETE_OPERATION, [active_operation_key]
- c.exec DELETE_TRANSACTION, [active_transaction_key]
+ with_postgres do |c|
+ c.transaction do |tc|
+ tc.exec DELETE_OPERATION, [active_operation_key]
+ tc.exec DELETE_TRANSACTION, [active_transaction_key]
end
end
end
def next
@@ -74,32 +78,32 @@
SQL
def switch_to(name)
raise ArgumentError unless DEFAULT_OPERATIONS.include?(name.to_s)
return if name.to_s == active
- DirtyPipeline.with_postgres do |c|
- c.transaction do
- c.exec(SWITCH_OPERATION, [active_operation_key, name])
- end
+ with_postgres do |c|
+ c.exec('START TRANSACTION;')
+ c.exec(SWITCH_OPERATION, [active_operation_key, name])
+ c.exec('COMMIT;')
end
end
SELECT_OPERATION = <<~SQL
SELECT name FROM dp_active_operations WHERE key = $1;
SQL
def active
- DirtyPipeline.with_postgres do |c|
+ with_postgres do |c|
PG.single c.exec(SELECT_OPERATION, [active_operation_key])
end
end
alias :operation :active
SELECT_TRANSACTION = <<~SQL
SELECT name FROM dp_active_transactions WHERE key = $1;
SQL
def running_transaction
- DirtyPipeline.with_postgres do |c|
+ with_postgres do |c|
PG.single c.exec(SELECT_TRANSACTION, [active_transaction_key])
end
end
def other_transaction_in_progress?
@@ -126,13 +130,13 @@
ON CONFLICT (key)
DO UPDATE SET name = EXCLUDED.name;
SQL
def start_transaction!
switch_to(DEFAULT_OPERATIONS.first) unless active
- DirtyPipeline.with_postgres do |c|
- c.transaction do
- c.exec(SWITCH_TRANSACTION, [active_transaction_key, @tx_id])
- end
+ with_postgres do |c|
+ c.exec('START TRANSACTION;')
+ c.exec(SWITCH_TRANSACTION, [active_transaction_key, @tx_id])
+ c.exec('COMMIT;')
end
end
def finish_transaction!
clear! if running_transaction == @tx_id