lib/dirty_pipeline/storage.rb in dirty_pipeline-0.3.0 vs lib/dirty_pipeline/storage.rb in dirty_pipeline-0.4.0
- old
+ new
@@ -15,23 +15,30 @@
end
def init_store(store_field)
self.store = subject.send(store_field).to_h
clear! if store.empty?
- return if (store.keys & %w(cache status events errors state)).size == 5
+ return if valid_store?
raise InvalidPipelineStorage, store
end
+ def valid_store?
+ (
+ store.keys &
+ %w(status pipeline_status events errors state transaction_depth)
+ ).size == 6
+ end
+
def clear
self.store = subject.send(
"#{field}=",
"status" => nil,
"pipeline_status" => nil,
"state" => {},
- "cache" => {},
"events" => [],
"errors" => [],
+ "transaction_depth" => 1
)
DirtyPipeline.with_redis { |r| r.del(pipeline_status_key) }
end
def clear!
@@ -41,11 +48,12 @@
def start!(transition, args)
events << {
"transition" => transition,
"args" => args,
- "created_at" => Time.now
+ "created_at" => Time.now,
+ "cache" => {},
}
increment_attempts_count
self.pipeline_status = PROCESSING_STATUS
# self.status = "processing", should be set by Locker
commit!
@@ -98,11 +106,11 @@
store["pipeline_status"] = value
end
def commit_pipeline_status!(value = nil)
self.pipeline_status = value
- store["cache"].clear
+ last_event["cache"].clear
commit!
end
alias :reset_pipeline_status! :commit_pipeline_status!
def pipeline_status
@@ -134,9 +142,31 @@
store.fetch("errors")
end
def last_error
errors.last.to_h
+ end
+
+ def reset_transaction_depth
+ store["transaction_depth"] = 1
+ end
+
+ def reset_transaction_depth!
+ reset_transaction_depth
+ commit!
+ end
+
+ def transaction_depth
+ store["transaction_depth"]
+ end
+
+ def increment_transaction_depth
+ store["transaction_depth"] = store["transaction_depth"].to_i + 1
+ end
+
+ def increment_transaction_depth!
+ increment_transaction_depth
+ commit!
end
def increment_attempts_count
last_event.merge!(
"attempts_count" => last_event["attempts_count"].to_i + 1