Sha256: e2737dace7ab4aafab7583e35a1b46a496a675e1bc65aaafa3f03d2244d616c8
Contents?: true
Size: 1.81 KB
Versions: 3
Compression:
Stored size: 1.81 KB
Contents
# frozen_string_literal: true

module Wayfarer
  module CLI
    # Thor commands for running a single Wayfarer job against one URL:
    # inline (`perform`), through the configured queue (`enqueue`), or on an
    # in-process ActiveJob async adapter (`execute`).
    #
    # `load_environment`, `mock_redis` and `free_agent_pool` are not defined
    # in this file; presumably they are inherited from Base.
    #
    # The `:batch` defaults call SecureRandom.uuid while the class body is
    # evaluated, i.e. once per declaration at load time, not once per
    # command invocation.
    class Job < Base
      desc "perform JOB URL", "Perform JOB with URL"
      option :mock_redis, type: :boolean
      option :batch, type: :string, default: SecureRandom.uuid
      # Instantiates the job class named by JOB and performs it directly,
      # bypassing any queue adapter.
      #
      # @param job [String] name resolved via `classify.constantize`
      # @param url [String] URL wrapped in the Wayfarer::Task
      def perform(job, url)
        load_environment
        mock_redis if options[:mock_redis]

        instance = job.classify.constantize.new
        task = Wayfarer::Task.new(url, options[:batch])

        # The task is recorded in the job's arguments and also passed
        # explicitly to #perform.
        instance.arguments.push(task)
        instance.perform(task)

        task.gc.run
      end

      desc "enqueue JOB URL", "Enqueue JOB with URL"
      option :batch, type: :string, default: SecureRandom.uuid
      # Resolves the job class named by JOB and calls its `crawl` class
      # method with the URL and batch.
      #
      # @param job [String] name resolved via `classify.constantize`
      # @param url [String] URL handed to `crawl`
      def enqueue(job, url)
        load_environment
        # TODO: Remove, does not belong here
        mock_redis if options[:mock_redis]

        job_class = job.classify.constantize
        job_class.crawl(url, batch: options[:batch])
      end

      desc "execute JOB URL", "Execute JOB with async adapter"
      option :mock_redis, type: :boolean
      option :batch, type: :string, default: SecureRandom.uuid
      option :min_threads, type: :numeric, default: 1
      option :max_threads, type: :numeric, default: 1
      # Swaps the job class's queue adapter for an AsyncAdapter sized by the
      # thread options, starts the crawl, and blocks until the adapter's
      # executor reports no outstanding tasks.
      #
      # @param job [String] name resolved via `classify.constantize`
      # @param url [String] URL handed to `crawl`
      def execute(job, url)
        load_environment
        mock_redis if options[:mock_redis]

        job_class = job.classify.constantize
        job_class.queue_adapter = ActiveJob::QueueAdapters::AsyncAdapter.new(
          min_threads: options[:min_threads],
          max_threads: options[:max_threads]
        )

        # The adapter exposes no public handle on its executor, so it is
        # read out of the adapter's instance variables.
        scheduler = job_class.queue_adapter.instance_variable_get(:@scheduler)
        executor = scheduler.instance_variable_get(:@async_executor)

        job_class.crawl(url, batch: options[:batch])

        # Poll once per second until every scheduled task has completed.
        sleep(1) until executor.scheduled_task_count <= executor.completed_task_count

        free_agent_pool
      end
    end
  end
end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
wayfarer-0.4.6 | lib/wayfarer/cli/job.rb |
wayfarer-0.4.5 | lib/wayfarer/cli/job.rb |
wayfarer-0.4.4 | lib/wayfarer/cli/job.rb |