# = Trisul Remote Protocol functions
#
# dependency = ruby_protobuf
#
# Akhil.M & Dhinesh.K (c) 2010 Unleash Networks
require 'openssl'
require 'socket'
require 'time'
require 'bigdecimal'
begin
FFI_RZMQ_AVAIL=true
require 'ffi-rzmq'
rescue
FFI_RZMQ_AVAIL=false
end
# ==== TrisulRP::Protocol
# Contains methods to help with common TRP tasks like
# * creating connections
# * interacting with Trisul via requests/responses
# * helpers to create objects
#
#
module TrisulRP::Protocol
include TrisulRP::Guids
# Establish a TLS connection to a Trisul instance
#
# [server] IP Address or hostname
# [port] TRP port, typically 12001 (see trisulConfig.xml)
# [client_cert_file] Client certificate file issued by admin
# [client_key_file] Client key file issued by admin
#
#
# ==== Returns
# ==== Yields
# a connection object that can be used in subsequent calls
#
# ==== On error
# If a connection cannot be established, an exception is thrown which can point
# to the actual cause. The most common causes are
#
# * Trisul is not running
# * Trisul is not running in trp mode (see the docs for runmode)
# * Using the wrong port ( check netstat to verify trisul remote protocol port - typically 12001)
# * The Access Control List does not permit connections from client IP
# * The server certificate expired
# * The client certificate expired
# * The client does not have permissions to connect with that cert
# * The private key password is wrong
#
def connect(server,port,client_cert_file,client_key_file)
tcp_sock=TCPSocket.open(server,port)
ctx = OpenSSL::SSL::SSLContext.new(:TLSv1)
ctx.cert = OpenSSL::X509::Certificate.new(File.read(client_cert_file))
ctx.key = OpenSSL::PKey::RSA.new(File.read(client_key_file))
ssl_sock = OpenSSL::SSL::SSLSocket.new(tcp_sock, ctx)
ssl_sock.connect
yield ssl_sock if block_given?
return ssl_sock
end
# Establish a *NONSECURE PLAINTEXT* connection to a Trisul instance
#
# We highly recommend you to use the TLS connect(..) method, but if
# you must use plaintext use this version. This is purposely named
# connect_nonsecure(..) to drive home the point that there is no
# authentication using client certs, or no encryption.
#
# You can still use the ACL (Access Control List) to control who connects
#
#
# [server] IP Address or hostname
# [port] TRP port, typically 12001 (see trisulConfig.xml)
#
#
# ==== Returns
# ==== Yields
# a connection object that can be used in subsequent calls
#
# ==== On error
# If a connection cannot be established, an exception is thrown which can point
# to the actual cause. The most common causes are
#
# * Trisul is not running
# * Trisul is not running in trp mode (see the docs for runmode)
# * Using the wrong port ( check netstat to verify trisul remote protocol port - typically 12001)
# * The Access Control List does not permit connections from client IP
#
def connect_nonsecure(server,port)
tcp_sock=TCPSocket.open(server,port)
yield tcp_sock if block_given?
return tcp_sock
end
# Dispatch request to server & get response
# [conn] TRP connection previously opened via TrisulRP::Protocol::connect
#
# - connection can a ZMQ endpoint string like zmq:ipc://.. if string starts
# with 'zmq:' it will be treated as ZMQ.
#
# Needs ffi-rzmq gem
#
# [trp_request] a TRP request object, created directly or using the mk_request helper
#
# ==== Returns
# ==== Yields
# a response object, you can then inspect the fields in the response and spawn additional
# requests if required
#
# ==== On error
# raises an error if the server returns an ErrorResponse - this contains an error_message field
# which can tell you what went wrong
#
# Using ZMQ to Dispatch request to server & get response
#
# This is used by new webtrisul architecuter to connect to trisul_queryd
#
# [conn] - a ZMQ endpoint string like ipc://..
#
# Needs ffi-rzmq gem
#
# [trp_request] a TRP request object, created directly or using the mk_request helper
#
# ==== Returns
# ==== Yields
# a response object, you can then inspect the fields in the response and spawn additional
# requests if required
#
# ==== On error
# raises an error if the server returns an ErrorResponse - this contains an error_message field
# which can tell you what went wrong
#
def get_response_zmq(endpoint, trp_request, timeout_seconds = -1 )
outbuf=""
# out
outbuf=trp_request.encode
ctx=ZMQ::Context.new
sock = ctx.socket(ZMQ::REQ)
# time out for context termination
sock.setsockopt(ZMQ::LINGER, 5*1_000)
# Initialize a poll set
poller = ZMQ::Poller.new
poller.register(sock, ZMQ::POLLIN)
sock.connect(endpoint)
sock.send_string(outbuf)
ret = poller.poll(timeout_seconds * 1_000 )
if ret == -1
sock.close
ctx.terminate
raise "zeromq poll error #{endpoint} "
end
if ret == 0
sock.close
ctx.terminate
raise "no registered sockets #{endpoint} "
end
poller.readables.each do |rsock|
#in
dataarray=""
rsock.recv_string(dataarray)
resp =TRP::Message.new
resp.decode dataarray
if resp.trp_command.to_i == TRP::Message::Command::ERROR_RESPONSE
print "TRP ErrorResponse: #{resp.error_response.error_message}\n"
rsock.close
ctx.terminate
raise resp.error_response.error_message
end
rsock.close
ctx.terminate
unwrap_resp = unwrap_response(resp)
unwrap_resp.instance_variable_set("@trp_resp_command_id",resp.trp_command.to_i)
yield unwrap_resp if block_given?
return unwrap_resp
end
end
# returns response str
def get_response_zmq_raw(endpoint, trp_request_buf , timeout_seconds = -1 )
ctx=ZMQ::Context.new
sock = ctx.socket(ZMQ::REQ)
# time out for context termination
sock.setsockopt(ZMQ::LINGER, 5*1_000)
# Initialize a poll set
poller = ZMQ::Poller.new
poller.register(sock, ZMQ::POLLIN)
sock.connect(endpoint)
sock.send_string(trp_request_buf)
ret = poller.poll(timeout_seconds * 1_000 )
if ret == -1
sock.close
ctx.terminate
raise "zeromq poll error #{endpoint} "
end
if ret == 0
sock.close
ctx.terminate
raise "no registered sockets #{endpoint} "
end
poller.readables.each do |rsock|
#in
dataarray=""
rsock.recv_string(dataarray)
rsock.close
ctx.terminate
return dataarray
end
end
# used in Trisul Domain
# send trp_request as async, then poll for completion and return
# this does not block the domain network
#
def get_response_zmq_async(endpoint, trp_request, timeout_seconds = -1 )
# first get a resp.token ASYNC, then poll for it
trp_request.run_async=true
resp=get_response_zmq(endpoint, trp_request, timeout_seconds)
trp_resp_command_id = resp.instance_variable_get("@trp_resp_command_id")
sleep_time = [1,1,1,1,1,1,1,2,2,2,2,2]
count = 0
while trp_resp_command_id == TRP::Message::Command::ASYNC_RESPONSE do
@sleep = sleep_time[count] || 5
count = count +1
async_req = TrisulRP::Protocol.mk_request(
TRP::Message::Command::ASYNC_REQUEST,
{
token:resp.token,
destination_node:trp_request.destination_node,
}
)
#sleep before send async request
sleep(@sleep)
resp=get_response_zmq(endpoint,async_req, timeout_seconds)
trp_resp_command_id = resp.instance_variable_get("@trp_resp_command_id")
end
return resp
end
# Query the total time window available in Trisul
#
# [conn] TRP connection previously opened via connect
#
# ==== Returns
# returns an array of two Time objects [Time_from, Time_to] representing start and end time
#
# ==== Typical usage
#
# You pass the output of this method to mk_time_interval to get an object you can attach to a
# TRP request.
#
#
#
# tmarr = TrisulRP::Protocol::get_avaiable_time(conn)
# req = TrisulRP::Protocol.mk_request( :source_ip => target_ip,...
# :time_interval => TrisulRP::Protocol::mk_time_interval(tm_arr))
#
#
#
def get_available_time(conn,timeout=-1)
from_tm=to_tm=nil
req=mk_request(TRP::Message::Command::TIMESLICES_REQUEST,
:get_total_window => true )
begin
if conn.is_a?(String)
resp = get_response_zmq(conn,req,timeout)
else
resp = get_response(conn,req)
end
rescue Exception=>ex
raise ex
end
from_tm = Time.at(resp.total_window.from.tv_sec)
to_tm = Time.at(resp.total_window.to.tv_sec)
return [from_tm,to_tm]
end
# Helper to create a TRP TimeInterval object
#
# [tmarr] An array of two Time objects representing a window
#
# ==== Returns
# A TRP::TimeInterval object which can be attached to any :time_interval field of a TRP request
#
def mk_time_interval(tmarr)
tint=TRP::TimeInterval.new
if ( tmarr[0].is_a? Integer)
tint.from=TRP::Timestamp.new(:tv_sec => tmarr[0], :tv_usec => 0)
tint.to=TRP::Timestamp.new(:tv_sec => tmarr[1], :tv_usec => 0)
elsif (tmarr[0].is_a? Time)
tint.from=TRP::Timestamp.new(:tv_sec => tmarr[0].tv_sec, :tv_usec => 0)
tint.to=TRP::Timestamp.new(:tv_sec => tmarr[1].tv_sec, :tv_usec => 0)
end
return tint
end
# Helper iterate through each time interval in reverse order
#
# [endpoint] - the ZMQ endpoint
#
# ==== Returns
# ==== Yields
# [time_interval, id, total_intervals] A TRP::TimeInterval object which can be attached to any :time_interval field of a TRP request
#
#
def each_time_interval(zmq_endpt)
req=mk_request(TRP::Message::Command::TIMESLICES_REQUEST )
get_response_zmq(zmq_endpt,req) do |resp|
slice_id = 0
total_slices = resp.slices.size
resp.slices.reverse_each do | slc |
yield slc.time_interval, slice_id, total_slices
slice_id=slice_id+1
end
end
end
# Helper to allow assinging string to KeyT field
#
# instead of
# ..{ :source_ip => TRP::KeyT.new( :label => val ) }
#
# you can do
# ..{ :source_ip => val }
#
def fix_TRP_Fields(msg, params)
ti = params[:time_interval]
if ti.is_a? Array
params[:time_interval] = mk_time_interval(ti)
end
params.each do |k,v|
f = msg.get_field(k)
if v.is_a? String
if f.is_a? Protobuf::Field::MessageField and f.type_class.to_s == "TRP::KeyT"
params[k] = TRP::KeyT.new( :label => v )
elsif f.is_a? Protobuf::Field::Int64Field
params[k] = v.to_i
elsif f.is_a? Protobuf::Field::StringField and f.rule == :repeated
params[k] = v.split(',')
elsif f.is_a? Protobuf::Field::BoolField
params[k] = ( v == "true")
end
elsif v.is_a? BigDecimal or v.is_a? Float
if f.is_a? Protobuf::Field::Int64Field
params[k] = v.to_i
end
elsif v.is_a?Array and f.is_a?Protobuf::Field::MessageField and f.type_class.to_s == "TRP::KeyT"
v.each_with_index do |v1,idx|
v[idx]= TRP::KeyT.new( :label => v1 ) if v1.is_a?String
end
end
end
end
# Helper to create a TRP request object
#
# Read the TRP documentation wiki for a description of each command.
#
# [cmd_id] The command ID.
# [params] A hash containing command parameters
#
# ==== Typical usage
#
#
#
# # create a new command of type QuerySessionsRequest
# req = TrisulRP::Protocol.mk_request(TRP::Message::Command::QUERY_SESSIONS_REQUEST,
# :source_ip => .. ,
# :time_interval => mk_time_interval(tmarr))
#
# ... now you can use the req object ...
#
#
#
# You can also create the request objects directly, just a little too verbose for our liking
#
#
#
# # create a new command of type CounterItemRequest
# req =TRP::Message.new(:trp_command => TRP::Message::Command::QUERY_SESSIONS_REQUEST )
# req.query_sessions_request = TRP::QuerySessionsRequest.new(
# :source_ip => ... ,
# :time_interval => mk_time_interval(tmarr))
#
# ... now you can use the req object ...
#
#
#
#
def mk_request(cmd_id,in_params={})
params =in_params.dup
opts = {:trp_command=> cmd_id}
if params.has_key?(:destination_node)
opts[:destination_node] = params[:destination_node]
end
if params.has_key?(:probe_id)
opts[:probe_id] = params[:probe_id]
end
if params.has_key?(:run_async)
opts[:run_async] = params[:run_async]
end
req = TRP::Message.new(opts)
case cmd_id
when TRP::Message::Command::HELLO_REQUEST
fix_TRP_Fields( TRP::HelloRequest, params)
req.hello_request = TRP::HelloRequest.new(params)
when TRP::Message::Command::COUNTER_GROUP_TOPPER_REQUEST
fix_TRP_Fields( TRP::CounterGroupTopperRequest, params)
req.counter_group_topper_request = TRP::CounterGroupTopperRequest.new(params)
when TRP::Message::Command::COUNTER_ITEM_REQUEST
fix_TRP_Fields( TRP::CounterItemRequest, params)
req.counter_item_request = TRP::CounterItemRequest.new(params)
when TRP::Message::Command::PCAP_REQUEST
fix_TRP_Fields( TRP::PcapRequest, params)
req.pcap_request = TRP::PcapRequest.new(params)
when TRP::Message::Command::SEARCH_KEYS_REQUEST
fix_TRP_Fields( TRP::SearchKeysRequest, params)
req.search_keys_request = TRP::SearchKeysRequest.new(params)
when TRP::Message::Command::UPDATE_KEY_REQUEST
fix_TRP_Fields( TRP::UpdateKeyRequest, params)
req.update_key_request = TRP::UpdateKeyRequest.new(params)
when TRP::Message::Command::PROBE_STATS_REQUEST
fix_TRP_Fields( TRP::ProbeStatsRequest, params)
req.probe_stats_request = TRP::ProbeStatsRequest.new(params)
when TRP::Message::Command::QUERY_ALERTS_REQUEST
fix_TRP_Fields( TRP::QueryAlertsRequest, params)
req.query_alerts_request = TRP::QueryAlertsRequest.new(params)
when TRP::Message::Command::QUERY_RESOURCES_REQUEST
fix_TRP_Fields( TRP::QueryResourcesRequest, params)
req.query_resources_request = TRP::QueryResourcesRequest.new(params)
when TRP::Message::Command::COUNTER_GROUP_INFO_REQUEST
fix_TRP_Fields( TRP::CounterGroupInfoRequest, params)
req.counter_group_info_request = TRP::CounterGroupInfoRequest.new(params)
when TRP::Message::Command::SESSION_TRACKER_REQUEST
fix_TRP_Fields( TRP::SessionTrackerRequest, params)
req.session_tracker_request = TRP::SessionTrackerRequest.new(params)
when TRP::Message::Command::QUERY_SESSIONS_REQUEST
fix_TRP_Fields( TRP::QuerySessionsRequest, params)
req.query_sessions_request = TRP::QuerySessionsRequest.new(params)
when TRP::Message::Command::AGGREGATE_SESSIONS_REQUEST
fix_TRP_Fields( TRP::AggregateSessionsRequest, params)
req.aggregate_sessions_request = TRP::AggregateSessionsRequest.new(params)
when TRP::Message::Command::GREP_REQUEST
fix_TRP_Fields( TRP::GrepRequest, params)
req.grep_request = TRP::GrepRequest.new(params)
when TRP::Message::Command::KEYSPACE_REQUEST
fix_TRP_Fields( TRP::KeySpaceRequest, params)
req.key_space_request = TRP::KeySpaceRequest.new(params)
when TRP::Message::Command::TOPPER_TREND_REQUEST
fix_TRP_Fields( TRP::TopperTrendRequest, params)
req.topper_trend_request = TRP::TopperTrendRequest.new(params)
when TRP::Message::Command::STAB_PUBSUB_CTL
fix_TRP_Fields( TRP::SubscribeCtl, params)
req.subscribe_ctl = TRP::SubscribeCtl.new(params)
when TRP::Message::Command::TIMESLICES_REQUEST
fix_TRP_Fields( TRP::TimeSlicesRequest, params)
req.time_slices_request = TRP::TimeSlicesRequest.new(params)
when TRP::Message::Command::DELETE_ALERTS_REQUEST
fix_TRP_Fields( TRP::DeleteAlertsRequest, params)
req.delete_alerts_request = TRP::DeleteAlertsRequest.new(params)
when TRP::Message::Command::QUERY_FTS_REQUEST
fix_TRP_Fields( TRP::QueryFTSRequest, params)
req.query_fts_request = TRP::QueryFTSRequest.new(params)
when TRP::Message::Command::METRICS_SUMMARY_REQUEST
fix_TRP_Fields( TRP::MetricsSummaryRequest, params)
req.metrics_summary_request = TRP::MetricsSummaryRequest.new(params)
when TRP::Message::Command::CONTEXT_INFO_REQUEST
fix_TRP_Fields( TRP::ContextInfoRequest, params)
req.context_info_request = TRP::ContextInfoRequest.new(params)
when TRP::Message::Command::CONTEXT_CONFIG_REQUEST
fix_TRP_Fields( TRP::ContextConfigRequest, params)
req.context_config_request = TRP::ContextConfigRequest.new(params)
when TRP::Message::Command::PCAP_SLICES_REQUEST
fix_TRP_Fields( TRP::PcapSlicesRequest, params)
req.pcap_slices_request = TRP::PcapSlicesRequest.new(params)
when TRP::Message::Command::LOG_REQUEST
fix_TRP_Fields( TRP::LogRequest, params)
req.log_request = TRP::LogRequest.new(params)
when TRP::Message::Command::CONTEXT_START_REQUEST
fix_TRP_Fields( TRP::ContextStartRequest, params)
req.context_start_request = TRP::ContextStartRequest.new(params)
when TRP::Message::Command::CONTEXT_STOP_REQUEST
fix_TRP_Fields( TRP::ContextStopRequest, params)
req.context_stop_request = TRP::ContextStopRequest.new(params)
when TRP::Message::Command::DOMAIN_REQUEST
fix_TRP_Fields( TRP::DomainRequest, params)
req.domain_request = TRP::DomainRequest.new(params)
when TRP::Message::Command::ASYNC_REQUEST
fix_TRP_Fields( TRP::AsyncRequest, params)
req.async_request = TRP::AsyncRequest.new(params)
when TRP::Message::Command::NODE_CONFIG_REQUEST
fix_TRP_Fields( TRP::NodeConfigRequest, params)
req.node_config_request = TRP::NodeConfigRequest.new(params)
when TRP::Message::Command::FILE_REQUEST
fix_TRP_Fields( TRP::FileRequest, params)
req.file_request = TRP::FileRequest.new(params)
when TRP::Message::Command::GRAPH_REQUEST
fix_TRP_Fields( TRP::GraphRequest, params)
req.graph_request = TRP::GraphRequest.new(params)
when TRP::Message::Command::RUNTOOL_REQUEST
fix_TRP_Fields( TRP::RunToolRequest, params)
req.run_tool_request = TRP::RunToolRequest.new(params)
when TRP::Message::Command::TOOL_INFO_REQUEST
fix_TRP_Fields( TRP::ToolInfoRequest, params)
req.tool_info_request = TRP::ToolInfoRequest.new(params)
else
raise "Unknown TRP command ID"
end
return req
end
# Helper to unwrap a response
#
# All protobuf messages used in TRP have a wrapper containing a command_id which identifies
# the type of encapsulated message. This sometimes gets in the way because you have to write
# stuff like
#
#
#
# response.counter_group_response.blah_blah
#
# instead of
#
# response.blah_blah
#
#
#
# Read the TRP documentation wiki for a description of each command.
#
# [resp] The response
#
# ==== Typical usage
#
#
#
# # create a new command of type KeySessionActivityRequest
# req = TrisulRP::Protocol.get_response(...) do |resp|
#
# # here resp points to the inner response without the wrapper
# # this allows you to write resp.xyz instead of resp.hello_response.xyz
#
# end
#
#
#
#
def unwrap_response(resp)
case resp.trp_command.to_i
when TRP::Message::Command::HELLO_RESPONSE
resp.hello_response
when TRP::Message::Command::COUNTER_GROUP_TOPPER_RESPONSE
resp.counter_group_topper_response
when TRP::Message::Command::COUNTER_ITEM_RESPONSE
resp.counter_item_response
when TRP::Message::Command::OK_RESPONSE
resp.ok_response
when TRP::Message::Command::PCAP_RESPONSE
resp.pcap_response
when TRP::Message::Command::SEARCH_KEYS_RESPONSE
resp.search_keys_response
when TRP::Message::Command::UPDATE_KEY_RESPONSE
resp.update_key_response
when TRP::Message::Command::PROBE_STATS_RESPONSE
resp.probe_stats_response
when TRP::Message::Command::QUERY_ALERTS_RESPONSE
resp.query_alerts_response
when TRP::Message::Command::QUERY_RESOURCES_RESPONSE
resp.query_resources_response
when TRP::Message::Command::COUNTER_GROUP_INFO_RESPONSE
resp.counter_group_info_response
when TRP::Message::Command::SESSION_TRACKER_RESPONSE
resp.session_tracker_response
when TRP::Message::Command::QUERY_SESSIONS_RESPONSE
resp.query_sessions_response
when TRP::Message::Command::AGGREGATE_SESSIONS_RESPONSE
resp.aggregate_sessions_response
when TRP::Message::Command::GREP_RESPONSE
resp.grep_response
when TRP::Message::Command::KEYSPACE_RESPONSE
resp.key_space_response
when TRP::Message::Command::TOPPER_TREND_RESPONSE
resp.topper_trend_response
when TRP::Message::Command::QUERY_FTS_RESPONSE
resp.query_fts_response
when TRP::Message::Command::TIMESLICES_RESPONSE
resp.time_slices_response
when TRP::Message::Command::METRICS_SUMMARY_RESPONSE
resp.metrics_summary_response
when TRP::Message::Command::CONTEXT_INFO_RESPONSE
resp.context_info_response
when TRP::Message::Command::CONTEXT_CONFIG_RESPONSE
resp.context_config_response
when TRP::Message::Command::LOG_RESPONSE
resp.log_response
when TRP::Message::Command::DOMAIN_RESPONSE
resp.domain_response
when TRP::Message::Command::NODE_CONFIG_RESPONSE
resp.node_config_response
when TRP::Message::Command::ASYNC_RESPONSE
resp.async_response
when TRP::Message::Command::FILE_RESPONSE
resp.file_response
when TRP::Message::Command::GRAPH_RESPONSE
resp.graph_response
when TRP::Message::Command::RUNTOOL_RESPONSE
resp.run_tool_response
when TRP::Message::Command::TOOL_INFO_RESPONSE
resp.tool_info_response
else
raise "#{resp.trp_command.to_i} Unknown TRP command ID"
end
end
end