FEATURE: Add support for multiple schedule queues.

FEATURE: Add support for multiple schedule queues.

diff --git a/README.md b/README.md
index 77a6dcb..b9c2c82 100644
--- a/README.md
+++ b/README.md
@@ -49,6 +49,7 @@ end
 
 Options for schedules:
 
+* **queue** followed by a queue name, like "queue :email". The default queue is "default".
 * **every** followed by a duration in seconds, like "every 1.hour".
 * **daily at:** followed by a duration since midnight, like "daily at: 12.hours", to run only once per day at a specific time.
 
diff --git a/lib/mini_scheduler.rb b/lib/mini_scheduler.rb
index 7e9005e..bbb124b 100644
--- a/lib/mini_scheduler.rb
+++ b/lib/mini_scheduler.rb
@@ -52,24 +52,30 @@ module MiniScheduler
   end
 
   def self.start
-    manager = Manager.new
+    schedules = Manager.discover_schedules
 
-    Manager.discover_schedules.each do |schedule|
-      manager.ensure_schedule!(schedule)
-    end
+    Manager.discover_queues.each do |queue|
+      manager = Manager.new(queue: queue)
 
-    Thread.new do
-      while true
-        begin
-          if !self.skip_schedule || !self.skip_schedule.call
-            manager.tick
-          end
-        rescue => e
-          # the show must go on
-          handle_job_exception(e, message: "While ticking scheduling manager")
+      schedules.each do |schedule|
+        if schedule.queue == queue
+          manager.ensure_schedule!(schedule)
         end
+      end
 
-        sleep 1
+      Thread.new do
+        while true
+          begin
+            if !self.skip_schedule || !self.skip_schedule.call
+              manager.tick
+            end
+          rescue => e
+            # the show must go on
+            handle_job_exception(e, message: "While ticking scheduling manager")
+          end
+
+          sleep 1
+        end
       end
     end
   end
diff --git a/lib/mini_scheduler/manager.rb b/lib/mini_scheduler/manager.rb
index e496c17..db173f9 100644
--- a/lib/mini_scheduler/manager.rb
+++ b/lib/mini_scheduler/manager.rb
@@ -1,6 +1,6 @@
 module MiniScheduler
   class Manager
-    attr_accessor :random_ratio, :redis, :enable_stats
+    attr_accessor :random_ratio, :redis, :enable_stats, :queue
 
     class Runner
       def initialize(manager)
@@ -168,11 +168,13 @@ module MiniScheduler
     end
 
     def initialize(options = nil)
+      @queue = options && options[:queue] || "default"
+
       @redis = MiniScheduler.redis
       @random_ratio = 0.1
       unless options && options[:skip_runner]
         @runner = Runner.new(self)
-        self.class.current = self
+        self.class.current[@queue] = self
       end
 
       @hostname = options && options[:hostname]
@@ -186,11 +188,7 @@ module MiniScheduler
     end
 
     def self.current
-      @current
-    end
-
-    def self.current=(manager)
-      @current = manager
+      @current ||= {}
     end
 
     def hostname
@@ -225,7 +223,7 @@ module MiniScheduler
     end
 
     def reschedule_orphans_on!(hostname = nil)
-      redis.zrange(Manager.queue_key(hostname), 0, -1).each do |key|
+      redis.zrange(Manager.queue_key(queue, hostname), 0, -1).each do |key|
         klass = get_klass(key)
         next unless klass
         info = schedule_info(klass)
@@ -253,14 +251,14 @@ module MiniScheduler
     end
 
     def schedule_next_job(hostname = nil)
-      (key, due), _ = redis.zrange Manager.queue_key(hostname), 0, 0, withscores: true
+      (key, due), _ = redis.zrange Manager.queue_key(queue, hostname), 0, 0, withscores: true
       return unless key
 
       if due.to_i <= Time.now.to_i
         klass = get_klass(key)
         unless klass
           # corrupt key, nuke it (renamed job or something)
-          redis.zrem Manager.queue_key(hostname), key
+          redis.zrem Manager.queue_key(queue, hostname), key
           return
         end
         info = schedule_info(klass)
@@ -281,7 +279,7 @@ module MiniScheduler
 
     def stop!
       @runner.stop!
-      self.class.current = nil
+      self.class.current.delete(@queue)
     end
 
     def keep_alive_duration
@@ -293,11 +291,15 @@ module MiniScheduler
     end
 
     def lock
