lib/fluent/plugin/in_sforce.rb in fluent-plugin-sforce-0.0.4 vs lib/fluent/plugin/in_sforce.rb in fluent-plugin-sforce-0.0.5
- old
+ new
@@ -3,114 +3,126 @@
require 'nokogiri'
require 'restforce'
require 'date'
require 'net/http'
require 'faye'
+require 'fluent/plugin/input'
module Fluent
- class SforceInput < Input
- unless method_defined?(:log)
- define_method("log") { $log }
- end
+ module Plugin
+ class SforceInput < Input
+ class SforceConnectionError < StandardError; end
- Plugin.register_input('sforce', self)
+ Fluent::Plugin.register_input('sforce', self)
- config_param :query, :string, :default => "SELECT id, Body, CreatedById FROM FeedItem"
- config_param :tag, :string, :default => "sforce"
- config_param :polling_interval, :integer, :default => 60
- config_param :topic, :string, :default => nil
- config_param :username, :string
- config_param :password, :string
+ config_param :query, :string, default: 'SELECT id, Body, CreatedById FROM FeedItem'
+ config_param :tag, :string, default: 'sforce'
+ config_param :polling_interval, :integer, default: 60
+ config_param :topic, :string, default: nil
+ config_param :username, :string
+ config_param :password, :string
+ config_param :version, :string, default: '43.0'
+ config_param :login_endpoint, :string, default: 'login.salesforce.com'
- def configure(conf)
- super
- end
+ def configure(conf)
+ super
+ end
- def start
- super
- login_info = login()
- client = Restforce.new :oauth_token => login_info["sessionId"],
- :instance_url => login_info["instanceUrl"]
+ def start
+ super
+ login_info = login
+ client = Restforce.new login_info.merge(api_version: @version)
- th_low = DateTime.now().strftime("%Y-%m-%dT%H:%M:%S.000%Z")
- # query
- if @topic == nil then
- sleep(@polling_interval)
- th_high = DateTime.now().strftime("%Y-%m-%dT%H:%M:%S.000%Z")
- loop do
- # create soql query string
- where = "CreatedDate <= #{th_high} AND CreatedDate > #{th_low}"
- soql = ""
- if @query =~ /^(.+)\s(where|WHERE)\s(.+)$/ then
- soql = "#{$1} WHERE #{where} AND #{$3}"
- elsif @query =~ /^(.+)$/ then
- soql = "#{$1} WHERE #{where}"
- end
+ th_low = DateTime.now.strftime('%Y-%m-%dT%H:%M:%S.000%Z')
+ # query
+ if @topic == nil then
+ sleep(@polling_interval)
+ th_high = DateTime.now.strftime('%Y-%m-%dT%H:%M:%S.000%Z')
+ loop do
+ # create soql query string
+ where = "CreatedDate <= #{th_high} AND CreatedDate > #{th_low}"
+ soql = ''
+ if @query =~ /^(.+)\s(where|WHERE)\s(.+)$/ then
+ soql = "#{$1} WHERE #{where} AND #{$3}"
+ elsif @query =~ /^(.+)$/ then
+ soql = "#{$1} WHERE #{where}"
+ end
- begin
- log.info "query: #{soql}"
- records = client.query(soql)
- records.each do |record|
- Fluent::Engine.emit(@tag, Fluent::Engine.now, record)
+ begin
+ log.info "query: #{soql}"
+ records = client.query(soql)
+ records.each do |record|
+ router.emit(@tag, Fluent::Engine.now, record)
+ end
+ sleep(@polling_interval)
+ th_low = th_high
+ th_high = DateTime.now.strftime('%Y-%m-%dT%H:%M:%S.000%Z')
+ rescue Restforce::UnauthorizedError => e
+ log.error e
+ # retry login
+ login_info = login
+ client = Restforce.new login_info.merge(api_version: @version)
end
- sleep(@polling_interval)
- th_low = th_high
- th_high = DateTime.now().strftime("%Y-%m-%dT%H:%M:%S.000%Z")
- rescue Restforce::UnauthorizedError => e
- log.error e
- # retry login
- login_info = login()
- client = Restforce.new :oauth_token => login_info["sessionId"],
- :instance_url => login_info["instanceUrl"]
end
- end
- # streaming api
- else
- EM.run do
- log.info "suscribe: #{@topic}"
- # Subscribe to the PushTopic.
- client.subscribe @topic do |message|
- Fluent::Engine.emit(@tag, Fluent::Engine.now, message)
+ # streaming api
+ else
+ EM.run do
+ log.info "suscribe: #{@topic}"
+ # Subscribe to the PushTopic.
+ client.subscribe @topic do |message|
+ router.emit(@tag, Fluent::Engine.now, message)
+ end
end
end
+ rescue SforceConnectionError => e
+ log.error e.message
end
- end
- def shutdown
- end
+ def shutdown
+ super
+ end
- private
+ private
- def login
- uri = URI('https://login.salesforce.com/services/Soap/u/30.0')
- request = Net::HTTP::Post.new(uri.request_uri, initheader = {'Content-Type' =>'text/xml', 'SOAPAction' => '""'})
- request.body = <<"BODY"
+ def login
+ uri = URI(login_endpoint)
+ request = Net::HTTP::Post.new(uri.request_uri, initheader = {'Content-Type' =>'text/xml', 'SOAPAction' => "''"})
+ request.body = <<BODY
<?xml version="1.0" encoding="utf-8"?>
<env:Envelope xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:env="http://schemas.xmlsoap.org/soap/envelope/">
<env:Body>
<n1:login xmlns:n1="urn:partner.soap.sforce.com">
-<n1:username>#{@username}</n1:username>
-<n1:password>#{@password}</n1:password>
+<n1:username>#{@username.encode(xml: :text)}</n1:username>
+<n1:password>#{@password.encode(xml: :text)}</n1:password>
</n1:login>
</env:Body>
</env:Envelope>
BODY
- http = Net::HTTP.new(uri.host, uri.port)
- http.use_ssl = true
- http.verify_mode = OpenSSL::SSL::VERIFY_NONE
+ http = Net::HTTP.new(uri.host, uri.port)
+ http.use_ssl = true
+ http.verify_mode = OpenSSL::SSL::VERIFY_NONE
- # request login call and parse login response.
- http.start do |h|
- response = h.request(request)
- doc = Nokogiri::XML(response.body)
- session_id = doc.css("sessionId").inner_text
- /^(https:\/\/.+\.salesforce\.com)\//.match(doc.css("serverUrl").inner_text)
- instance_url = $1
- log.info "login is successful."
- {"sessionId" => session_id, "instanceUrl" => instance_url}
+ # request login call and parse login response.
+ http.start do |h|
+ response = h.request(request)
+ doc = Nokogiri::XML(response.body)
+ fault = doc.css('faultstring').inner_text
+ raise SforceConnectionError, fault unless fault.empty?
+
+ session_id = doc.css('sessionId').inner_text
+ /^(https:\/\/.+\.salesforce\.com)\//.match(doc.css('serverUrl').inner_text)
+ instance_url = $1
+ log.info "login is successful. instance_url = '#{instance_url}'"
+
+ {oauth_token: session_id, instance_url: instance_url}
+ end
+ end
+
+ def login_endpoint
+ "https://#{@login_endpoint}/services/Soap/u/#{@version}"
end
end
end
end