server/routing/forward-events.rb in cpee-2.1.39 vs server/routing/forward-events.rb in cpee-2.1.41
- old
+ new
@@ -21,45 +21,52 @@
Daemonite.new do |opts|
opts[:runtime_opts] += [
["--url=URL", "-uURL", "Specify redis url", ->(p){ opts[:redis_url] = p }],
["--path=PATH", "-pPATH", "Specify redis path, e.g. /tmp/redis.sock", ->(p){ opts[:redis_path] = p }],
- ["--db=DB", "-dDB", "Specify redis db, e.g. 1", ->(p) { opts[:redis_db] = p.to_i }]
+ ["--db=DB", "-dDB", "Specify redis db, e.g. 1", ->(p) { opts[:redis_db] = p.to_i }],
+ ["--worker=NUM", "-wNUM", "Specify the worker id, e.g. 0", ->(p) { opts[:worker] = p.to_i }]
]
+ on setup do
+ opts[:worker] ||= 0
+ opts[:worker] = ('%02i' % opts[:worker]).freeze
+ opts[:pidfile] = File.basename(opts[:pidfile],'.pid') + '-' + opts[:worker].to_s + '.pid'
+ end
+
on startup do
opts[:redis_path] ||= '/tmp/redis.sock'
opts[:redis_db] ||= 1
-
CPEE::redis_connect opts, 'Server Routing Forward Events'
opts[:pubsubredis] = opts[:redis_dyn].call 'Server Routing Forward Events Sub'
end
run do
- opts[:pubsubredis].psubscribe('event:*') do |on|
+ opts[:pubsubredis].psubscribe("event:#{opts[:worker]}:*") do |on|
on.pmessage do |pat, what, message|
index = message.index(' ')
mess = message[index+1..-1]
instance = message[0...index]
- type = pat[0..-3]
- event = what[(type.length+1)..-1]
+ type, worker, event = what.split(':',3)
topic = ::File::dirname(event)
name = ::File::basename(event)
long = File.join(topic,type,name)
opts[:redis].smembers("instance:#{instance}/handlers").each do |key|
if opts[:redis].smembers("instance:#{instance}/handlers/#{key}").include? long
url = opts[:redis].get("instance:#{instance}/handlers/#{key}/url")
if url.nil? || url == ""
opts[:redis].publish("forward:#{instance}/#{key}",mess)
else
- p "#{type}/#{topic}/#{event}-#{url}"
- client = Riddl::Client.new(url)
- client.post [
- Riddl::Parameter::Simple::new('type',type),
- Riddl::Parameter::Simple::new('topic',topic),
- Riddl::Parameter::Simple::new('event',name),
- Riddl::Parameter::Complex::new('notification','application/json',mess)
- ]
+ # Ractor.new(url,type,topic,name,mess) do |url,type,topic,name,mess|
+ # sadly typhoes does not support ractors
+ Thread.new do
+ Riddl::Client.new(url).post [
+ Riddl::Parameter::Simple::new('type',type),
+ Riddl::Parameter::Simple::new('topic',topic),
+ Riddl::Parameter::Simple::new('event',name),
+ Riddl::Parameter::Complex::new('notification','application/json',mess)
+ ]
+ end
end
end
end
unless opts[:redis].exists?("instance:#{instance}/state")
empt = opts[:redis].keys("instance:#{instance}/*").to_a