-      MiniScheduler::DistributedMutex.synchronize(Manager.lock_key, MiniScheduler.redis) do
+      MiniScheduler::DistributedMutex.synchronize(Manager.lock_key(queue), MiniScheduler.redis) do
         yield
       end
     end
 
+    def self.discover_queues
+      ObjectSpace.each_object(MiniScheduler::Schedule).map(&:queue).to_set
+    end
+
     def self.discover_schedules
       # hack for developemnt reloader is crazytown
       # multiple classes with same name can be in
@@ -326,15 +328,15 @@ module MiniScheduler
       @identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}"
     end
 
-    def self.lock_key
-      "_scheduler_lock_"
+    def self.lock_key(queue)
+      "_scheduler_lock_#{queue}_"
     end
 
-    def self.queue_key(hostname = nil)
+    def self.queue_key(queue, hostname = nil)
       if hostname
-        "_scheduler_queue_#{hostname}_"
+        "_scheduler_queue_#{queue}_#{hostname}_"
       else
-        "_scheduler_queue_"
+        "_scheduler_queue_#{queue}_"
       end
     end
 
diff --git a/lib/mini_scheduler/schedule.rb b/lib/mini_scheduler/schedule.rb
index 2d04784..87a25a6 100644
--- a/lib/mini_scheduler/schedule.rb
+++ b/lib/mini_scheduler/schedule.rb
@@ -1,5 +1,10 @@
 module MiniScheduler::Schedule
 
+  def queue(value = nil)
+    @queue = value.to_s if value
+    @queue ||= "default"
+  end
+
   def daily(options = nil)
     if options
       @daily = options
@@ -10,7 +15,7 @@ module MiniScheduler::Schedule
   def every(duration = nil)
     if duration
       @every = duration
-      if manager = MiniScheduler::Manager.current
+      if manager = MiniScheduler::Manager.current[queue]
         manager.ensure_schedule!(self)
       end
     end
diff --git a/lib/mini_scheduler/schedule_info.rb b/lib/mini_scheduler/schedule_info.rb
index 7d651d9..c72552e 100644
--- a/lib/mini_scheduler/schedule_info.rb
+++ b/lib/mini_scheduler/schedule_info.rb
@@ -120,9 +120,9 @@ module MiniScheduler
 
     def queue_key
       if @klass.is_per_host
-        Manager.queue_key(@manager.hostname)
+        Manager.queue_key(@manager.queue, @manager.hostname)
       else
-        Manager.queue_key
+        Manager.queue_key(@manager.queue)
       end
     end
 
diff --git a/lib/mini_scheduler/views/scheduler.erb b/lib/mini_scheduler/views/scheduler.erb
index 1b9a0b6..018f95d 100644
--- a/lib/mini_scheduler/views/scheduler.erb
+++ b/lib/mini_scheduler/views/scheduler.erb
@@ -7,16 +7,17 @@
 <div class="container">
   <div class="row">
 
-    <div class="col-md-9">
+    <div class="col-md-12">
       <% if @schedules.length > 0 %>
       <table class="table table-striped table-bordered table-white" style="width: 100%; margin: 0; table-layout:fixed;">
         <thead>
           <th style="width: 30%">Worker</th>
-          <th style="width: 15%">Last Run</th>
-          <th style="width: 15%">Last Result</th>
-          <th style="width: 15%">Last Duration</th>
-          <th style="width: 15%">Last Owner</th>
-          <th style="width: 15%">Next Run Due</th>
+          <th style="width: 10%">Last Run</th>
+          <th style="width: 10%">Last Result</th>
+          <th style="width: 10%">Last Duration</th>
+          <th style="width: 10%">Last Owner</th>
+          <th style="width: 10%">Next Run Due</th>
+          <th style="width: 10%">Queue</th>
           <th style="width: 10%">Actions</th>
         </thead>
         <% @schedules.each do |schedule| %>
@@ -50,6 +51,9 @@
               <% end %>
             </td>
             <td>
+              <%= schedule.queue %>
+            </td>
+            <td>
               <form action="<%= "#{root_path}scheduler/#{schedule}/trigger" %>" method="post">
                 <%= csrf_tag if respond_to?(:csrf_tag) %>
                 <input class="btn btn-danger btn-small" type="submit" name="trigger" value="Trigger" data-confirm="Are you sure you want to trigger this job?" />

[... diff too long, it was truncated ...]

GitHub sha: 12d1a82c

1 Like