lib/fluent/plugin/in_sforce.rb in fluent-plugin-sforce-0.0.5 vs lib/fluent/plugin/in_sforce.rb in fluent-plugin-sforce-0.0.6

- old
+ new

@@ -21,56 +21,44 @@ config_param :username, :string config_param :password, :string config_param :version, :string, default: '43.0' config_param :login_endpoint, :string, default: 'login.salesforce.com' + attr_accessor :client + def configure(conf) super end def start super - login_info = login - client = Restforce.new login_info.merge(api_version: @version) + @client = generate_client - th_low = DateTime.now.strftime('%Y-%m-%dT%H:%M:%S.000%Z') - # query - if @topic == nil then - sleep(@polling_interval) - th_high = DateTime.now.strftime('%Y-%m-%dT%H:%M:%S.000%Z') + if @topic == nil + start_at = now loop do - # create soql query string - where = "CreatedDate <= #{th_high} AND CreatedDate > #{th_low}" - soql = '' - if @query =~ /^(.+)\s(where|WHERE)\s(.+)$/ then - soql = "#{$1} WHERE #{where} AND #{$3}" - elsif @query =~ /^(.+)$/ then - soql = "#{$1} WHERE #{where}" - end + sleep(@polling_interval) + end_at = now + soql = build_query(start_at, end_at) begin log.info "query: #{soql}" - records = client.query(soql) + records = exec_query(soql) records.each do |record| router.emit(@tag, Fluent::Engine.now, record) end - sleep(@polling_interval) - th_low = th_high - th_high = DateTime.now.strftime('%Y-%m-%dT%H:%M:%S.000%Z') + start_at = end_at rescue Restforce::UnauthorizedError => e log.error e # retry login - login_info = login - client = Restforce.new login_info.merge(api_version: @version) + @client = generate_client end end - # streaming api else EM.run do log.info "suscribe: #{@topic}" - # Subscribe to the PushTopic. - client.subscribe @topic do |message| + subscribe @topic do |message| router.emit(@tag, Fluent::Engine.now, message) end end end rescue SforceConnectionError => e @@ -83,11 +71,11 @@ private def login uri = URI(login_endpoint) - request = Net::HTTP::Post.new(uri.request_uri, initheader = {'Content-Type' =>'text/xml', 'SOAPAction' => "''"}) + request = Net::HTTP::Post.new(uri.request_uri, {'Content-Type' =>'text/xml', 'SOAPAction' => "''"}) request.body = <<BODY <?xml version="1.0" encoding="utf-8"?> <env:Envelope xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:env="http://schemas.xmlsoap.org/soap/envelope/"> @@ -110,19 +98,45 @@ doc = Nokogiri::XML(response.body) fault = doc.css('faultstring').inner_text raise SforceConnectionError, fault unless fault.empty? session_id = doc.css('sessionId').inner_text - /^(https:\/\/.+\.salesforce\.com)\//.match(doc.css('serverUrl').inner_text) - instance_url = $1 + m = /^(https:\/\/.+\.salesforce\.com)\//.match(doc.css('serverUrl').inner_text) + instance_url = m[1] log.info "login is successful. instance_url = '#{instance_url}'" {oauth_token: session_id, instance_url: instance_url} end end def login_endpoint "https://#{@login_endpoint}/services/Soap/u/#{@version}" + end + + def build_query(start_at, end_at) + where = "CreatedDate <= #{end_at} AND CreatedDate > #{start_at}" + if m = /^(.+)\s(where|WHERE)\s(.+)$/.match(@query) + return "#{m[1]} WHERE #{where} AND #{m[3]}" + end + + "#{@query} WHERE #{where}" + end + + def generate_client + login_info = login + Restforce.new login_info.merge(api_version: @version) + end + + def exec_query(soql) + @client.query(soql) + end + + def subscribe(name, &block) + @client.subscribe(name, &block) + end + + def now + DateTime.now.strftime('%Y-%m-%dT%H:%M:%S.000%Z') end end end end