Sha256: 5e900f94ddea6d1b3e8a39684bff0abc01f0183158faa74e94993a049ed4183a
Contents?: true
Size: 1.96 KB
Versions: 2
Compression:
Stored size: 1.96 KB
Contents
# frozen_string_literal: true module Cotcube # top-level class documentation comment class DataProxy def initialize( outputhandler: OutputHandler.new( location: "/var/cotcube/log/dataproxy" ) ) @output = outputhandler @client = DataProxy.get_ib_client @mq = DataProxy.get_mq_client @ib = @client[:ib] raise 'Could not connect to IB' unless @ib raise 'Could not connect to RabbitMQ' if %i[ request_exch replies_exch request_queue ].map{|z| mq[z].nil? }.reduce(:|) @requests = {} @req_mon = Monitor.new @persistent = { ticks: {}, depth: {}, realtimebars: {} } @per_mon = Monitor.new @gc_thread = nil spawn_message_subscribers commserver_start recover gc_start end def shutdown puts "Shutting down dataproxy." commserver_stop gc_stop mq[:commands].close mq[:channel].close mq[:connection].close persistent.each do |type, items| items.each do |con_id, item| log "sending #{ CANCEL_TYPES[type.to_sym]} #{con_id} (for #{item[:contract]})" ib.send_message CANCEL_TYPES[type.to_sym], id: con_id end end sleep 1 gc sleep 1 ib.close puts "... done." end private attr_reader :client, :clients, :ib, :mq, :requests, :req_mon, :persistent, :per_mon, :gc_thread def recover get_mq(list: false)[:exchanges].keys.select{|z| z.split('_').size == 3 }.each do |exch| src, type, contract = exch.split('_') next unless src == 'dataproxy' next unless %w[ ticks depth realtimebars ].include? type.downcase puts "Found #{exch} to recover." subscribe_persistent( { contract: contract, exchange: exch }, type: type.to_sym ) end end def log(msg) @output.puts "#{DateTime.now.strftime('%Y%m%d-%H:%M:%S: ')}#{msg.to_s.scan(/.{1,120}/).join("\n" + ' ' * 20)}" end end end __END__
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
cotcube-dataproxy-0.1.2 | lib/cotcube-dataproxy/init.rb |
cotcube-dataproxy-0.1.1 | lib/cotcube-dataproxy/init.rb |