server/routing/forward-events.rb in cpee-2.1.39 vs server/routing/forward-events.rb in cpee-2.1.41

- old
+ new

@@ -21,45 +21,52 @@ Daemonite.new do |opts| opts[:runtime_opts] += [ ["--url=URL", "-uURL", "Specify redis url", ->(p){ opts[:redis_url] = p }], ["--path=PATH", "-pPATH", "Specify redis path, e.g. /tmp/redis.sock", ->(p){ opts[:redis_path] = p }], - ["--db=DB", "-dDB", "Specify redis db, e.g. 1", ->(p) { opts[:redis_db] = p.to_i }] + ["--db=DB", "-dDB", "Specify redis db, e.g. 1", ->(p) { opts[:redis_db] = p.to_i }], + ["--worker=NUM", "-wNUM", "Specify the worker id, e.g. 0", ->(p) { opts[:worker] = p.to_i }] ] + on setup do + opts[:worker] ||= 0 + opts[:worker] = ('%02i' % opts[:worker]).freeze + opts[:pidfile] = File.basename(opts[:pidfile],'.pid') + '-' + opts[:worker].to_s + '.pid' + end + on startup do opts[:redis_path] ||= '/tmp/redis.sock' opts[:redis_db] ||= 1 - CPEE::redis_connect opts, 'Server Routing Forward Events' opts[:pubsubredis] = opts[:redis_dyn].call 'Server Routing Forward Events Sub' end run do - opts[:pubsubredis].psubscribe('event:*') do |on| + opts[:pubsubredis].psubscribe("event:#{opts[:worker]}:*") do |on| on.pmessage do |pat, what, message| index = message.index(' ') mess = message[index+1..-1] instance = message[0...index] - type = pat[0..-3] - event = what[(type.length+1)..-1] + type, worker, event = what.split(':',3) topic = ::File::dirname(event) name = ::File::basename(event) long = File.join(topic,type,name) opts[:redis].smembers("instance:#{instance}/handlers").each do |key| if opts[:redis].smembers("instance:#{instance}/handlers/#{key}").include? long url = opts[:redis].get("instance:#{instance}/handlers/#{key}/url") if url.nil? || url == "" opts[:redis].publish("forward:#{instance}/#{key}",mess) else - p "#{type}/#{topic}/#{event}-#{url}" - client = Riddl::Client.new(url) - client.post [ - Riddl::Parameter::Simple::new('type',type), - Riddl::Parameter::Simple::new('topic',topic), - Riddl::Parameter::Simple::new('event',name), - Riddl::Parameter::Complex::new('notification','application/json',mess) - ] + # Ractor.new(url,type,topic,name,mess) do |url,type,topic,name,mess| + # sadly typhoes does not support ractors + Thread.new do + Riddl::Client.new(url).post [ + Riddl::Parameter::Simple::new('type',type), + Riddl::Parameter::Simple::new('topic',topic), + Riddl::Parameter::Simple::new('event',name), + Riddl::Parameter::Complex::new('notification','application/json',mess) + ] + end end end end unless opts[:redis].exists?("instance:#{instance}/state") empt = opts[:redis].keys("instance:#{instance}/*").to_a