lib/dirty_pipeline/storage.rb in dirty_pipeline-0.5.0 vs lib/dirty_pipeline/storage.rb in dirty_pipeline-0.6.0
- old
+ new
@@ -1,91 +1,85 @@
module DirtyPipeline
+ # Storage structure
+ # {
+ # status: :errored,
+ # state: {
+ # field: "value",
+ # },
+ # errors: {
+ # "<event_id>": {
+ # error: "RuPost::API::Error",
+ # error_message: "Timeout error",
+ # created_at: 2018-01-01T13:22Z
+ # },
+ # },
+ # events: {
+ # <event_id>: {
+ # transition: "Create",
+ # args: ...,
+ # changes: ...,
+ # created_at: ...,
+ # updated_at: ...,
+ # attempts_count: 2,
+ # },
+ # <event_id>: {...},
+ # }
+ # }
class Storage
- SUCCESS_STATUS = "success".freeze
- FAILURE_STATUS = "failure".freeze
- RETRY_STATUS = "retry".freeze
- PROCESSING_STATUS = "processing".freeze
class InvalidPipelineStorage < StandardError; end
- attr_reader :subject, :field, :transactions_queue
- attr_accessor :store
+ attr_reader :subject, :field, :store
alias :to_h :store
def initialize(subject, field)
@subject = subject
@field = field
- init_store(field)
+ @store = subject.send(@field).to_h
+ reset if @store.empty?
+ raise InvalidPipelineStorage, store unless valid_store?
end
- def init_store(store_field)
- self.store = subject.send(store_field).to_h
- clear if store.empty?
- return if valid_store?
- raise InvalidPipelineStorage, store
+ def reset!
+ reset
+ save!
end
- def valid_store?
- (store.keys & %w(status events errors state)).size.eql?(4)
- end
-
- # PG JSONB column
- # {
- # status: :errored,
- # state: {
- # field: "value",
- # },
- # errors: {
- # "<event_id>": {
- # error: "RuPost::API::Error",
- # error_message: "Timeout error",
- # created_at: 2018-01-01T13:22Z
- # },
- # },
- # events: {
- # <event_id>: {
- # action: Init,
- # input: ...,
- # created_at: ...,
- # updated_at: ...,
- # attempts_count: 2,
- # },
- # <event_id>: {...},
- # }
- # }
- def clear
- self.store = subject.send(
- "#{field}=",
- "status" => nil,
- "state" => {},
- "events" => {},
- "errors" => {}
- )
- end
-
- def clear!
- clear
- subject.update_attributes!(field => store)
- end
-
def status
store["status"]
end
def commit!(event)
store["status"] = event.destination if event.destination
- require'pry';binding.pry unless event.changes.respond_to?(:to_h)
store["state"].merge!(event.changes) unless event.changes.to_h.empty?
store["errors"][event.id] = event.error unless event.error.to_h.empty?
store["events"][event.id] = event.data unless event.data.to_h.empty?
- subject.assign_attributes(field => store)
- subject.save!
+ save!
end
- def processing_event
- find_event(transactions_queue.processing_event.id)
+ def find_event(event_id)
+ return unless (found_event = store.dig("events", event_id))
+ Event.new(data: found_event, error: store.dig("errors", event_id))
end
- def find_event(event_id)
- return unless (found_event = store["events"][event_id])
- Event.new(data: found_event, error: store["errors"][event_id])
+ private
+
+ def valid_store?
+ (store.keys & %w(status events errors state)).size.eql?(4)
+ end
+
+ def save!
+ subject.send("#{field}=", store)
+ subject.save!
+ end
+
+ def reset
+ @store = subject.send(
+ "#{field}=",
+ {
+ "status" => nil,
+ "state" => {},
+ "events" => {},
+ "errors" => {}
+ }
+ )
end
end
end