---
# Phobos configuration template: logging, Kafka client settings, producer and
# consumer defaults, retry backoff, and the listener definitions.
# Keys set to `null` are left unset; replace `null` with a value to enable them.

logger:
  # Optional log file, set to false or remove to disable it
  file: log/phobos.log
  # Optional output format for stdout, default is false (human readable).
  # Set to true to enable json output.
  stdout_json: false
  level: info
  # Comment the block to disable ruby-kafka logs
  ruby_kafka:
    level: error

kafka:
  # identifier for this application
  client_id: phobos
  # timeout setting for connecting to brokers
  connect_timeout: null
  # timeout setting for socket connections
  socket_timeout: null
  # PEM encoded CA cert to use with an SSL connection (string)
  ssl_ca_cert: null
  # PEM encoded client cert to use with an SSL connection (string)
  # Must be used in combination with ssl_client_cert_key
  ssl_client_cert: null
  # PEM encoded client cert key to use with an SSL connection (string)
  # Must be used in combination with ssl_client_cert
  ssl_client_cert_key: null
  # list of brokers used to initialize the client ("host:port")
  seed_brokers:
    - localhost:9092

producer:
  # number of seconds a broker can wait for replicas to acknowledge
  # a write before responding with a timeout
  ack_timeout: 5
  # number of replicas that must acknowledge a write, or `:all`
  # if all in-sync replicas must acknowledge
  # NOTE(review): `:all` is intentionally left unquoted; it is presumably read
  # as a Ruby symbol by the loader - confirm before quoting it.
  required_acks: :all
  # number of retries that should be attempted before giving up sending
  # messages to the cluster. Does not include the original attempt
  max_retries: 2
  # number of seconds to wait between retries
  retry_backoff: 1
  # number of messages allowed in the buffer before new writes will
  # raise {BufferOverflow} exceptions
  max_buffer_size: 1000
  # maximum size of the buffer in bytes. Attempting to produce messages
  # when the buffer reaches this size will result in {BufferOverflow} being raised
  max_buffer_bytesize: 10000000
  # name of the compression codec to use, or null if no compression should be
  # performed. Valid codecs: `:snappy` and `:gzip`
  compression_codec: null
  # number of messages that need to be in a message set before it should be
  # compressed. Note that message sets are per-partition rather than per-topic
  # or per-producer
  compression_threshold: 1
  # maximum number of messages allowed in the queue. Only used for async_producer
  max_queue_size: 1000
  # if greater than zero, the number of buffered messages that will automatically
  # trigger a delivery. Only used for async_producer
  delivery_threshold: 0
  # if greater than zero, the number of seconds between automatic message
  # deliveries. Only used for async_producer
  delivery_interval: 0
  # Set this to true to keep the producer connection between publish calls.
  # This can speed up subsequent messages by around 30%, but it does mean
  # that you need to manually call sync_producer_shutdown before exiting,
  # similar to async_producer_shutdown.
  persistent_connections: false

consumer:
  # number of seconds after which, if a client hasn't contacted the Kafka cluster,
  # it will be kicked out of the group
  session_timeout: 30
  # interval between offset commits, in seconds
  offset_commit_interval: 10
  # number of messages that can be processed before their offsets are committed.
  # If zero, offset commits are not triggered by message processing
  offset_commit_threshold: 0
  # the time period that committed offsets will be retained, in seconds.
  # Defaults to the broker setting.
  offset_retention_time: null
  # interval between heartbeats; must be less than the session window
  heartbeat_interval: 10

backoff:
  min_ms: 1000
  max_ms: 60000

listeners:
  - handler: Phobos::EchoHandler
    topic: test
    # id of the group that the consumer should join
    group_id: test-1
    # Number of threads created for this listener, each thread will behave as
    # an independent consumer. They don't share any state
    max_concurrency: 1
    # Once the consumer group has checkpointed its progress in the topic's
    # partitions, the consumers will always start from the checkpointed offsets,
    # regardless of config. As such, this setting only applies when the consumer
    # initially starts consuming from a topic
    start_from_beginning: true
    # maximum amount of data fetched from a single partition at a time
    max_bytes_per_partition: 524288  # 512 KB
    # Minimum number of bytes to read before returning messages from the server;
    # if `max_wait_time` is reached, this is ignored.
    min_bytes: 1
    # Maximum duration of time to wait before returning messages from the server,
    # in seconds
    max_wait_time: 5
    # Apply this encoding to the message payload, if null/blank it uses the
    # original encoding. This property accepts values defined by the ruby
    # Encoding class (https://ruby-doc.org/core-2.3.0/Encoding.html).
    # Ex: UTF_8, ASCII_8BIT, etc
    force_encoding: null
    # Specify the delivery method for a listener.
    # Possible values: [`message`, `batch` (default), `inline_batch`]
    # - `message` will yield individual messages from Ruby Kafka using
    #   `each_message` and will commit/heartbeat at every consumed message.
    #   This is overall a bit slower than using batch, but easier to configure.
    # - `batch` will yield batches from Ruby Kafka using `each_batch`, and
    #   commit/heartbeat at every consumed batch.
    #   Due to implementation in Ruby Kafka, it should be noted that when
    #   configuring large batch sizes in combination with taking long time to
    #   consume messages, your consumers might get kicked from the Kafka cluster
    #   for not sending a heartbeat within the expected interval.
    #   Take this into consideration when determining your configuration.
    #   You can send heartbeats manually while processing your messages if
    #   necessary.
    # - `inline_batch` also uses `each_batch`, but will pass the entire batch to
    #   your handler instead of one message at a time. To use this method, you
    #   should include Phobos::BatchHandler instead of Phobos::Handler so that
    #   you can make use of the `consume_batch` etc. methods.
    # Note: Ultimately commit/heartbeat will depend on the offset commit options
    # and the heartbeat interval.
    delivery: batch
    # Use this if custom backoff is required for a listener
    backoff:
      min_ms: 500
      max_ms: 10000
    # session_timeout, offset_commit_interval, offset_commit_threshold,
    # offset_retention_time, and heartbeat_interval can be customized per
    # listener if desired
    session_timeout: 30
    offset_commit_interval: 15
    offset_commit_threshold: 5
    offset_retention_time: 172800
    heartbeat_interval: 20