lib/wcc/contentful/sync_engine.rb in wcc-contentful-1.2.0 vs lib/wcc/contentful/sync_engine.rb in wcc-contentful-1.2.1

- old
+ new

@@ -29,19 +29,22 @@ def state (@state&.dup || token_wrapper_factory(nil)).freeze end - attr_reader :store - attr_reader :client + attr_reader :store, :client, :options def should_sync? store&.index? end - def initialize(state: nil, store: nil, client: nil, key: nil) - @state_key = key || "sync:#{object_id}" + def initialize(client: nil, store: nil, state: nil, **options) + @options = { + key: "sync:#{object_id}" + }.merge!(options).freeze + + @state_key = @options[:key] || "sync:#{object_id}" @client = client || WCC::Contentful::Services.instance.client @mutex = Mutex.new if store unless %i[index index? find].all? { |m| store.respond_to?(m) } @@ -51,13 +54,11 @@ @store = store end if state @state = token_wrapper_factory(state) raise ArgumentError, ':state param must be a String or Hash' unless @state.is_a? Hash - unless @state.dig('sys', 'type') == 'token' - raise ArgumentError, ':state param must be of sys.type = "token"' - end + raise ArgumentError, ':state param must be of sys.type = "token"' unless @state.dig('sys', 'type') == 'token' end raise ArgumentError, 'either :state or :store must be provided' unless @state || @store end # Gets the next increment of data from the Sync API. @@ -142,36 +143,51 @@ def perform(event = nil) return unless sync_engine&.should_sync? up_to_id = nil - up_to_id = event[:up_to_id] || event.dig('sys', 'id') if event - sync!(up_to_id: up_to_id) + retry_count = 0 + if event + up_to_id = event[:up_to_id] || event.dig('sys', 'id') + retry_count = event[:retry_count] if event[:retry_count] + end + sync!(up_to_id: up_to_id, retry_count: retry_count) end # Calls the Contentful Sync API and updates the configured store with the returned # data. # # @param [String] up_to_id # An ID that we know has changed and should come back from the sync. # If we don't find this ID in the sync data, then drop a job to try # the sync again after a few minutes. # - def sync!(up_to_id: nil) + def sync!(up_to_id: nil, retry_count: 0) id_found, count = sync_engine.next(up_to_id: up_to_id) next_sync_token = sync_engine.state['token'] logger.info "Synced #{count} entries. Next sync token:\n #{next_sync_token}" - logger.info "Should enqueue again? [#{!id_found}]" - # Passing nil to only enqueue the job 1 more time - sync_later!(up_to_id: nil) unless id_found + unless id_found + if retry_count >= configuration.sync_retry_limit + logger.error "Unable to find item with id '#{up_to_id}' on the Sync API. " \ + "Abandoning after #{retry_count} retries." + else + wait = (2**retry_count) * configuration.sync_retry_wait.seconds + logger.info "Unable to find item with id '#{up_to_id}' on the Sync API. " \ + "Retrying after #{wait.inspect} " \ + "(#{configuration.sync_retry_limit - retry_count} retries remaining)" + + self.class.set(wait: wait) + .perform_later(up_to_id: up_to_id, retry_count: retry_count + 1) + end + end next_sync_token end - # Drops an ActiveJob job to invoke WCC::Contentful.sync! after a given amount + # Enqueues an ActiveJob job to invoke WCC::Contentful.sync! after a given amount # of time. - def sync_later!(up_to_id: nil, wait: 10.minutes) + def sync_later!(up_to_id: nil, wait: 10.seconds) self.class.set(wait: wait) .perform_later(up_to_id: up_to_id) end end end