Class: CelluloidPubsub::Client::PubSubWorker
- Inherits:
-
Object
- Object
- CelluloidPubsub::Client::PubSubWorker
- Includes:
- Celluloid, Celluloid::Logger
- Defined in:
- lib/celluloid_pubsub/client_pubsub.rb
Overview
worker that subscribes to a channel or publishes to a channel if it used to subscribe to a channel the worker will dispatch the messages to the actor that made the connection in the first place.
Instance Attribute Summary (collapse)
-
- (Celluloid::Actor) actor
Actor to which callbacks will be delegated to.
-
- (Celluloid::WebSocket::Client) client
A websocket client that is used to chat witht the webserver.
-
- (Proc) connect_blk
Block that will execute after the connection is opened.
-
- (String) hostname
The hostname on which the webserver runs on.
-
- (Hash) options
The options that can be used to connect to webser and send additional data.
-
- (String) path
The hostname on which the webserver runs on.
-
- (String) port
The port on which the webserver runs on.
Instance Method Summary (collapse)
-
- (boolean) debug_enabled?
checks if debug is enabled.
-
- (void) initialize(options, &connect_blk)
constructor
receives a list of options that are used to connect to the webserver and an actor to which the callbacks are delegated to when receiving messages from a channel.
-
- (void) on_close(code, reason)
callback executes when connection closes.
-
- (void) on_message(data)
callback executes when actor receives a message from a subscribed channel and parses the message using JSON.parse and dispatches the parsed message to the original actor that made the connection.
-
- (void) on_open
callback executes after connection is opened and delegates action to actor.
-
- (void) parse_options(options)
check the options list for values and sets default values if not found.
-
- (void) publish(channel, data)
publishes to a channel some data (can be anything).
-
- (void) subscribe(channel)
subscribes to a channel .
Constructor Details
- (void) initialize(options, &connect_blk)
receives a list of options that are used to connect to the webserver and an actor to which the callbacks are delegated to
when receiving messages from a channel
47 48 49 50 51 52 |
# File 'lib/celluloid_pubsub/client_pubsub.rb', line 47 def initialize(, &connect_blk) () raise "#{self}: Please provide an actor in the options list!!!" if @actor.blank? @connect_blk = connect_blk @client = Celluloid::WebSocket::Client.new("ws://#{@hostname}:#{@port}#{@path}", Actor.current) end |
Instance Attribute Details
- (Celluloid::Actor) actor
Returns actor to which callbacks will be delegated to
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/celluloid_pubsub/client_pubsub.rb', line 28 class PubSubWorker include Celluloid include Celluloid::Logger attr_accessor :actor, :connect_blk, :client, :options, :hostname, :port, :path # receives a list of options that are used to connect to the webserver and an actor to which the callbacks are delegated to # when receiving messages from a channel # # @param [Hash] options the options that can be used to connect to webser and send additional data # @option options [String] :actor The actor that made the connection # @option options [String]:hostname The hostname on which the webserver runs on # @option options [String] :port The port on which the webserver runs on # @option options [String] :path The request path that the webserver accepts # # @param [Proc] connect_blk Block that will execute after the connection is opened # # @return [void] # # @api public def initialize(, &connect_blk) () raise "#{self}: Please provide an actor in the options list!!!" if @actor.blank? @connect_blk = connect_blk @client = Celluloid::WebSocket::Client.new("ws://#{@hostname}:#{@port}#{@path}", Actor.current) end # check the options list for values and sets default values if not found # # @param [Hash] options the options that can be used to connect to webser and send additional data # @option options [String] :actor The actor that made the connection # @option options [String]:hostname The hostname on which the webserver runs on # @option options [String] :port The port on which the webserver runs on # @option options [String] :path The request path that the webserver accepts # # @return [void] # # @api public def () raise 'Options is not a hash' unless .is_a?(Hash) @options = .stringify_keys! @actor = @options.fetch('actor', nil) @hostname = @options.fetch('hostname', CelluloidPubsub::WebServer::HOST) @port = @options.fetch('port', CelluloidPubsub::WebServer::PORT) @path = @options.fetch('path', CelluloidPubsub::WebServer::PATH) end # checks if debug is enabled # # # @return [boolean] # # @api public def debug_enabled? CelluloidPubsub::WebServer.debug_enabled? end # subscribes to a channel . need to be used inside the connect block passed to the actor # # @param [string] channel # # @return [void] # # @api public def subscribe(channel) subscription_data = { 'client_action' => 'subscribe', 'channel' => channel } debug("#{self.class} tries to subscribe #{subscription_data}") if debug_enabled? async.chat(subscription_data) end # publishes to a channel some data (can be anything) # # @param [string] channel # @param [#to_s] data # # @return [void] # # @api public def publish(channel, data) publishing_data = { 'client_action' => 'publish', 'channel' => channel, 'data' => data } debug(" #{self.class} publishl to #{channel} message: #{publishing_data}") if debug_enabled? async.chat(publishing_data) end # callback executes after connection is opened and delegates action to actor # # @return [void] # # @api public def on_open debug("#{self.class} websocket connection opened") if debug_enabled? @connect_blk.call Actor.current end # callback executes when actor receives a message from a subscribed channel # and parses the message using JSON.parse and dispatches the parsed # message to the original actor that made the connection # # @param [JSON] data # # @return [void] # # @api public def (data) debug("#{self.class} received plain #{data}") if debug_enabled? = JSON.parse(data) debug("#{self.class} received JSON #{}") if debug_enabled? @actor.async.() end # callback executes when connection closes # # @param [String] code # # @param [String] reason # # @return [void] # # @api public def on_close(code, reason) @client.terminate terminate debug("#{self.class} dispatching on close #{code} #{reason}") if debug_enabled? @actor.async.on_close(code, reason) end private # method used to send messages to the webserver # checks too see if the message is a hash and if it is it will transform it to JSON and send it to the webser # otherwise will construct a JSON object that will have the key action with the value 'message" and the key message witth the parameter's value # # @param [Hash] message # # @return [void] # # @api private def chat() = nil if .is_a?(Hash) debug("#{self.class} sends #{.to_json}") if debug_enabled? = .to_json else text_messsage = JSON.dump(action: 'message', message: ) debug("#{self.class} sends JSON #{text_messsage}") if debug_enabled? = text_messsage end @client.text end end |
- (Celluloid::WebSocket::Client) client
Returns A websocket client that is used to chat witht the webserver
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/celluloid_pubsub/client_pubsub.rb', line 28 class PubSubWorker include Celluloid include Celluloid::Logger attr_accessor :actor, :connect_blk, :client, :options, :hostname, :port, :path # receives a list of options that are used to connect to the webserver and an actor to which the callbacks are delegated to # when receiving messages from a channel # # @param [Hash] options the options that can be used to connect to webser and send additional data # @option options [String] :actor The actor that made the connection # @option options [String]:hostname The hostname on which the webserver runs on # @option options [String] :port The port on which the webserver runs on # @option options [String] :path The request path that the webserver accepts # # @param [Proc] connect_blk Block that will execute after the connection is opened # # @return [void] # # @api public def initialize(, &connect_blk) () raise "#{self}: Please provide an actor in the options list!!!" if @actor.blank? @connect_blk = connect_blk @client = Celluloid::WebSocket::Client.new("ws://#{@hostname}:#{@port}#{@path}", Actor.current) end # check the options list for values and sets default values if not found # # @param [Hash] options the options that can be used to connect to webser and send additional data # @option options [String] :actor The actor that made the connection # @option options [String]:hostname The hostname on which the webserver runs on # @option options [String] :port The port on which the webserver runs on # @option options [String] :path The request path that the webserver accepts # # @return [void] # # @api public def () raise 'Options is not a hash' unless .is_a?(Hash) @options = .stringify_keys! @actor = @options.fetch('actor', nil) @hostname = @options.fetch('hostname', CelluloidPubsub::WebServer::HOST) @port = @options.fetch('port', CelluloidPubsub::WebServer::PORT) @path = @options.fetch('path', CelluloidPubsub::WebServer::PATH) end # checks if debug is enabled # # # @return [boolean] # # @api public def debug_enabled? CelluloidPubsub::WebServer.debug_enabled? end # subscribes to a channel . need to be used inside the connect block passed to the actor # # @param [string] channel # # @return [void] # # @api public def subscribe(channel) subscription_data = { 'client_action' => 'subscribe', 'channel' => channel } debug("#{self.class} tries to subscribe #{subscription_data}") if debug_enabled? async.chat(subscription_data) end # publishes to a channel some data (can be anything) # # @param [string] channel # @param [#to_s] data # # @return [void] # # @api public def publish(channel, data) publishing_data = { 'client_action' => 'publish', 'channel' => channel, 'data' => data } debug(" #{self.class} publishl to #{channel} message: #{publishing_data}") if debug_enabled? async.chat(publishing_data) end # callback executes after connection is opened and delegates action to actor # # @return [void] # # @api public def on_open debug("#{self.class} websocket connection opened") if debug_enabled? @connect_blk.call Actor.current end # callback executes when actor receives a message from a subscribed channel # and parses the message using JSON.parse and dispatches the parsed # message to the original actor that made the connection # # @param [JSON] data # # @return [void] # # @api public def (data) debug("#{self.class} received plain #{data}") if debug_enabled? = JSON.parse(data) debug("#{self.class} received JSON #{}") if debug_enabled? @actor.async.() end # callback executes when connection closes # # @param [String] code # # @param [String] reason # # @return [void] # # @api public def on_close(code, reason) @client.terminate terminate debug("#{self.class} dispatching on close #{code} #{reason}") if debug_enabled? @actor.async.on_close(code, reason) end private # method used to send messages to the webserver # checks too see if the message is a hash and if it is it will transform it to JSON and send it to the webser # otherwise will construct a JSON object that will have the key action with the value 'message" and the key message witth the parameter's value # # @param [Hash] message # # @return [void] # # @api private def chat() = nil if .is_a?(Hash) debug("#{self.class} sends #{.to_json}") if debug_enabled? = .to_json else text_messsage = JSON.dump(action: 'message', message: ) debug("#{self.class} sends JSON #{text_messsage}") if debug_enabled? = text_messsage end @client.text end end |
- (Proc) connect_blk
Returns Block that will execute after the connection is opened
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/celluloid_pubsub/client_pubsub.rb', line 28 class PubSubWorker include Celluloid include Celluloid::Logger attr_accessor :actor, :connect_blk, :client, :options, :hostname, :port, :path # receives a list of options that are used to connect to the webserver and an actor to which the callbacks are delegated to # when receiving messages from a channel # # @param [Hash] options the options that can be used to connect to webser and send additional data # @option options [String] :actor The actor that made the connection # @option options [String]:hostname The hostname on which the webserver runs on # @option options [String] :port The port on which the webserver runs on # @option options [String] :path The request path that the webserver accepts # # @param [Proc] connect_blk Block that will execute after the connection is opened # # @return [void] # # @api public def initialize(, &connect_blk) () raise "#{self}: Please provide an actor in the options list!!!" if @actor.blank? @connect_blk = connect_blk @client = Celluloid::WebSocket::Client.new("ws://#{@hostname}:#{@port}#{@path}", Actor.current) end # check the options list for values and sets default values if not found # # @param [Hash] options the options that can be used to connect to webser and send additional data # @option options [String] :actor The actor that made the connection # @option options [String]:hostname The hostname on which the webserver runs on # @option options [String] :port The port on which the webserver runs on # @option options [String] :path The request path that the webserver accepts # # @return [void] # # @api public def () raise 'Options is not a hash' unless .is_a?(Hash) @options = .stringify_keys! @actor = @options.fetch('actor', nil) @hostname = @options.fetch('hostname', CelluloidPubsub::WebServer::HOST) @port = @options.fetch('port', CelluloidPubsub::WebServer::PORT) @path = @options.fetch('path', CelluloidPubsub::WebServer::PATH) end # checks if debug is enabled # # # @return [boolean] # # @api public def debug_enabled? CelluloidPubsub::WebServer.debug_enabled? end # subscribes to a channel . need to be used inside the connect block passed to the actor # # @param [string] channel # # @return [void] # # @api public def subscribe(channel) subscription_data = { 'client_action' => 'subscribe', 'channel' => channel } debug("#{self.class} tries to subscribe #{subscription_data}") if debug_enabled? async.chat(subscription_data) end # publishes to a channel some data (can be anything) # # @param [string] channel # @param [#to_s] data # # @return [void] # # @api public def publish(channel, data) publishing_data = { 'client_action' => 'publish', 'channel' => channel, 'data' => data } debug(" #{self.class} publishl to #{channel} message: #{publishing_data}") if debug_enabled? async.chat(publishing_data) end # callback executes after connection is opened and delegates action to actor # # @return [void] # # @api public def on_open debug("#{self.class} websocket connection opened") if debug_enabled? @connect_blk.call Actor.current end # callback executes when actor receives a message from a subscribed channel # and parses the message using JSON.parse and dispatches the parsed # message to the original actor that made the connection # # @param [JSON] data # # @return [void] # # @api public def (data) debug("#{self.class} received plain #{data}") if debug_enabled? = JSON.parse(data) debug("#{self.class} received JSON #{}") if debug_enabled? @actor.async.() end # callback executes when connection closes # # @param [String] code # # @param [String] reason # # @return [void] # # @api public def on_close(code, reason) @client.terminate terminate debug("#{self.class} dispatching on close #{code} #{reason}") if debug_enabled? @actor.async.on_close(code, reason) end private # method used to send messages to the webserver # checks too see if the message is a hash and if it is it will transform it to JSON and send it to the webser # otherwise will construct a JSON object that will have the key action with the value 'message" and the key message witth the parameter's value # # @param [Hash] message # # @return [void] # # @api private def chat() = nil if .is_a?(Hash) debug("#{self.class} sends #{.to_json}") if debug_enabled? = .to_json else text_messsage = JSON.dump(action: 'message', message: ) debug("#{self.class} sends JSON #{text_messsage}") if debug_enabled? = text_messsage end @client.text end end |
- (String) hostname
Returns The hostname on which the webserver runs on
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/celluloid_pubsub/client_pubsub.rb', line 28 class PubSubWorker include Celluloid include Celluloid::Logger attr_accessor :actor, :connect_blk, :client, :options, :hostname, :port, :path # receives a list of options that are used to connect to the webserver and an actor to which the callbacks are delegated to # when receiving messages from a channel # # @param [Hash] options the options that can be used to connect to webser and send additional data # @option options [String] :actor The actor that made the connection # @option options [String]:hostname The hostname on which the webserver runs on # @option options [String] :port The port on which the webserver runs on # @option options [String] :path The request path that the webserver accepts # # @param [Proc] connect_blk Block that will execute after the connection is opened # # @return [void] # # @api public def initialize(, &connect_blk) () raise "#{self}: Please provide an actor in the options list!!!" if @actor.blank? @connect_blk = connect_blk @client = Celluloid::WebSocket::Client.new("ws://#{@hostname}:#{@port}#{@path}", Actor.current) end # check the options list for values and sets default values if not found # # @param [Hash] options the options that can be used to connect to webser and send additional data # @option options [String] :actor The actor that made the connection # @option options [String]:hostname The hostname on which the webserver runs on # @option options [String] :port The port on which the webserver runs on # @option options [String] :path The request path that the webserver accepts # # @return [void] # # @api public def () raise 'Options is not a hash' unless .is_a?(Hash) @options = .stringify_keys! @actor = @options.fetch('actor', nil) @hostname = @options.fetch('hostname', CelluloidPubsub::WebServer::HOST) @port = @options.fetch('port', CelluloidPubsub::WebServer::PORT) @path = @options.fetch('path', CelluloidPubsub::WebServer::PATH) end # checks if debug is enabled # # # @return [boolean] # # @api public def debug_enabled? CelluloidPubsub::WebServer.debug_enabled? end # subscribes to a channel . need to be used inside the connect block passed to the actor # # @param [string] channel # # @return [void] # # @api public def subscribe(channel) subscription_data = { 'client_action' => 'subscribe', 'channel' => channel } debug("#{self.class} tries to subscribe #{subscription_data}") if debug_enabled? async.chat(subscription_data) end # publishes to a channel some data (can be anything) # # @param [string] channel # @param [#to_s] data # # @return [void] # # @api public def publish(channel, data) publishing_data = { 'client_action' => 'publish', 'channel' => channel, 'data' => data } debug(" #{self.class} publishl to #{channel} message: #{publishing_data}") if debug_enabled? async.chat(publishing_data) end # callback executes after connection is opened and delegates action to actor # # @return [void] # # @api public def on_open debug("#{self.class} websocket connection opened") if debug_enabled? @connect_blk.call Actor.current end # callback executes when actor receives a message from a subscribed channel # and parses the message using JSON.parse and dispatches the parsed # message to the original actor that made the connection # # @param [JSON] data # # @return [void] # # @api public def (data) debug("#{self.class} received plain #{data}") if debug_enabled? = JSON.parse(data) debug("#{self.class} received JSON #{}") if debug_enabled? @actor.async.() end # callback executes when connection closes # # @param [String] code # # @param [String] reason # # @return [void] # # @api public def on_close(code, reason) @client.terminate terminate debug("#{self.class} dispatching on close #{code} #{reason}") if debug_enabled? @actor.async.on_close(code, reason) end private # method used to send messages to the webserver # checks too see if the message is a hash and if it is it will transform it to JSON and send it to the webser # otherwise will construct a JSON object that will have the key action with the value 'message" and the key message witth the parameter's value # # @param [Hash] message # # @return [void] # # @api private def chat() = nil if .is_a?(Hash) debug("#{self.class} sends #{.to_json}") if debug_enabled? = .to_json else text_messsage = JSON.dump(action: 'message', message: ) debug("#{self.class} sends JSON #{text_messsage}") if debug_enabled? = text_messsage end @client.text end end |
- (Hash) options
Returns the options that can be used to connect to webser and send additional data
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/celluloid_pubsub/client_pubsub.rb', line 28 class PubSubWorker include Celluloid include Celluloid::Logger attr_accessor :actor, :connect_blk, :client, :options, :hostname, :port, :path # receives a list of options that are used to connect to the webserver and an actor to which the callbacks are delegated to # when receiving messages from a channel # # @param [Hash] options the options that can be used to connect to webser and send additional data # @option options [String] :actor The actor that made the connection # @option options [String]:hostname The hostname on which the webserver runs on # @option options [String] :port The port on which the webserver runs on # @option options [String] :path The request path that the webserver accepts # # @param [Proc] connect_blk Block that will execute after the connection is opened # # @return [void] # # @api public def initialize(, &connect_blk) () raise "#{self}: Please provide an actor in the options list!!!" if @actor.blank? @connect_blk = connect_blk @client = Celluloid::WebSocket::Client.new("ws://#{@hostname}:#{@port}#{@path}", Actor.current) end # check the options list for values and sets default values if not found # # @param [Hash] options the options that can be used to connect to webser and send additional data # @option options [String] :actor The actor that made the connection # @option options [String]:hostname The hostname on which the webserver runs on # @option options [String] :port The port on which the webserver runs on # @option options [String] :path The request path that the webserver accepts # # @return [void] # # @api public def () raise 'Options is not a hash' unless .is_a?(Hash) @options = .stringify_keys! @actor = @options.fetch('actor', nil) @hostname = @options.fetch('hostname', CelluloidPubsub::WebServer::HOST) @port = @options.fetch('port', CelluloidPubsub::WebServer::PORT) @path = @options.fetch('path', CelluloidPubsub::WebServer::PATH) end # checks if debug is enabled # # # @return [boolean] # # @api public def debug_enabled? CelluloidPubsub::WebServer.debug_enabled? end # subscribes to a channel . need to be used inside the connect block passed to the actor # # @param [string] channel # # @return [void] # # @api public def subscribe(channel) subscription_data = { 'client_action' => 'subscribe', 'channel' => channel } debug("#{self.class} tries to subscribe #{subscription_data}") if debug_enabled? async.chat(subscription_data) end # publishes to a channel some data (can be anything) # # @param [string] channel # @param [#to_s] data # # @return [void] # # @api public def publish(channel, data) publishing_data = { 'client_action' => 'publish', 'channel' => channel, 'data' => data } debug(" #{self.class} publishl to #{channel} message: #{publishing_data}") if debug_enabled? async.chat(publishing_data) end # callback executes after connection is opened and delegates action to actor # # @return [void] # # @api public def on_open debug("#{self.class} websocket connection opened") if debug_enabled? @connect_blk.call Actor.current end # callback executes when actor receives a message from a subscribed channel # and parses the message using JSON.parse and dispatches the parsed # message to the original actor that made the connection # # @param [JSON] data # # @return [void] # # @api public def (data) debug("#{self.class} received plain #{data}") if debug_enabled? = JSON.parse(data) debug("#{self.class} received JSON #{}") if debug_enabled? @actor.async.() end # callback executes when connection closes # # @param [String] code # # @param [String] reason # # @return [void] # # @api public def on_close(code, reason) @client.terminate terminate debug("#{self.class} dispatching on close #{code} #{reason}") if debug_enabled? @actor.async.on_close(code, reason) end private # method used to send messages to the webserver # checks too see if the message is a hash and if it is it will transform it to JSON and send it to the webser # otherwise will construct a JSON object that will have the key action with the value 'message" and the key message witth the parameter's value # # @param [Hash] message # # @return [void] # # @api private def chat() = nil if .is_a?(Hash) debug("#{self.class} sends #{.to_json}") if debug_enabled? = .to_json else text_messsage = JSON.dump(action: 'message', message: ) debug("#{self.class} sends JSON #{text_messsage}") if debug_enabled? = text_messsage end @client.text end end |
- (String) path
Returns The hostname on which the webserver runs on
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/celluloid_pubsub/client_pubsub.rb', line 28 class PubSubWorker include Celluloid include Celluloid::Logger attr_accessor :actor, :connect_blk, :client, :options, :hostname, :port, :path # receives a list of options that are used to connect to the webserver and an actor to which the callbacks are delegated to # when receiving messages from a channel # # @param [Hash] options the options that can be used to connect to webser and send additional data # @option options [String] :actor The actor that made the connection # @option options [String]:hostname The hostname on which the webserver runs on # @option options [String] :port The port on which the webserver runs on # @option options [String] :path The request path that the webserver accepts # # @param [Proc] connect_blk Block that will execute after the connection is opened # # @return [void] # # @api public def initialize(, &connect_blk) () raise "#{self}: Please provide an actor in the options list!!!" if @actor.blank? @connect_blk = connect_blk @client = Celluloid::WebSocket::Client.new("ws://#{@hostname}:#{@port}#{@path}", Actor.current) end # check the options list for values and sets default values if not found # # @param [Hash] options the options that can be used to connect to webser and send additional data # @option options [String] :actor The actor that made the connection # @option options [String]:hostname The hostname on which the webserver runs on # @option options [String] :port The port on which the webserver runs on # @option options [String] :path The request path that the webserver accepts # # @return [void] # # @api public def () raise 'Options is not a hash' unless .is_a?(Hash) @options = .stringify_keys! @actor = @options.fetch('actor', nil) @hostname = @options.fetch('hostname', CelluloidPubsub::WebServer::HOST) @port = @options.fetch('port', CelluloidPubsub::WebServer::PORT) @path = @options.fetch('path', CelluloidPubsub::WebServer::PATH) end # checks if debug is enabled # # # @return [boolean] # # @api public def debug_enabled? CelluloidPubsub::WebServer.debug_enabled? end # subscribes to a channel . need to be used inside the connect block passed to the actor # # @param [string] channel # # @return [void] # # @api public def subscribe(channel) subscription_data = { 'client_action' => 'subscribe', 'channel' => channel } debug("#{self.class} tries to subscribe #{subscription_data}") if debug_enabled? async.chat(subscription_data) end # publishes to a channel some data (can be anything) # # @param [string] channel # @param [#to_s] data # # @return [void] # # @api public def publish(channel, data) publishing_data = { 'client_action' => 'publish', 'channel' => channel, 'data' => data } debug(" #{self.class} publishl to #{channel} message: #{publishing_data}") if debug_enabled? async.chat(publishing_data) end # callback executes after connection is opened and delegates action to actor # # @return [void] # # @api public def on_open debug("#{self.class} websocket connection opened") if debug_enabled? @connect_blk.call Actor.current end # callback executes when actor receives a message from a subscribed channel # and parses the message using JSON.parse and dispatches the parsed # message to the original actor that made the connection # # @param [JSON] data # # @return [void] # # @api public def (data) debug("#{self.class} received plain #{data}") if debug_enabled? = JSON.parse(data) debug("#{self.class} received JSON #{}") if debug_enabled? @actor.async.() end # callback executes when connection closes # # @param [String] code # # @param [String] reason # # @return [void] # # @api public def on_close(code, reason) @client.terminate terminate debug("#{self.class} dispatching on close #{code} #{reason}") if debug_enabled? @actor.async.on_close(code, reason) end private # method used to send messages to the webserver # checks too see if the message is a hash and if it is it will transform it to JSON and send it to the webser # otherwise will construct a JSON object that will have the key action with the value 'message" and the key message witth the parameter's value # # @param [Hash] message # # @return [void] # # @api private def chat() = nil if .is_a?(Hash) debug("#{self.class} sends #{.to_json}") if debug_enabled? = .to_json else text_messsage = JSON.dump(action: 'message', message: ) debug("#{self.class} sends JSON #{text_messsage}") if debug_enabled? = text_messsage end @client.text end end |
- (String) port
Returns The port on which the webserver runs on
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/celluloid_pubsub/client_pubsub.rb', line 28 class PubSubWorker include Celluloid include Celluloid::Logger attr_accessor :actor, :connect_blk, :client, :options, :hostname, :port, :path # receives a list of options that are used to connect to the webserver and an actor to which the callbacks are delegated to # when receiving messages from a channel # # @param [Hash] options the options that can be used to connect to webser and send additional data # @option options [String] :actor The actor that made the connection # @option options [String]:hostname The hostname on which the webserver runs on # @option options [String] :port The port on which the webserver runs on # @option options [String] :path The request path that the webserver accepts # # @param [Proc] connect_blk Block that will execute after the connection is opened # # @return [void] # # @api public def initialize(, &connect_blk) () raise "#{self}: Please provide an actor in the options list!!!" if @actor.blank? @connect_blk = connect_blk @client = Celluloid::WebSocket::Client.new("ws://#{@hostname}:#{@port}#{@path}", Actor.current) end # check the options list for values and sets default values if not found # # @param [Hash] options the options that can be used to connect to webser and send additional data # @option options [String] :actor The actor that made the connection # @option options [String]:hostname The hostname on which the webserver runs on # @option options [String] :port The port on which the webserver runs on # @option options [String] :path The request path that the webserver accepts # # @return [void] # # @api public def () raise 'Options is not a hash' unless .is_a?(Hash) @options = .stringify_keys! @actor = @options.fetch('actor', nil) @hostname = @options.fetch('hostname', CelluloidPubsub::WebServer::HOST) @port = @options.fetch('port', CelluloidPubsub::WebServer::PORT) @path = @options.fetch('path', CelluloidPubsub::WebServer::PATH) end # checks if debug is enabled # # # @return [boolean] # # @api public def debug_enabled? CelluloidPubsub::WebServer.debug_enabled? end # subscribes to a channel . need to be used inside the connect block passed to the actor # # @param [string] channel # # @return [void] # # @api public def subscribe(channel) subscription_data = { 'client_action' => 'subscribe', 'channel' => channel } debug("#{self.class} tries to subscribe #{subscription_data}") if debug_enabled? async.chat(subscription_data) end # publishes to a channel some data (can be anything) # # @param [string] channel # @param [#to_s] data # # @return [void] # # @api public def publish(channel, data) publishing_data = { 'client_action' => 'publish', 'channel' => channel, 'data' => data } debug(" #{self.class} publishl to #{channel} message: #{publishing_data}") if debug_enabled? async.chat(publishing_data) end # callback executes after connection is opened and delegates action to actor # # @return [void] # # @api public def on_open debug("#{self.class} websocket connection opened") if debug_enabled? @connect_blk.call Actor.current end # callback executes when actor receives a message from a subscribed channel # and parses the message using JSON.parse and dispatches the parsed # message to the original actor that made the connection # # @param [JSON] data # # @return [void] # # @api public def (data) debug("#{self.class} received plain #{data}") if debug_enabled? = JSON.parse(data) debug("#{self.class} received JSON #{}") if debug_enabled? @actor.async.() end # callback executes when connection closes # # @param [String] code # # @param [String] reason # # @return [void] # # @api public def on_close(code, reason) @client.terminate terminate debug("#{self.class} dispatching on close #{code} #{reason}") if debug_enabled? @actor.async.on_close(code, reason) end private # method used to send messages to the webserver # checks too see if the message is a hash and if it is it will transform it to JSON and send it to the webser # otherwise will construct a JSON object that will have the key action with the value 'message" and the key message witth the parameter's value # # @param [Hash] message # # @return [void] # # @api private def chat() = nil if .is_a?(Hash) debug("#{self.class} sends #{.to_json}") if debug_enabled? = .to_json else text_messsage = JSON.dump(action: 'message', message: ) debug("#{self.class} sends JSON #{text_messsage}") if debug_enabled? = text_messsage end @client.text end end |
Instance Method Details
- (boolean) debug_enabled?
checks if debug is enabled
80 81 82 |
# File 'lib/celluloid_pubsub/client_pubsub.rb', line 80 def debug_enabled? CelluloidPubsub::WebServer.debug_enabled? end |
- (void) on_close(code, reason)
This method returns an undefined value.
callback executes when connection closes
146 147 148 149 150 151 |
# File 'lib/celluloid_pubsub/client_pubsub.rb', line 146 def on_close(code, reason) @client.terminate terminate debug("#{self.class} dispatching on close #{code} #{reason}") if debug_enabled? @actor.async.on_close(code, reason) end |
- (void) on_message(data)
This method returns an undefined value.
callback executes when actor receives a message from a subscribed channel and parses the message using JSON.parse and dispatches the parsed message to the original actor that made the connection
130 131 132 133 134 135 |
# File 'lib/celluloid_pubsub/client_pubsub.rb', line 130 def (data) debug("#{self.class} received plain #{data}") if debug_enabled? = JSON.parse(data) debug("#{self.class} received JSON #{}") if debug_enabled? @actor.async.() end |
- (void) on_open
This method returns an undefined value.
callback executes after connection is opened and delegates action to actor
116 117 118 119 |
# File 'lib/celluloid_pubsub/client_pubsub.rb', line 116 def on_open debug("#{self.class} websocket connection opened") if debug_enabled? @connect_blk.call Actor.current end |
- (void) parse_options(options)
This method returns an undefined value.
check the options list for values and sets default values if not found
65 66 67 68 69 70 71 72 |
# File 'lib/celluloid_pubsub/client_pubsub.rb', line 65 def () raise 'Options is not a hash' unless .is_a?(Hash) @options = .stringify_keys! @actor = @options.fetch('actor', nil) @hostname = @options.fetch('hostname', CelluloidPubsub::WebServer::HOST) @port = @options.fetch('port', CelluloidPubsub::WebServer::PORT) @path = @options.fetch('path', CelluloidPubsub::WebServer::PATH) end |
- (void) publish(channel, data)
This method returns an undefined value.
publishes to a channel some data (can be anything)
105 106 107 108 109 |
# File 'lib/celluloid_pubsub/client_pubsub.rb', line 105 def publish(channel, data) publishing_data = { 'client_action' => 'publish', 'channel' => channel, 'data' => data } debug(" #{self.class} publishl to #{channel} message: #{publishing_data}") if debug_enabled? async.chat(publishing_data) end |
- (void) subscribe(channel)
This method returns an undefined value.
subscribes to a channel . need to be used inside the connect block passed to the actor
91 92 93 94 95 |
# File 'lib/celluloid_pubsub/client_pubsub.rb', line 91 def subscribe(channel) subscription_data = { 'client_action' => 'subscribe', 'channel' => channel } debug("#{self.class} tries to subscribe #{subscription_data}") if debug_enabled? async.chat(subscription_data) end |