require 'json' module CPEE module Notifications def self::implementation(id,opts) Proc.new do on resource "notifications" do run CPEE::Notifications::Overview if get on resource "topics" do run CPEE::Notifications::Topics, opts if get end on resource "subscriptions" do run CPEE::Notifications::Subscriptions, id, opts if get run CPEE::Notifications::CreateSubscription, id, opts if post 'subscribe' on resource do run CPEE::Notifications::Subscription, id, opts if get run CPEE::Notifications::UpdateSubscription, id, opts if put 'subscribe' run CPEE::Notifications::DeleteSubscription, id, opts if delete on resource 'sse' do run CPEE::Notifications::SSE, id, opts if sse end end end end end end class Overview < Riddl::Implementation #{{{ def response Riddl::Parameter::Complex.new("overview","text/xml") do <<-END END end end end #}}} class Topics < Riddl::Implementation #{{{ def response opts = @a[0] Riddl::Parameter::Complex.new("overview","text/xml") do File.read(opts[:topics]) end end end #}}} class Subscriptions < Riddl::Implementation #{{{ def response id = @a[0] opts = @a[1] Riddl::Parameter::Complex.new("subscriptions","text/xml") do ret = XML::Smart::string <<-END END CPEE::Persistence::extract_handlers(id,opts).each do |de| ret.root.add('subscription').tap do |n| n.attributes['id'] = de[0] n.attributes['url'] = de[1] if de[1] && !de[1].empty? end end ret.to_s end end end #}}} class Subscription < Riddl::Implementation #{{{ def response id = @a[0] opts = @a[1] key = @r[-1] Riddl::Parameter::Complex.new("subscriptions","text/xml") do ret = XML::Smart::string <<-END END url = CPEE::Persistence::extract_item(id,opts,File.join('handler',key,'url')) ret.root.attributes['url'] = url if url && !url.empty? items = {} CPEE::Persistence::extract_handler(id,opts,key).each do |h| t, i, v = h.split('/') items[t] ||= [] items[t] << [i,v] end items.each do |k,v| ret.root.add('topic').tap do |n| n.attributes['id'] = k v.each do |e| n.add *e end end end ret.to_s end end end #}}} class CreateSubscription < Riddl::Implementation #{{{ def response id = @a[0] opts = @a[1] key = Digest::MD5.hexdigest(Kernel::rand().to_s) url = @p[0].name == 'url' ? @p.shift.value : nil values = [] while @p.length > 0 topic = @p.shift.value base = @p.shift type = base.name values += base.value.split(',').map { |i| File.join(topic,type[0..-2],i) } end @header = CPEE::Persistence::set_handler(id,opts,key,url,values) Riddl::Parameter::Simple.new('key',key) end end #}}} class UpdateSubscription < Riddl::Implementation #{{{ def response id = @a[0] opts = @a[1] key = @r.last url = @p[0].name == 'url' ? @p.shift.value : nil values = [] while @p.length > 0 topic = @p.shift.value base = @p.shift type = base.name values += base.value.split(',').map { |i| File.join(topic,type[0..-2],i) } end @header = CPEE::Persistence::set_handler(id,opts,key,url,values,true) end end #}}} class DeleteSubscription < Riddl::Implementation #{{{ def self::set(id,opts,key) CPEE::Persistence::set_handler(id,opts,key,"",[],true) end def response id = @a[0] opts = @a[1] key = @r.last DeleteSubscription::set(id,opts,key) nil end end #}}} class SSE < Riddl::SSEImplementation #{{{ def onopen @id = @a[0] @opts = @a[1] @key = @r[-2] @conn = Redis.new(path: @opts[:redis_path], db: @opts[:redis_db]) EM.defer do @conn.subscribe("forward:#{@id}/#{@key}", "forward-end:#{@id}/#{@key}") do |on| on.message do |what, message| if what == "forward-end:#{@id}/#{@key}" @conn.unsubscribe else send message end end end @conn.close end EM.defer do until closed? send_with_id 'hearbeat', '42' sleep 10 end end end def onclose tredis = Redis.new(path: @opts[:redis_path], db: @opts[:redis_db]) tredis.publish("forward-end:#{@id}/#{@key}",true) tredis.close DeleteSubscription::set(@id,@opts,@key) end end #}}} end end