lib/fluent/plugin/in_sforce.rb in fluent-plugin-sforce-0.0.5 vs lib/fluent/plugin/in_sforce.rb in fluent-plugin-sforce-0.0.6
- old
+ new
@@ -21,56 +21,44 @@
config_param :username, :string
config_param :password, :string
config_param :version, :string, default: '43.0'
config_param :login_endpoint, :string, default: 'login.salesforce.com'
+ attr_accessor :client
+
def configure(conf)
super
end
def start
super
- login_info = login
- client = Restforce.new login_info.merge(api_version: @version)
+ @client = generate_client
- th_low = DateTime.now.strftime('%Y-%m-%dT%H:%M:%S.000%Z')
- # query
- if @topic == nil then
- sleep(@polling_interval)
- th_high = DateTime.now.strftime('%Y-%m-%dT%H:%M:%S.000%Z')
+ if @topic == nil
+ start_at = now
loop do
- # create soql query string
- where = "CreatedDate <= #{th_high} AND CreatedDate > #{th_low}"
- soql = ''
- if @query =~ /^(.+)\s(where|WHERE)\s(.+)$/ then
- soql = "#{$1} WHERE #{where} AND #{$3}"
- elsif @query =~ /^(.+)$/ then
- soql = "#{$1} WHERE #{where}"
- end
+ sleep(@polling_interval)
+ end_at = now
+ soql = build_query(start_at, end_at)
begin
log.info "query: #{soql}"
- records = client.query(soql)
+ records = exec_query(soql)
records.each do |record|
router.emit(@tag, Fluent::Engine.now, record)
end
- sleep(@polling_interval)
- th_low = th_high
- th_high = DateTime.now.strftime('%Y-%m-%dT%H:%M:%S.000%Z')
+ start_at = end_at
rescue Restforce::UnauthorizedError => e
log.error e
# retry login
- login_info = login
- client = Restforce.new login_info.merge(api_version: @version)
+ @client = generate_client
end
end
- # streaming api
else
EM.run do
log.info "suscribe: #{@topic}"
- # Subscribe to the PushTopic.
- client.subscribe @topic do |message|
+ subscribe @topic do |message|
router.emit(@tag, Fluent::Engine.now, message)
end
end
end
rescue SforceConnectionError => e
@@ -83,11 +71,11 @@
private
def login
uri = URI(login_endpoint)
- request = Net::HTTP::Post.new(uri.request_uri, initheader = {'Content-Type' =>'text/xml', 'SOAPAction' => "''"})
+ request = Net::HTTP::Post.new(uri.request_uri, {'Content-Type' =>'text/xml', 'SOAPAction' => "''"})
request.body = <<BODY
<?xml version="1.0" encoding="utf-8"?>
<env:Envelope xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:env="http://schemas.xmlsoap.org/soap/envelope/">
@@ -110,19 +98,45 @@
doc = Nokogiri::XML(response.body)
fault = doc.css('faultstring').inner_text
raise SforceConnectionError, fault unless fault.empty?
session_id = doc.css('sessionId').inner_text
- /^(https:\/\/.+\.salesforce\.com)\//.match(doc.css('serverUrl').inner_text)
- instance_url = $1
+ m = /^(https:\/\/.+\.salesforce\.com)\//.match(doc.css('serverUrl').inner_text)
+ instance_url = m[1]
log.info "login is successful. instance_url = '#{instance_url}'"
{oauth_token: session_id, instance_url: instance_url}
end
end
def login_endpoint
"https://#{@login_endpoint}/services/Soap/u/#{@version}"
+ end
+
+ def build_query(start_at, end_at)
+ where = "CreatedDate <= #{end_at} AND CreatedDate > #{start_at}"
+ if m = /^(.+)\s(where|WHERE)\s(.+)$/.match(@query)
+ return "#{m[1]} WHERE #{where} AND #{m[3]}"
+ end
+
+ "#{@query} WHERE #{where}"
+ end
+
+ def generate_client
+ login_info = login
+ Restforce.new login_info.merge(api_version: @version)
+ end
+
+ def exec_query(soql)
+ @client.query(soql)
+ end
+
+ def subscribe(name, &block)
+ @client.subscribe(name, &block)
+ end
+
+ def now
+ DateTime.now.strftime('%Y-%m-%dT%H:%M:%S.000%Z')
end
end
end
end