require 'json' module CPEE module Notifications def self::implementation(id,opts) Proc.new do on resource "notifications" do run CPEE::Notifications::Overview if get on resource "topics" do run CPEE::Notifications::Topics, opts if get end on resource "subscriptions" do run CPEE::Notifications::Subscriptions, id, opts if get run CPEE::Notifications::CreateSubscription, id, opts if post 'subscribe' on resource do run CPEE::Notifications::Subscription, id, opts if get run CPEE::Notifications::UpdateSubscription, id, opts if put 'subscribe' run CPEE::Notifications::DeleteSubscription, id, opts if delete on resource 'sse' do run CPEE::Notifications::SSE, id, opts if sse end end end end end end class Overview < Riddl::Implementation #{{{ def response Riddl::Parameter::Complex.new("overview","text/xml") do <<-END END end end end #}}} class Topics < Riddl::Implementation #{{{ def response opts = @a[0] Riddl::Parameter::Complex.new("overview","text/xml") do File.read(opts[:topics]) end end end #}}} class Subscriptions < Riddl::Implementation #{{{ def response id = @a[0] opts = @a[1] Riddl::Parameter::Complex.new("subscriptions","text/xml") do ret = XML::Smart::string <<-END END CPEE::Persistence::extract_handlers(id,opts).each do |de| ret.root.add('subscription').tap do |n| n.attributes['id'] = de[0] n.attributes['url'] = de[1] if de[1] && !de[1].empty? end end ret.to_s end end end #}}} class Subscription < Riddl::Implementation #{{{ def response id = @a[0] opts = @a[1] key = @r[-1] if CPEE::Persistence::exists_handler?(id,opts,key) Riddl::Parameter::Complex.new("subscriptions","text/xml") do ret = XML::Smart::string <<-END END url = CPEE::Persistence::extract_item(id,opts,File.join('handler',key,'url')) ret.root.attributes['url'] = url if url && !url.empty? items = {} CPEE::Persistence::extract_handler(id,opts,key).each do |h| t, i, v = h.split('/') items[t] ||= [] items[t] << [i,v] end items.each do |k,v| ret.root.add('topic').tap do |n| n.attributes['id'] = k v.each do |e| n.add *e end end end ret.to_s end else @status = 404 end end end #}}} class CreateSubscription < Riddl::Implementation #{{{ def response id = @a[0] opts = @a[1] key = Digest::MD5.hexdigest(Kernel::rand().to_s) url = @p[0].name == 'url' ? @p.shift.value : nil values = [] while @p.length > 0 topic = @p.shift.value base = @p.shift type = base.name values += base.value.split(',').map { |i| File.join(topic,type[0..-2],i) } end @header = CPEE::Persistence::set_handler(id,opts,key,url,values) Riddl::Parameter::Simple.new('key',key) end end #}}} class UpdateSubscription < Riddl::Implementation #{{{ def response id = @a[0] opts = @a[1] key = @r.last if CPEE::Persistence::exists_handler?(id,opts,key) url = @p[0].name == 'url' ? @p.shift.value : nil values = [] while @p.length > 0 topic = @p.shift.value base = @p.shift type = base.name values += base.value.split(',').map { |i| File.join(topic,type[0..-2],i) } end @header = CPEE::Persistence::set_handler(id,opts,key,url,values,true) else @status = 404 end end end #}}} class DeleteSubscription < Riddl::Implementation #{{{ def self::set(id,opts,key) CPEE::Persistence::set_handler(id,opts,key,"",[],true) end def response id = @a[0] opts = @a[1] key = @r.last if CPEE::Persistence::exists_handler?(id,opts,key) DeleteSubscription::set(id,opts,key) else @status = 404 end nil end end #}}} def self::sse_distributor(opts) #{{{ conn = opts[:redis_dyn].call conn.psubscribe('forward:*','event:state/change') do |on| on.pmessage do |pat, what, message| if pat == 'forward:*' id, key = what.match(/forward:([^\/]+)\/(.+)/).captures if sse = opts.dig(:sse_connections,id.to_i,key) sse.send message else DeleteSubscription::set(id,opts,key) end elsif pat == 'event:state/change' mess = JSON.parse(message[message.index(' ')+1..-1]) state = mess.dig('content','state') if state == 'finished' || state == 'abandoned' opts.dig(:sse_connections,mess.dig('instance').to_i)&.each do |key,sse| EM.add_timer(10) do # just to be sure that all messages arrived. 10 seconds should be enough ... we think ... therefore we are (not sure) sse.close end end end end end end conn.close end #}}} def self::sse_heartbeat(opts) #{{{ opts.dig(:sse_connections).each do |id,keys| keys.each do |key,sse| sse.send_with_id('heartbeat', '42') unless sse&.closed? end end end #}}} class SSE < Riddl::SSEImplementation #{{{ def onopen @opts = @a[1] @id = @a[0] @key = @r[-2] if CPEE::Persistence::exists_handler?(@id,@opts,@key) @opts[:sse_connections][@id] ||= {} @opts[:sse_connections][@id][@key] = self true else false end end def onclose @opts.dig(:sse_connections,@id)&.delete(@key) @opts.dig(:sse_connections)&.delete(@id) if @opts.dig(:sse_connections,@id)&.length == 0 DeleteSubscription::set(@id,@opts,@key) end end #}}} end end