FIX: Scheduler failed to run any jobs after Redis flush (#5)

FIX: Scheduler failed to run any jobs after Redis flush (#5)

This adds a check that runs every 60 seconds. If the Redis keys of both the per-host queue and the global queue are missing, it schedules all available jobs for that queue.

diff --git a/lib/mini_scheduler/manager.rb b/lib/mini_scheduler/manager.rb
index 6a05728..ca15af0 100644
--- a/lib/mini_scheduler/manager.rb
+++ b/lib/mini_scheduler/manager.rb
@@ -12,11 +12,12 @@ module MiniScheduler
         @manager = manager
         @hostname = manager.hostname
 
-        @reschedule_orphans_thread = Thread.new do
+        @recovery_thread = Thread.new do
           while !@stopped
             sleep 60
 
             @mutex.synchronize do
+              repair_queue
               reschedule_orphans
             end
           end
@@ -45,6 +46,12 @@ module MiniScheduler
         MiniScheduler.handle_job_exception(ex, message: "Scheduling manager keep-alive")
       end
 
+      def repair_queue
+        @manager.repair_queue
+      rescue => ex
+        MiniScheduler.handle_job_exception(ex, message: "Scheduling manager queue repair")
+      end
+
       def reschedule_orphans
         @manager.reschedule_orphans!
       rescue => ex
@@ -122,10 +129,10 @@ module MiniScheduler
           @stopped = true
 
           @keep_alive_thread.kill
-          @reschedule_orphans_thread.kill
+          @recovery_thread.kill
 
           @keep_alive_thread.join
-          @reschedule_orphans_thread.join
+          @recovery_thread.join
 
           enq(nil)
 
@@ -252,6 +259,15 @@ module MiniScheduler
       nil
     end
 
+    def repair_queue
+      return if redis.exists?(self.class.queue_key(queue)) ||
+        redis.exists?(self.class.queue_key(queue, hostname))
+
+      self.class.discover_schedules
+        .select { |schedule| schedule.queue == queue }
+        .each { |schedule| ensure_schedule!(schedule) }
+    end
+
     def tick
       lock do
         schedule_next_job
diff --git a/spec/mini_scheduler/manager_spec.rb b/spec/mini_scheduler/manager_spec.rb
index 60b932e..39037d9 100644
--- a/spec/mini_scheduler/manager_spec.rb
+++ b/spec/mini_scheduler/manager_spec.rb
@@ -195,6 +195,35 @@ describe MiniScheduler::Manager do
       manager.stop!
     end
 
+    def queued_jobs(manager, with_hostname:)
+      hostname = with_hostname ? manager.hostname : nil
+      key = MiniScheduler::Manager.queue_key(manager.queue, hostname)
+      redis.zrange(key, 0, -1).map(&:constantize)
+    end
+
+    it 'should recover from Redis flush' do
+      manager = MiniScheduler::Manager.new(enable_stats: false)
+      manager.ensure_schedule!(Testing::SuperLongJob)
+      manager.ensure_schedule!(Testing::PerHostJob)
+
+      expect(queued_jobs(manager, with_hostname: false)).to include(Testing::SuperLongJob)
+      expect(queued_jobs(manager, with_hostname: true)).to include(Testing::PerHostJob)
+
+      redis.scan_each(match: "_scheduler_*") do |key|
+        redis.del(key)
+      end
+
+      expect(queued_jobs(manager, with_hostname: false)).to be_empty
+      expect(queued_jobs(manager, with_hostname: true)).to be_empty
+
+      manager.repair_queue
+
+      expect(queued_jobs(manager, with_hostname: false)).to include(Testing::SuperLongJob)
+      expect(queued_jobs(manager, with_hostname: true)).to include(Testing::PerHostJob)
+
+      manager.stop!
+    end
+
     it 'should only run pending job once' do
 
       Testing::RandomJob.runs = 0

GitHub sha: e358bf63

This commit appears in #5, which was approved by eviltrout and SamSaffron. It was merged by gschlager.