lib/dirty_pipeline/pg/railway.rb in dirty_pipeline-0.8.1 vs lib/dirty_pipeline/pg/railway.rb in dirty_pipeline-0.8.2

- old
+ new

@@ -35,22 +35,26 @@ [operation, create_queue(operation)] end ] end + def with_postgres(&block) + DirtyPipeline.with_postgres(&block) + end + DELETE_OPERATION = <<~SQL DELETE FROM dp_active_operations WHERE key = $1; SQL DELETE_TRANSACTION = <<~SQL DELETE FROM dp_active_transactions WHERE key = $1; SQL def clear! @queues.values.each(&:clear!) - DirtyPipeline.with_postgres do |c| - c.transaction do - c.exec DELETE_OPERATION, [active_operation_key] - c.exec DELETE_TRANSACTION, [active_transaction_key] + with_postgres do |c| + c.transaction do |tc| + tc.exec DELETE_OPERATION, [active_operation_key] + tc.exec DELETE_TRANSACTION, [active_transaction_key] end end end def next @@ -74,32 +78,32 @@ SQL def switch_to(name) raise ArgumentError unless DEFAULT_OPERATIONS.include?(name.to_s) return if name.to_s == active - DirtyPipeline.with_postgres do |c| - c.transaction do - c.exec(SWITCH_OPERATION, [active_operation_key, name]) - end + with_postgres do |c| + c.exec('START TRANSACTION;') + c.exec(SWITCH_OPERATION, [active_operation_key, name]) + c.exec('COMMIT;') end end SELECT_OPERATION = <<~SQL SELECT name FROM dp_active_operations WHERE key = $1; SQL def active - DirtyPipeline.with_postgres do |c| + with_postgres do |c| PG.single c.exec(SELECT_OPERATION, [active_operation_key]) end end alias :operation :active SELECT_TRANSACTION = <<~SQL SELECT name FROM dp_active_transactions WHERE key = $1; SQL def running_transaction - DirtyPipeline.with_postgres do |c| + with_postgres do |c| PG.single c.exec(SELECT_TRANSACTION, [active_transaction_key]) end end def other_transaction_in_progress? @@ -126,13 +130,13 @@ ON CONFLICT (key) DO UPDATE SET name = EXCLUDED.name; SQL def start_transaction! switch_to(DEFAULT_OPERATIONS.first) unless active - DirtyPipeline.with_postgres do |c| - c.transaction do - c.exec(SWITCH_TRANSACTION, [active_transaction_key, @tx_id]) - end + with_postgres do |c| + c.exec('START TRANSACTION;') + c.exec(SWITCH_TRANSACTION, [active_transaction_key, @tx_id]) + c.exec('COMMIT;') end end def finish_transaction! clear! if running_transaction == @tx_id