Class: CelluloidPubsub::Client::PubSubWorker

Inherits:
Object
Includes:
Celluloid, Celluloid::Logger
Defined in:
lib/celluloid_pubsub/client_pubsub.rb

Overview

Worker that subscribes to a channel or publishes to a channel. When it is used to subscribe to a channel, the worker dispatches the received messages to the actor that made the connection in the first place.


Constructor Details

- (void) initialize(options, &connect_blk)

Receives a hash of options used to connect to the webserver, including the actor to which callbacks are delegated when messages are received from a channel.

Parameters:

  • options (Hash)

    the options that can be used to connect to the webserver and send additional data

  • connect_blk (Proc)

    Block that will execute after the connection is opened

Options Hash (options):

  • :actor (Celluloid::Actor)

    The actor that made the connection

  • :hostname (String)

    The hostname on which the webserver runs

  • :port (String)

    The port on which the webserver runs

  • :path (String)

    The request path that the webserver accepts



# File 'lib/celluloid_pubsub/client_pubsub.rb', line 47

def initialize(options, &connect_blk)
  parse_options(options)
  raise "#{self}: Please provide an actor in the options list!!!" if @actor.blank?
  @connect_blk = connect_blk
  @client = Celluloid::WebSocket::Client.new("ws://#{@hostname}:#{@port}#{@path}", Actor.current)
end
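
The way the constructor derives the connection URL from the options can be sketched in plain Ruby, without Celluloid. This is only an illustration under stated assumptions: `DEFAULT_HOST`, `DEFAULT_PORT` and `DEFAULT_PATH` are hypothetical placeholders for the `CelluloidPubsub::WebServer` constants (their real values are not shown on this page), `connection_settings` is a made-up helper name, and a simple `nil` check stands in for `blank?`.

```ruby
# Hypothetical placeholder defaults; the real values live in
# CelluloidPubsub::WebServer::HOST, PORT and PATH.
DEFAULT_HOST = 'localhost'
DEFAULT_PORT = 1234
DEFAULT_PATH = '/ws'

# Hypothetical helper that mirrors the shape of parse_options plus the
# URL interpolation in initialize: keys may be symbols or strings,
# missing values fall back to defaults, and an actor is mandatory.
def connection_settings(options)
  raise ArgumentError, 'Options is not a hash' unless options.is_a?(Hash)

  # Plain-Ruby replacement for ActiveSupport's stringify_keys!
  opts = options.each_with_object({}) { |(key, value), acc| acc[key.to_s] = value }

  actor = opts['actor']
  raise ArgumentError, 'Please provide an actor in the options list' if actor.nil?

  hostname = opts.fetch('hostname', DEFAULT_HOST)
  port = opts.fetch('port', DEFAULT_PORT)
  path = opts.fetch('path', DEFAULT_PATH)

  { actor: actor, url: "ws://#{hostname}:#{port}#{path}" }
end
```

Calling the helper with only an actor and a port keeps the placeholder host and path and overrides the port alone; omitting the actor raises before any URL is built.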

Instance Attribute Details

- (Celluloid::Actor) actor

Returns the actor to which callbacks will be delegated

Returns:

  • (Celluloid::Actor)

    the actor to which callbacks will be delegated
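
As a rough sketch of what "delegated" means here: the worker calls `on_message` (with the parsed JSON) and `on_close` (with a code and a reason) on this actor. The example below uses a plain Ruby object instead of a Celluloid actor and calls it synchronously, whereas the worker itself goes through `@actor.async`. `RecordingActor`, `dispatch` and the sample payload are hypothetical names and data chosen for illustration.

```ruby
require 'json'

# Hypothetical stand-in for the object passed in the :actor option.
# It only needs to respond to on_message and on_close.
class RecordingActor
  attr_reader :messages, :closed_with

  def initialize
    @messages = []
    @closed_with = nil
  end

  # Receives the already-parsed message (a Hash, not a JSON string).
  def on_message(message)
    @messages << message
  end

  # Receives the close code and reason from the websocket.
  def on_close(code, reason)
    @closed_with = [code, reason]
  end
end

# Synchronous sketch of the worker's on_message callback:
# parse the raw JSON text, then hand the result to the actor.
def dispatch(raw, actor)
  actor.on_message(JSON.parse(raw))
end

actor = RecordingActor.new
dispatch('{"channel":"news","data":"hello"}', actor)
actor.on_close(1000, 'normal closure')
```

After these calls, `actor.messages` holds one Hash with string keys and `actor.closed_with` holds the code and reason pair.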



# File 'lib/celluloid_pubsub/client_pubsub.rb', line 28

class PubSubWorker
  include Celluloid
  include Celluloid::Logger
  attr_accessor :actor, :connect_blk, :client, :options, :hostname, :port, :path

  # receives a hash of options used to connect to the webserver, including the actor
  # to which callbacks are delegated when messages are received from a channel
  #
  # @param  [Hash]  options the options that can be used to connect to the webserver and send additional data
  # @option options [String] :actor The actor that made the connection
  # @option options [String] :hostname The hostname on which the webserver runs
  # @option options [String] :port The port on which the webserver runs
  # @option options [String] :path The request path that the webserver accepts
  #
  # @param [Proc] connect_blk  Block  that will execute after the connection is opened
  #
  # @return [void]
  #
  # @api public
  def initialize(options, &connect_blk)
    parse_options(options)
    raise "#{self}: Please provide an actor in the options list!!!" if @actor.blank?
    @connect_blk = connect_blk
    @client = Celluloid::WebSocket::Client.new("ws://#{@hostname}:#{@port}#{@path}", Actor.current)
  end

  # checks the options hash for values and sets default values where none are given
  #
  # @param  [Hash]  options the options that can be used to connect to the webserver and send additional data
  # @option options [String] :actor The actor that made the connection
  # @option options [String] :hostname The hostname on which the webserver runs
  # @option options [String] :port The port on which the webserver runs
  # @option options [String] :path The request path that the webserver accepts
  #
  # @return [void]
  #
  # @api public
  def parse_options(options)
    raise 'Options is not a hash' unless options.is_a?(Hash)
    @options = options.stringify_keys!
    @actor = @options.fetch('actor', nil)
    @hostname = @options.fetch('hostname', CelluloidPubsub::WebServer::HOST)
    @port = @options.fetch('port', CelluloidPubsub::WebServer::PORT)
    @path = @options.fetch('path', CelluloidPubsub::WebServer::PATH)
  end

  #  checks if debug is enabled
  #
  #
  # @return [boolean]
  #
  # @api public
  def debug_enabled?
    CelluloidPubsub::WebServer.debug_enabled?
  end

  # subscribes to a channel; must be used inside the connect block passed to the actor
  #
  # @param [string] channel
  #
  # @return [void]
  #
  # @api public
  def subscribe(channel)
    subscription_data = { 'client_action' => 'subscribe', 'channel' => channel }
    debug("#{self.class} tries to subscribe  #{subscription_data}") if debug_enabled?
    async.chat(subscription_data)
  end

  # publishes to a channel some data (can be anything)
  #
  # @param [string] channel
  # @param [#to_s] data
  #
  # @return [void]
  #
  # @api public
  def publish(channel, data)
    publishing_data = { 'client_action' => 'publish', 'channel' => channel, 'data' => data }
    debug(" #{self.class}  publishl to #{channel} message:  #{publishing_data}") if debug_enabled?
    async.chat(publishing_data)
  end

  #  callback executes after connection is opened and delegates action to actor
  #
  # @return [void]
  #
  # @api public
  def on_open
    debug("#{self.class} websocket connection opened") if debug_enabled?
    @connect_blk.call Actor.current
  end

  # callback executes when actor receives a message from a subscribed channel
  # and parses the message using JSON.parse and dispatches the parsed
  # message to the original actor that made the connection
  #
  # @param [JSON] data
  #
  # @return [void]
  #
  # @api public
  def on_message(data)
    debug("#{self.class} received  plain #{data}") if debug_enabled?
    message = JSON.parse(data)
    debug("#{self.class} received JSON  #{message}") if debug_enabled?
    @actor.async.on_message(message)
  end

  # callback executes when connection closes
  #
  # @param [String] code
  #
  # @param [String] reason
  #
  # @return [void]
  #
  # @api public
  def on_close(code, reason)
    @client.terminate
    terminate
    debug("#{self.class} dispatching on close  #{code} #{reason}") if debug_enabled?
    @actor.async.on_close(code, reason)
  end

private

  # method used to send messages to the webserver
  # checks to see if the message is a hash and, if it is, transforms it to JSON and sends it to the webserver
  # otherwise constructs a JSON object with the key action set to 'message' and the key message set to the parameter's value
  #
  # @param [Hash] message
  #
  # @return [void]
  #
  # @api private
  def chat(message)
    final_message = nil
    if message.is_a?(Hash)
      debug("#{self.class} sends #{message.to_json}") if debug_enabled?
      final_message = message.to_json
    else
      text_messsage = JSON.dump(action: 'message', message: message)
      debug("#{self.class} sends JSON  #{text_messsage}") if debug_enabled?
      final_message = text_messsage
    end
    @client.text final_message
  end
end
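
The wire format produced by `subscribe`, `publish` and the private `chat` method can be reproduced with the standard `json` library alone. The sketch below mirrors the hashes and the hash/non-hash branching from the listing above; the helper names (`subscribe_payload`, `publish_payload`, `encode_message`) are hypothetical, and `JSON.generate` is used in place of `to_json` so that no extra dependencies are assumed.

```ruby
require 'json'

# Same hash that subscribe(channel) passes to chat.
def subscribe_payload(channel)
  { 'client_action' => 'subscribe', 'channel' => channel }
end

# Same hash that publish(channel, data) passes to chat.
def publish_payload(channel, data)
  { 'client_action' => 'publish', 'channel' => channel, 'data' => data }
end

# Mirrors the branching in chat: hashes are serialized as they are,
# anything else is wrapped under the 'message' key with action 'message'.
def encode_message(message)
  if message.is_a?(Hash)
    JSON.generate(message)
  else
    JSON.generate({ 'action' => 'message', 'message' => message })
  end
end

subscribe_frame = encode_message(subscribe_payload('news'))
publish_frame = encode_message(publish_payload('news', { 'text' => 'hello' }))
plain_frame = encode_message('ping')
```

Each resulting string is what would be handed to `@client.text` in the worker; parsing it back with `JSON.parse` recovers the original hash.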

- (Celluloid::WebSocket::Client) client

Returns a websocket client that is used to chat with the webserver

Returns:

  • (Celluloid::WebSocket::Client)

    a websocket client that is used to chat with the webserver




- (Proc) connect_blk

Returns the block that will execute after the connection is opened

Returns:

  • (Proc)

    the block that will execute after the connection is opened




- (String) hostname

Returns the hostname on which the webserver runs

Returns:

  • (String)

    the hostname on which the webserver runs




- (Hash) options

Returns the options that can be used to connect to the webserver and send additional data

Returns:

  • (Hash)

    the options that can be used to connect to the webserver and send additional data




- (String) path

Returns the request path that the webserver accepts

Returns:

  • (String)

    the request path that the webserver accepts



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/celluloid_pubsub/client_pubsub.rb', line 28

class PubSubWorker
  include Celluloid
  include Celluloid::Logger
  attr_accessor :actor, :connect_blk, :client, :options, :hostname, :port, :path

  #  receives a list of options that are used to connect to the webserver and an actor to which the callbacks are delegated to
  #  when receiving messages from a channel
  #
  # @param  [Hash]  options the options that can be used to connect to webser and send additional data
  # @option options [String] :actor The actor that made the connection
  # @option options [String]:hostname The hostname on which the webserver runs on
  # @option options [String] :port The port on which the webserver runs on
  # @option options [String] :path The request path that the webserver accepts
  #
  # @param [Proc] connect_blk  Block  that will execute after the connection is opened
  #
  # @return [void]
  #
  # @api public
  def initialize(options, &connect_blk)
    parse_options(options)
    raise "#{self}: Please provide an actor in the options list!!!" if @actor.blank?
    @connect_blk = connect_blk
    @client = Celluloid::WebSocket::Client.new("ws://#{@hostname}:#{@port}#{@path}", Actor.current)
  end

  # Checks the options list for values and sets default values for any that are missing.
  #
  # @param  [Hash]  options the options that can be used to connect to the webserver and send additional data
  # @option options [Celluloid::Actor] :actor The actor that made the connection
  # @option options [String] :hostname The hostname on which the webserver runs
  # @option options [String] :port The port on which the webserver runs
  # @option options [String] :path The request path that the webserver accepts
  #
  # @return [void]
  #
  # @api public
  def parse_options(options)
    raise 'Options is not a hash' unless options.is_a?(Hash)
    @options = options.stringify_keys!
    @actor = @options.fetch('actor', nil)
    @hostname = @options.fetch('hostname', CelluloidPubsub::WebServer::HOST)
    @port = @options.fetch('port', CelluloidPubsub::WebServer::PORT)
    @path = @options.fetch('path', CelluloidPubsub::WebServer::PATH)
  end

  # Checks whether debug logging is enabled on CelluloidPubsub::WebServer.
  #
  #
  # @return [Boolean]
  #
  # @api public
  def debug_enabled?
    CelluloidPubsub::WebServer.debug_enabled?
  end

  # Subscribes to a channel. Needs to be called inside the connect block passed to the actor.
  #
  # @param [String] channel
  #
  # @return [void]
  #
  # @api public
  def subscribe(channel)
    subscription_data = { 'client_action' => 'subscribe', 'channel' => channel }
    debug("#{self.class} tries to subscribe  #{subscription_data}") if debug_enabled?
    async.chat(subscription_data)
  end

  # Publishes some data (can be anything) to a channel.
  #
  # @param [String] channel
  # @param [#to_s] data
  #
  # @return [void]
  #
  # @api public
  def publish(channel, data)
    publishing_data = { 'client_action' => 'publish', 'channel' => channel, 'data' => data }
    debug(" #{self.class}  publishes to #{channel} message:  #{publishing_data}") if debug_enabled?
    async.chat(publishing_data)
  end

  #  callback executes after connection is opened and delegates action to actor
  #
  # @return [void]
  #
  # @api public
  def on_open
    debug("#{self.class} websocket connection opened") if debug_enabled?
    @connect_blk.call Actor.current
  end

  # Callback that executes when the worker receives a message from a subscribed channel.
  # It parses the message with JSON.parse and dispatches the parsed
  # message to the original actor that made the connection.
  #
  # @param [String] data the raw JSON-encoded message
  #
  # @return [void]
  #
  # @api public
  def on_message(data)
    debug("#{self.class} received  plain #{data}") if debug_enabled?
    message = JSON.parse(data)
    debug("#{self.class} received JSON  #{message}") if debug_enabled?
    @actor.async.on_message(message)
  end

  # callback executes when connection closes
  #
  # @param [String] code
  #
  # @param [String] reason
  #
  # @return [void]
  #
  # @api public
  def on_close(code, reason)
    @client.terminate
    terminate
    debug("#{self.class} dispatching on close  #{code} #{reason}") if debug_enabled?
    @actor.async.on_close(code, reason)
  end

  private

  # Sends a message to the webserver.
  # If the message is a Hash it is serialized to JSON and sent as is;
  # otherwise it is wrapped in a JSON object with the key 'action' set to 'message'
  # and the key 'message' set to the parameter's value.
  #
  # @param [Hash, Object] message
  #
  # @return [void]
  #
  # @api private
  def chat(message)
    final_message = nil
    if message.is_a?(Hash)
      debug("#{self.class} sends #{message.to_json}") if debug_enabled?
      final_message = message.to_json
    else
      text_message = JSON.dump(action: 'message', message: message)
      debug("#{self.class} sends JSON  #{text_message}") if debug_enabled?
      final_message = text_message
    end
    @client.text final_message
  end
end

- (String) port

Returns the port on which the webserver runs

Returns:

  • (String)

    the port on which the webserver runs




Instance Method Details

- (Boolean) debug_enabled?

Checks whether debug logging is enabled on CelluloidPubsub::WebServer.

Returns:

  • (Boolean)


# File 'lib/celluloid_pubsub/client_pubsub.rb', line 80

def debug_enabled?
  CelluloidPubsub::WebServer.debug_enabled?
end

- (void) on_close(code, reason)

This method returns an undefined value.

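Callback that executes when the connection closes. It terminates the websocket client and the worker, then forwards the close code and reason to the actor.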

Parameters:

  • code (String)
  • reason (String)


# File 'lib/celluloid_pubsub/client_pubsub.rb', line 146

def on_close(code, reason)
  @client.terminate
  terminate
  debug("#{self.class} dispatching on close  #{code} #{reason}") if debug_enabled?
  @actor.async.on_close(code, reason)
end

- (void) on_message(data)

This method returns an undefined value.

Callback that executes when the worker receives a message from a subscribed channel. It parses the message with JSON.parse and dispatches the parsed message to the original actor that made the connection.

Parameters:

  • data (String)


# File 'lib/celluloid_pubsub/client_pubsub.rb', line 130

def on_message(data)
  debug("#{self.class} received  plain #{data}") if debug_enabled?
  message = JSON.parse(data)
  debug("#{self.class} received JSON  #{message}") if debug_enabled?
  @actor.async.on_message(message)
end
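
The parse-and-dispatch step can be sketched with Ruby's standard library alone. This is only an illustration under stated assumptions: `RecordingActor` and `dispatch_message` are hypothetical names standing in for the subscribing actor and the worker's `on_message`, the payload keys are invented, and the direct call replaces the asynchronous `@actor.async.on_message` used by the worker.

```ruby
require 'json'

# Hypothetical stand-in for the actor that made the connection.
class RecordingActor
  attr_reader :received

  def initialize
    @received = []
  end

  # Collects every parsed message handed over by the worker.
  def on_message(message)
    @received << message
  end
end

# Mirrors the worker's on_message: parse the raw frame, then hand it to the actor.
# The real worker dispatches asynchronously; this sketch calls the actor directly.
def dispatch_message(actor, data)
  message = JSON.parse(data)
  actor.on_message(message)
  message
end

actor = RecordingActor.new
dispatch_message(actor, '{"channel":"news","data":"hello"}')
```

Note that `JSON.parse` raises `JSON::ParserError` on malformed input and `on_message` does not rescue it, so a frame that is not valid JSON would raise inside the worker.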

- (void) on_open

This method returns an undefined value.

Callback that executes after the connection is opened. It calls the connect block with the current worker so that the caller can subscribe or publish.



# File 'lib/celluloid_pubsub/client_pubsub.rb', line 116

def on_open
  debug("#{self.class} websocket connection opened") if debug_enabled?
  @connect_blk.call Actor.current
end

- (void) parse_options(options)

This method returns an undefined value.

Checks the options list for values and sets default values for any that are missing.

Parameters:

  • options (Hash)

    the options that can be used to connect to the webserver and send additional data

Options Hash (options):

  • :actor (Celluloid::Actor)

    The actor that made the connection

  • :hostname (String)

    The hostname on which the webserver runs

  • :port (String)

    The port on which the webserver runs

  • :path (String)

    The request path that the webserver accepts



# File 'lib/celluloid_pubsub/client_pubsub.rb', line 65

def parse_options(options)
  raise 'Options is not a hash' unless options.is_a?(Hash)
  @options = options.stringify_keys!
  @actor = @options.fetch('actor', nil)
  @hostname = @options.fetch('hostname', CelluloidPubsub::WebServer::HOST)
  @port = @options.fetch('port', CelluloidPubsub::WebServer::PORT)
  @path = @options.fetch('path', CelluloidPubsub::WebServer::PATH)
end
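
The default-filling behaviour can be approximated without the gem. The sketch below is an assumption-laden illustration: `DEFAULTS` and `parse_options_sketch` are hypothetical names, the default values are placeholders for `CelluloidPubsub::WebServer::HOST`, `PORT` and `PATH`, and the key conversion is a plain-Ruby substitute for ActiveSupport's `stringify_keys!` that does not mutate the caller's hash.

```ruby
# Placeholder defaults; the real values are the constants on CelluloidPubsub::WebServer.
DEFAULTS = { 'hostname' => '0.0.0.0', 'port' => 1234, 'path' => '/ws' }.freeze

# Hypothetical helper that mirrors parse_options but returns the parsed values
# instead of assigning instance variables.
def parse_options_sketch(options)
  raise ArgumentError, 'Options is not a hash' unless options.is_a?(Hash)

  # Copy the hash with string keys so symbol and string keys are treated alike.
  opts = options.each_with_object({}) { |(key, value), acc| acc[key.to_s] = value }
  {
    'actor' => opts.fetch('actor', nil),
    'hostname' => opts.fetch('hostname', DEFAULTS['hostname']),
    'port' => opts.fetch('port', DEFAULTS['port']),
    'path' => opts.fetch('path', DEFAULTS['path'])
  }
end

parsed = parse_options_sketch(actor: :my_actor, port: 8080)
```

Here `parsed` keeps the supplied actor and port and falls back to the placeholder hostname and path. The documented method differs in that it calls `stringify_keys!` on the hash it is given, which modifies that hash in place.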

- (void) publish(channel, data)

This method returns an undefined value.

Publishes some data (can be anything) to a channel.

Parameters:

  • channel (String)
  • data (#to_s)


# File 'lib/celluloid_pubsub/client_pubsub.rb', line 105

def publish(channel, data)
  publishing_data = { 'client_action' => 'publish', 'channel' => channel, 'data' => data }
  debug(" #{self.class}  publishes to #{channel} message:  #{publishing_data}") if debug_enabled?
  async.chat(publishing_data)
end
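
To make the wire format concrete, the following sketch builds the publish payload and encodes it the way the private `chat` method is documented to. `publish_payload` and `encode_frame` are hypothetical helper names, and nothing is sent over a socket; the sketch only shows the resulting JSON strings.

```ruby
require 'json'

# Builds the same hash that publish hands to chat.
def publish_payload(channel, data)
  { 'client_action' => 'publish', 'channel' => channel, 'data' => data }
end

# Mirrors chat's encoding rule: a Hash is serialized as is,
# anything else is wrapped in an 'action'/'message' envelope.
def encode_frame(message)
  if message.is_a?(Hash)
    message.to_json
  else
    JSON.dump(action: 'message', message: message)
  end
end

frame = encode_frame(publish_payload('news', { 'text' => 'hello' }))
plain = encode_frame('ping')
```

Because `publish` always passes a Hash, the envelope branch only applies when `chat` is given a non-Hash value.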

- (void) subscribe(channel)

This method returns an undefined value.

Subscribes to a channel. Needs to be called inside the connect block passed to the actor.

Parameters:

  • channel (String)


# File 'lib/celluloid_pubsub/client_pubsub.rb', line 91

def subscribe(channel)
  subscription_data = { 'client_action' => 'subscribe', 'channel' => channel }
  debug("#{self.class} tries to subscribe  #{subscription_data}") if debug_enabled?
  async.chat(subscription_data)
end
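
The requirement to subscribe inside the connect block follows from `on_open` being the point where that block is called. The sketch below imitates that flow without Celluloid or a websocket: `SketchWorker` is a hypothetical stand-in that collects frames in memory instead of sending them, and `on_open` is triggered by hand to simulate the connection opening.

```ruby
require 'json'

# Hypothetical stand-in for PubSubWorker: no socket, frames are kept in memory.
class SketchWorker
  attr_reader :sent_frames

  def initialize(&connect_blk)
    @connect_blk = connect_blk
    @sent_frames = []
  end

  # Mirrors on_open: hands the worker to the connect block.
  def on_open
    @connect_blk.call(self)
  end

  # Mirrors subscribe: builds the subscription hash and records its JSON form.
  def subscribe(channel)
    @sent_frames << { 'client_action' => 'subscribe', 'channel' => channel }.to_json
  end
end

worker = SketchWorker.new { |w| w.subscribe('news') }
worker.on_open # simulates the connection being opened
```

Nothing is recorded until `on_open` runs, which is why the subscription belongs in the connect block rather than immediately after constructing the worker.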