Sha256: 6b55c793e11eb211decd1a49a1353c66f896a96fd959edbb22f91dfd10c6cd83
Contents?: true
Size: 1.71 KB
Versions: 34
Compression:
Stored size: 1.71 KB
Contents
require 'flydata-core/logger'

module Flydata
  module QueryBasedSync
    # Base class for objects that pull resources (rows) for a table from a
    # data source through a single long-lived client connection.
    #
    # Subclasses must override:
    #   - create_resource_client(context)
    #   - fetch_responses_once(table_name, latest_src_pos)
    #
    # The given context must respond to `table_meta` and `source_pos_class`,
    # which are the only members this class reads from it.
    class ResourceRequester
      include FlydataCore::Logger

      attr_reader :context

      # context - sync context object; it is also handed to
      #           create_resource_client to build the client.
      def initialize(context)
        @context = context
        @resource_client = create_resource_client(context)
      end

      # Start fetching resources while keeping the same connection.
      # The table meta is reloaded through the resource client, then the
      # requester itself is yielded. The client is closed when the block
      # finishes, including when it raises.
      #
      # ex:
      #   requester.start do |req|
      #     req.each_response(table_name) do |response|
      #       handle(response)
      #     end
      #   end
      def start
        context.table_meta.reload(@resource_client)
        yield self
      ensure
        @resource_client.close
      end

      # Fetch resources for a table, yielding every response to the block.
      # The block may be called multiple times in the following cases
      #   - The resource size is bigger than the max size per request
      #   - Resume to fetch resources if resume info exists in per-table
      #     position file
      #
      # table_name - name of the table to fetch
      # interval   - seconds to sleep between two fetch rounds (default: 1)
      #
      # Fetching stops when a round returns nil or no responses, or when the
      # last response of a round reports a `new_source_pos` that has reached
      # the source position computed from the current snapshot.
      #
      # Returns nil when a block is given. Without a block, returns an
      # Enumerator over the responses instead of failing on the first yield.
      def each_response(table_name, interval = 1)
        return enum_for(:each_response, table_name, interval) unless block_given?

        latest_src_pos = context.source_pos_class.new(context.table_meta.current_snapshot)
        loop do
          responses = fetch_responses_once(table_name, latest_src_pos)
          break if responses.nil? || responses.empty?

          responses.each { |response| yield response }
          break if responses.last.new_source_pos >= latest_src_pos

          sleep interval # to avoid rush
        end
        nil
      end

      # Override
      # Return a resource client object
      # ex: pg_client, mysql_connection
      #
      # The returned object is passed to `table_meta.reload` and must respond
      # to `close` (see #start).
      def create_resource_client(context)
        raise "Not implemented"
      end

      # Override
      # Fetch the resources once and return a collection of response objects
      # (or nil when there is nothing to fetch).
      # The collection must respond to `empty?`, `each` and `last`, and each
      # element must respond to `new_source_pos` (see #each_response).
      # Elements are expected to be kind of Flydata::QueryBasedSync::Response
      def fetch_responses_once(table_name, latest_src_pos)
        raise "Not implemented"
      end
    end
  end
end
Version data entries
34 entries across 34 versions & 1 rubygems