FEATURE: add support for multiple workers

FEATURE: add support for multiple workers

Previously, mini scheduler was tied to a single runner. This was somewhat arbitrary, as all the internals were designed with multi-runner support.

This unlocks the ability to run multiple threads for the jobs.

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6f169b0..5da0e8a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+# 0.12.0 - 29-08-2019
+
+- Add support for multiple workers which allows avoiding queue starvation
+
 # 0.11.0 - 24-06-2019
 
 - Correct situation where distributed mutex could end in a tight loop when
diff --git a/README.md b/README.md
index b9c2c82..e1a7002 100644
--- a/README.md
+++ b/README.md
@@ -30,6 +30,20 @@ In a Rails application, create files needed in your application to configure min
 
 An initializer is created named `config/initializers/mini_scheduler.rb` which lists all the configuration options.
 
+## Configuring MiniScheduler
+
+By default each instance of MiniScheduler will run with a single worker. To amend this behavior:
+
+`‍``
+if Sidekiq.server? && defined?(Rails)
+  Rails.application.config.after_initialize do
+    MiniScheduler.start(workers: 5)
+  end
+end
+`‍``
+
+This is useful when you have extremely long-running tasks and would prefer that they did not starve the other jobs in the queue.
+
 ## Usage
 
 Create jobs with a recurring schedule like this:
diff --git a/lib/mini_scheduler.rb b/lib/mini_scheduler.rb
index 8eb5228..7822fff 100644
--- a/lib/mini_scheduler.rb
+++ b/lib/mini_scheduler.rb
@@ -52,11 +52,11 @@ module MiniScheduler
     @skip_schedule
   end
 
-  def self.start
+  def self.start(workers: 1)
     schedules = Manager.discover_schedules
 
     Manager.discover_queues.each do |queue|
-      manager = Manager.new(queue: queue)
+      manager = Manager.new(queue: queue, workers: workers)
 
       schedules.each do |schedule|
         if schedule.queue == queue
diff --git a/lib/mini_scheduler/manager.rb b/lib/mini_scheduler/manager.rb
index b016ecd..3caffbf 100644
--- a/lib/mini_scheduler/manager.rb
+++ b/lib/mini_scheduler/manager.rb
@@ -2,7 +2,7 @@
 
 module MiniScheduler
   class Manager
-    attr_accessor :random_ratio, :redis, :enable_stats, :queue
+    attr_accessor :random_ratio, :redis, :enable_stats, :queue, :workers
 
     class Runner
       def initialize(manager)
@@ -29,9 +29,12 @@ module MiniScheduler
             sleep (@manager.keep_alive_duration / 2)
           end
         end
-        @thread = Thread.new do
-          while !@stopped
-            process_queue
+        @threads = []
+        manager.workers.times do
+          @threads << Thread.new do
+            while !@stopped
+              process_queue
+            end
           end
         end
       end
@@ -128,10 +131,10 @@ module MiniScheduler
 
           kill_thread = Thread.new do
             sleep 0.5
-            @thread.kill
+            @threads.each(&:kill)
           end
 
-          @thread.join
+          @threads.each(&:join)
           kill_thread.kill
           kill_thread.join
         end
@@ -171,6 +174,7 @@ module MiniScheduler
 
     def initialize(options = nil)
       @queue = options && options[:queue] || "default"
+      @workers = options && options[:workers] || 1
       @redis = MiniScheduler.redis
       @random_ratio = 0.1
       unless options && options[:skip_runner]
diff --git a/lib/mini_scheduler/version.rb b/lib/mini_scheduler/version.rb
index 032fc13..ebc3f7a 100644
--- a/lib/mini_scheduler/version.rb
+++ b/lib/mini_scheduler/version.rb
@@ -1,5 +1,5 @@
 # frozen_string_literal: true
 
 module MiniScheduler
-  VERSION = "0.11.0"
+  VERSION = "0.12.0"
 end
diff --git a/spec/mini_scheduler/slow_spec.rb b/spec/mini_scheduler/slow_spec.rb
new file mode 100644
index 0000000..84a3899
--- /dev/null
+++ b/spec/mini_scheduler/slow_spec.rb
@@ -0,0 +1,75 @@
+# frozen_string_literal: true
+# encoding: utf-8
+
+if ENV["SLOW"]
+
+  class FastJob
+    extend ::MiniScheduler::Schedule
+    every 1.second
+
+    def self.runs=(val)
+      @runs = val
+    end
+
+    def self.runs
+      @runs ||= 0
+    end
+
+    def perform
+      sleep 0.001
+      self.class.runs += 1
+    end
+  end
+
+  class SlowJob
+    extend ::MiniScheduler::Schedule
+    every 5.second
+
+    def self.runs=(val)
+      @runs = val
+    end
+
+    def self.runs
+      @runs ||= 0
+    end
+
+    def perform
+      sleep 5
+      self.class.runs += 1
+    end
+  end
+
+  describe MiniScheduler::Manager do
+
+    let(:redis) { MockRedis.new }
+
+    it "can correctly operate with multiple workers" do
+      MiniScheduler.configure do |config|
+        config.redis = redis
+      end
+
+      manager = MiniScheduler::Manager.new(enable_stats: false, workers: 2)
+
+      sched = manager.schedule_info(FastJob)
+      # we jitter start times, this bypasses it
+      sched.next_run = Time.now + 0.1
+      sched.schedule!
+
+      sched = manager.schedule_info(SlowJob)
+      # we jitter start times, this bypasses it
+      sched.next_run = Time.now + 0.1
+      sched.schedule!
+
+      10.times do
+        manager.tick
+        sleep 1
+      end
+
+      manager.stop!
+
+      expect(FastJob.runs).to be > 5
+      expect(SlowJob.runs).to be > 0
+
+    end
+  end
+end

GitHub sha: 7cf0f99d

1 Like