lib/wcc/contentful/sync_engine.rb in wcc-contentful-1.2.0 vs lib/wcc/contentful/sync_engine.rb in wcc-contentful-1.2.1
- old
+ new
@@ -29,19 +29,22 @@
def state
(@state&.dup || token_wrapper_factory(nil)).freeze
end
- attr_reader :store
- attr_reader :client
+ attr_reader :store, :client, :options
def should_sync?
store&.index?
end
- def initialize(state: nil, store: nil, client: nil, key: nil)
- @state_key = key || "sync:#{object_id}"
+ def initialize(client: nil, store: nil, state: nil, **options)
+ @options = {
+ key: "sync:#{object_id}"
+ }.merge!(options).freeze
+
+ @state_key = @options[:key] || "sync:#{object_id}"
@client = client || WCC::Contentful::Services.instance.client
@mutex = Mutex.new
if store
unless %i[index index? find].all? { |m| store.respond_to?(m) }
@@ -51,13 +54,11 @@
@store = store
end
if state
@state = token_wrapper_factory(state)
raise ArgumentError, ':state param must be a String or Hash' unless @state.is_a? Hash
- unless @state.dig('sys', 'type') == 'token'
- raise ArgumentError, ':state param must be of sys.type = "token"'
- end
+ raise ArgumentError, ':state param must be of sys.type = "token"' unless @state.dig('sys', 'type') == 'token'
end
raise ArgumentError, 'either :state or :store must be provided' unless @state || @store
end
# Gets the next increment of data from the Sync API.
@@ -142,36 +143,51 @@
def perform(event = nil)
return unless sync_engine&.should_sync?
up_to_id = nil
- up_to_id = event[:up_to_id] || event.dig('sys', 'id') if event
- sync!(up_to_id: up_to_id)
+ retry_count = 0
+ if event
+ up_to_id = event[:up_to_id] || event.dig('sys', 'id')
+ retry_count = event[:retry_count] if event[:retry_count]
+ end
+ sync!(up_to_id: up_to_id, retry_count: retry_count)
end
# Calls the Contentful Sync API and updates the configured store with the returned
# data.
#
# @param [String] up_to_id
# An ID that we know has changed and should come back from the sync.
# If we don't find this ID in the sync data, then drop a job to try
# the sync again after a few minutes.
#
- def sync!(up_to_id: nil)
+ def sync!(up_to_id: nil, retry_count: 0)
id_found, count = sync_engine.next(up_to_id: up_to_id)
next_sync_token = sync_engine.state['token']
logger.info "Synced #{count} entries. Next sync token:\n #{next_sync_token}"
- logger.info "Should enqueue again? [#{!id_found}]"
- # Passing nil to only enqueue the job 1 more time
- sync_later!(up_to_id: nil) unless id_found
+ unless id_found
+ if retry_count >= configuration.sync_retry_limit
+ logger.error "Unable to find item with id '#{up_to_id}' on the Sync API. " \
+ "Abandoning after #{retry_count} retries."
+ else
+ wait = (2**retry_count) * configuration.sync_retry_wait.seconds
+ logger.info "Unable to find item with id '#{up_to_id}' on the Sync API. " \
+ "Retrying after #{wait.inspect} " \
+ "(#{configuration.sync_retry_limit - retry_count} retries remaining)"
+
+ self.class.set(wait: wait)
+ .perform_later(up_to_id: up_to_id, retry_count: retry_count + 1)
+ end
+ end
next_sync_token
end
- # Drops an ActiveJob job to invoke WCC::Contentful.sync! after a given amount
+ # Enqueues an ActiveJob job to invoke WCC::Contentful.sync! after a given amount
# of time.
- def sync_later!(up_to_id: nil, wait: 10.minutes)
+ def sync_later!(up_to_id: nil, wait: 10.seconds)
self.class.set(wait: wait)
.perform_later(up_to_id: up_to_id)
end
end
end