lib/dirty_pipeline/storage.rb in dirty_pipeline-0.3.0 vs lib/dirty_pipeline/storage.rb in dirty_pipeline-0.4.0

- old
+ new

@@ -15,23 +15,30 @@ end def init_store(store_field) self.store = subject.send(store_field).to_h clear! if store.empty? - return if (store.keys & %w(cache status events errors state)).size == 5 + return if valid_store? raise InvalidPipelineStorage, store end + def valid_store? + ( + store.keys & + %w(status pipeline_status events errors state transaction_depth) + ).size == 6 + end + def clear self.store = subject.send( "#{field}=", "status" => nil, "pipeline_status" => nil, "state" => {}, - "cache" => {}, "events" => [], "errors" => [], + "transaction_depth" => 1 ) DirtyPipeline.with_redis { |r| r.del(pipeline_status_key) } end def clear! @@ -41,11 +48,12 @@ def start!(transition, args) events << { "transition" => transition, "args" => args, - "created_at" => Time.now + "created_at" => Time.now, + "cache" => {}, } increment_attempts_count self.pipeline_status = PROCESSING_STATUS # self.status = "processing", should be set by Locker commit! @@ -98,11 +106,11 @@ store["pipeline_status"] = value end def commit_pipeline_status!(value = nil) self.pipeline_status = value - store["cache"].clear + last_event["cache"].clear commit! end alias :reset_pipeline_status! :commit_pipeline_status! def pipeline_status @@ -134,9 +142,31 @@ store.fetch("errors") end def last_error errors.last.to_h + end + + def reset_transaction_depth + store["transaction_depth"] = 1 + end + + def reset_transaction_depth! + reset_transaction_depth + commit! + end + + def transaction_depth + store["transaction_depth"] + end + + def increment_transaction_depth + store["transaction_depth"] = store["transaction_depth"].to_i + 1 + end + + def increment_transaction_depth! + increment_transaction_depth + commit! end def increment_attempts_count last_event.merge!( "attempts_count" => last_event["attempts_count"].to_i + 1