Class: EZMQ::Subscriber

Inherits:
Socket
  • Object
show all
Defined in:
lib/ezmq/subscribe.rb

Overview

Subscribe socket that listens for messages with an optional topic.

Instance Attribute Summary (collapse)

Attributes inherited from Socket

#context, #decode, #encode, #socket

Instance Method Summary (collapse)

Methods inherited from Socket

#bind, #connect, #send

Constructor Details

- (Publisher) initialize(**options)

Note:

The default behaviour is to output and messages received to STDOUT.

Creates a new Subscriber socket.

Parameters:

  • options (Hash)

    optional parameters.

Options Hash (**options):

  • topic (String)

    a topic to subscribe to.

See Also:



19
20
21
22
# File 'lib/ezmq/subscribe.rb', line 19

def initialize(**options)
  super :connect, ZMQ::SUB, options
  subscribe options[:topic] if options[:topic]
end

Instance Attribute Details

- (Object) action

Returns the value of attribute action



7
8
9
# File 'lib/ezmq/subscribe.rb', line 7

def action
  @action
end

Instance Method Details

- (void) listen {|message, topic| ... }

This method returns an undefined value.

By default, waits for a message and prints it to STDOUT.

Yields:

  • (message, topic)

    passes the message body and topic to the block.

Yield Parameters:

  • message (String)

    the message received.

  • topic (String)

    the topic of the message.



59
60
61
62
63
64
65
66
67
68
# File 'lib/ezmq/subscribe.rb', line 59

def listen
  loop do
    if block_given?
      yield(*receive)
    else
      message, topic = receive
      puts "#{ topic } #{ message }"
    end
  end
end

- (Object) receive(**options) {|message, topic| ... }

Note:

This method blocks until a message arrives.

Receive a message from the socket.

Parameters:

  • options (Hash)

    optional parameters.

Options Hash (**options):

  • decode (lambda)

    how to decode the message.

Yields:

  • (message, topic)

    passes the message body and topic to the block.

Yield Parameters:

  • message (Object)

    the message received (decoded).

  • topic (String)

    the topic of the message.

Returns:

  • (Object)

    the message received (decoded).



37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/ezmq/subscribe.rb', line 37

def receive(**options)
  message = ''
  @socket.recv_string message

  message = message.match(/^(?<topic>[^\ ]*)\ (?<body>.*)/)

  decoded = (options[:decode] || @decode).call message['body']
  if block_given?
    yield decoded, message['topic']
  else
    [decoded, message['topic']]
  end
end

- (Boolean) subscribe(topic)

Note:

By default, a Subscriber filters all incoming messages. Without

Establishes a new message filter on the socket.

calling subscribe at least once, no messages will be accepted. If topic was provided, #initialize calls #subscribe automatically.

prefix will be accepted.

Parameters:

  • topic (String)

    a topic to subscribe to. Messages matching this

Returns:

  • (Boolean)

    was subscription successful?



81
82
83
# File 'lib/ezmq/subscribe.rb', line 81

def subscribe(topic)
  @socket.setsockopt(ZMQ::SUBSCRIBE, topic) == 0
end

- (Boolean) unsubscribe(topic)

Removes a message filter (as set with subscribe) from the socket.

Parameters:

  • topic (String)

    the topic to unsubscribe from. If multiple filters with the same topic are set, this will only remove one.

Returns:

  • (Boolean)

    was unsubscription successful?



92
93
94
# File 'lib/ezmq/subscribe.rb', line 92

def unsubscribe(topic)
  @socket.setsockopt(ZMQ::UNSUBSCRIBE, topic) == 0
end