lib/dirty_pipeline/storage.rb in dirty_pipeline-0.4.0 vs lib/dirty_pipeline/storage.rb in dirty_pipeline-0.5.0
- old
+ new
@@ -1,225 +1,91 @@
module DirtyPipeline
class Storage
- FAILED_STATUS = "failed".freeze
+ SUCCESS_STATUS = "success".freeze
+ FAILURE_STATUS = "failure".freeze
RETRY_STATUS = "retry".freeze
PROCESSING_STATUS = "processing".freeze
class InvalidPipelineStorage < StandardError; end
- attr_reader :subject, :field
+ attr_reader :subject, :field, :transactions_queue
attr_accessor :store
alias :to_h :store
def initialize(subject, field)
@subject = subject
@field = field
init_store(field)
end
def init_store(store_field)
self.store = subject.send(store_field).to_h
- clear! if store.empty?
+ clear if store.empty?
return if valid_store?
raise InvalidPipelineStorage, store
end
def valid_store?
- (
- store.keys &
- %w(status pipeline_status events errors state transaction_depth)
- ).size == 6
+ (store.keys & %w(status events errors state)).size.eql?(4)
end
+ # PG JSONB column
+ # {
+ # status: :errored,
+ # state: {
+ # field: "value",
+ # },
+ # errors: {
+ # "<event_id>": {
+ # error: "RuPost::API::Error",
+ # error_message: "Timeout error",
+ # created_at: 2018-01-01T13:22Z
+ # },
+ # },
+ # events: {
+ # <event_id>: {
+ # action: Init,
+ # input: ...,
+ # created_at: ...,
+ # updated_at: ...,
+ # attempts_count: 2,
+ # },
+ # <event_id>: {...},
+ # }
+ # }
def clear
self.store = subject.send(
"#{field}=",
"status" => nil,
- "pipeline_status" => nil,
"state" => {},
- "events" => [],
- "errors" => [],
- "transaction_depth" => 1
+ "events" => {},
+ "errors" => {}
)
- DirtyPipeline.with_redis { |r| r.del(pipeline_status_key) }
end
def clear!
clear
- commit!
+ subject.update_attributes!(field => store)
end
- def start!(transition, args)
- events << {
- "transition" => transition,
- "args" => args,
- "created_at" => Time.now,
- "cache" => {},
- }
- increment_attempts_count
- self.pipeline_status = PROCESSING_STATUS
- # self.status = "processing", should be set by Locker
- commit!
- end
-
- def start_retry!
- last_event.merge!(updated_at: Time.now)
- increment_attempts_count
- self.pipeline_status = PROCESSING_STATUS
- # self.status = "processing", should be set by Locker
- commit!
- end
-
- def complete!(output, destination)
- store["status"] = destination
- state.merge!(output)
- last_event.merge!(
- "output" => output,
- "updated_at" => Time.now,
- "success" => true,
- )
- commit!
- end
-
- def fail_event!
- fail_event
- commit!
- end
-
- def fail_event
- last_event["failed"] = true
- end
-
def status
store["status"]
end
- def pipeline_status_key
- "pipeline-status:#{subject.class}:#{subject.id}:#{field}"
- end
-
- def pipeline_status=(value)
- DirtyPipeline.with_redis do |r|
- if value
- r.set(pipeline_status_key, value)
- else
- r.del(pipeline_status_key)
- end
- end
- store["pipeline_status"] = value
- end
-
- def commit_pipeline_status!(value = nil)
- self.pipeline_status = value
- last_event["cache"].clear
- commit!
- end
- alias :reset_pipeline_status! :commit_pipeline_status!
-
- def pipeline_status
- DirtyPipeline.with_redis do |r|
- store["pipeline_status"] = r.get(pipeline_status_key)
- end
- store.fetch("pipeline_status")
- end
-
- def state
- store.fetch("state")
- end
-
- def events
- store.fetch("events")
- end
-
- def last_event
- events.last.to_h
- end
-
- def last_event_error(event_idx = nil)
- event = events[event_idx] if event_idx
- event ||= last_event
- errors[event["error_idx"]].to_h
- end
-
- def errors
- store.fetch("errors")
- end
-
- def last_error
- errors.last.to_h
- end
-
- def reset_transaction_depth
- store["transaction_depth"] = 1
- end
-
- def reset_transaction_depth!
- reset_transaction_depth
- commit!
- end
-
- def transaction_depth
- store["transaction_depth"]
- end
-
- def increment_transaction_depth
- store["transaction_depth"] = store["transaction_depth"].to_i + 1
- end
-
- def increment_transaction_depth!
- increment_transaction_depth
- commit!
- end
-
- def increment_attempts_count
- last_event.merge!(
- "attempts_count" => last_event["attempts_count"].to_i + 1
- )
- end
-
- def increment_attempts_count!
- increment_attempts_count
- commit!
- end
-
- def save_retry(error)
- save_error(error)
- self.pipeline_status = RETRY_STATUS
- end
-
- def save_retry!(error)
- save_retry(error)
- commit!
- end
-
- def save_exception(exception)
- errors << {
- "error" => exception.class.to_s,
- "error_message" => exception.message,
- "created_at" => Time.current,
- }
- last_event["error_idx"] = errors.size - 1
- fail_event
- self.pipeline_status = FAILED_STATUS
- end
-
- def save_exception!(error)
- save_exception(error)
- commit!
- end
-
- def commit!
+ def commit!(event)
+ store["status"] = event.destination if event.destination
+ store["state"].merge!(event.changes) unless event.changes.to_h.empty?
+ store["errors"][event.id] = event.error unless event.error.to_h.empty?
+ store["events"][event.id] = event.data unless event.data.to_h.empty?
subject.assign_attributes(field => store)
subject.save!
end
- def ready?
- storage.pipeline_status.nil?
+ def processing_event
+ find_event(transactions_queue.processing_event.id)
end
- def failed?
- pipeline_status == FAILED_STATUS
- end
-
- def processing?
- [PROCESSING_STATUS, RETRY_STATUS].include?(pipeline_status)
+ def find_event(event_id)
+ return unless (found_event = store["events"][event_id])
+ Event.new(data: found_event, error: store["errors"][event_id])
end
end
end