Sha256: ab537f1361c6241b6a10b4ce361731b656981885ffa93d7a2fb557dd2ee9f0ce

Contents?: true

Size: 1.96 KB

Versions: 15

Compression:

Stored size: 1.96 KB

Contents

require 'concurrent/ivar'
require 'concurrent/utility/timer'
require 'concurrent/executor/safe_task_executor'

module Concurrent

  class ScheduledTask < IVar

    attr_reader :schedule_time

    def initialize(intended_time, opts = {}, &block)
      raise ArgumentError.new('no block given') unless block_given?
      TimerSet.calculate_schedule_time(intended_time) # raises exceptons

      super(NO_VALUE, opts)

      self.observers = CopyOnNotifyObserverSet.new
      @intended_time = intended_time
      @state         = :unscheduled
      @task          = block
      @executor      = OptionsParser::get_executor_from(opts) || Concurrent.configuration.global_operation_pool
    end

    # @since 0.5.0
    def execute
      if compare_and_set_state(:pending, :unscheduled)
        @schedule_time = TimerSet.calculate_schedule_time(@intended_time)
        Concurrent::timer(@schedule_time.to_f - Time.now.to_f) { @executor.post &method(:process_task) }
        self
      end
    end

    # @since 0.5.0
    def self.execute(intended_time, opts = {}, &block)
      return ScheduledTask.new(intended_time, opts, &block).execute
    end

    def cancelled?
      state == :cancelled
    end

    def in_progress?
      state == :in_progress
    end

    def cancel
      if_state(:unscheduled, :pending) do
        @state = :cancelled
        event.set
        true
      end
    end
    alias_method :stop, :cancel

    def add_observer(*args, &block)
      if_state(:unscheduled, :pending, :in_progress) do
        observers.add_observer(*args, &block)
      end
    end

    protected :set, :fail, :complete

    private

    def process_task
      if compare_and_set_state(:in_progress, :pending)
        success, val, reason = SafeTaskExecutor.new(@task).execute

        mutex.synchronize do
          set_state(success, val, reason)
          event.set
        end

        time = Time.now
        observers.notify_and_delete_observers { [time, self.value, reason] }
      end
    end
  end
end

Version data entries

15 entries across 15 versions & 1 rubygems

Version Path
concurrent-ruby-0.7.0 lib/concurrent/scheduled_task.rb
concurrent-ruby-0.7.0-x86_64-linux lib/concurrent/scheduled_task.rb
concurrent-ruby-0.7.0-x86-solaris-2.11 lib/concurrent/scheduled_task.rb
concurrent-ruby-0.7.0-x86-mingw32 lib/concurrent/scheduled_task.rb
concurrent-ruby-0.7.0-x86-linux lib/concurrent/scheduled_task.rb
concurrent-ruby-0.7.0-x64-mingw32 lib/concurrent/scheduled_task.rb
concurrent-ruby-0.7.0-java lib/concurrent/scheduled_task.rb
concurrent-ruby-0.7.0.rc2 lib/concurrent/scheduled_task.rb
concurrent-ruby-0.7.0.rc2-x86_64-linux lib/concurrent/scheduled_task.rb
concurrent-ruby-0.7.0.rc2-x86_64-darwin-13 lib/concurrent/scheduled_task.rb
concurrent-ruby-0.7.0.rc2-x86-solaris-2.11 lib/concurrent/scheduled_task.rb
concurrent-ruby-0.7.0.rc2-x86-mingw32 lib/concurrent/scheduled_task.rb
concurrent-ruby-0.7.0.rc2-x86-linux lib/concurrent/scheduled_task.rb
concurrent-ruby-0.7.0.rc2-x64-mingw32 lib/concurrent/scheduled_task.rb
concurrent-ruby-0.7.0.rc2-java lib/concurrent/scheduled_task.rb