# #-- # Copyright (c) 2007, John Mettraux, OpenWFE.org # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # . Redistributions of source code must retain the above copyright notice, this # list of conditions and the following disclaimer. # # . Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # . Neither the name of the "OpenWFE" nor the names of its contributors may be # used to endorse or promote products derived from this software without # specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. 
#++
#
# $Id: definitions.rb 2725 2006-06-02 13:26:32Z jmettraux $
#

#
# "made in Japan"
#
# John Mettraux at openwfe.org
#

require 'yaml'
require 'base64'
require 'monitor'

require 'openwfe/service'
#require 'openwfe/rudefinitions'
require 'openwfe/util/scheduler'
require 'openwfe/listeners/listener'

require 'openwfe/util/sqs'


#
# some base listener implementations
#
module OpenWFE

  #
  # Polls an Amazon SQS queue for workitems
  #
  # Workitems can be instances of InFlowWorkItem or LaunchItem.
  #
  #     require 'openwfe/listeners/sqslisteners'
  #
  #     ql = OpenWFE::SqsListener.new("workqueue1", engine.application_context)
  #
  #     engine.add_workitem_listener(ql, "2m30s")
  #         #
  #         # thus, the engine will poll our "workqueue1" SQS queue
  #         # every 2 minutes and 30 seconds
  #
  class SqsListener < Service
    include MonitorMixin, WorkItemListener, Schedulable

    # The name of the SQS queue this listener polls (always a String).
    attr_reader :queue_name

    #
    # Builds a listener bound to the queue named +queue_name+.
    #
    # The queue name is converted with #to_s and the service is registered
    # under the name "<class>::<queue_name>" in +application_context+.
    #
    def initialize(queue_name, application_context)
      @queue_name = queue_name.to_s

      service_name = "#{self.class}::#{@queue_name}"

      super(service_name, application_context)

      linfo { "new() queue is '#{@queue_name}'" }
    end

    #
    # Drains the SQS queue: fetches messages batch after batch until a
    # fetch returns no message. Each message is decoded (see
    # #decode_object), passed to handle_object and then deleted from
    # the queue.
    #
    # The whole run is wrapped in the monitor's +synchronize+.
    #
    # +params+ is not used by this implementation.
    #
    # Note: a message is deleted only after handle_object returned; if
    # decoding or handling raises, the exception propagates out of this
    # method and that message is left in the queue.
    #
    def trigger(params)
      synchronize do
        ldebug { "trigger()" }

        qs = SQS::QueueService.new

        qs.create_queue(@queue_name)
          # just to be sure it is there

        loop do
          messages = qs.get_messages(
            @queue_name, :timeout => 0, :count => 255)

          break if messages.length < 1

          messages.each do |msg|
            handle_object(decode_object(msg))

            # only remove the message once it has been handled
            msg.delete

            ldebug do
              "trigger() handled successfully msg #{msg.message_id}"
            end
          end
        end
      end
    end

    #
    # Extracts a workitem from the message's body.
    #
    # The body is expected to be a Base64 encoded YAML document. By
    # default, this listener assumes the workitem is stored in its
    # "hash form" (not directly as a Ruby InFlowWorkItem instance).
    #
    # LaunchItem instances (as hash as well) are also accepted.
    #
    # Returns whatever OpenWFE.workitem_from_h builds out of the
    # decoded hash.
    #
    def decode_object(message)
      yaml = Base64.decode64(message.message_body)

      # NOTE(review): YAML.load is applied to data coming from an
      # external queue. If that queue can be written to by untrusted
      # parties, consider YAML.safe_load, since workitems are expected
      # in plain hash form -- to be confirmed against the producers.
      hash = YAML.load(yaml)

      OpenWFE.workitem_from_h(hash)
    end
  end

end