DEV: Improve thread-safety of sidekiq logging

DEV: Improve thread-safety of sidekiq logging

diff --git a/app/jobs/base.rb b/app/jobs/base.rb
index 643bbb8..f210308 100644
--- a/app/jobs/base.rb
+++ b/app/jobs/base.rb
@@ -19,50 +19,53 @@ module Jobs
     class JobInstrumenter
       def initialize(job_class:, opts:, db:, jid:)
         return unless enabled?
-        @data = {}
-
-        @data["hostname"] = `hostname`.strip # Hostname
-        @data["pid"] = Process.pid # Pid
-        @data["database"] = db # DB name - multisite db name it ran on
-        @data["job_id"] = jid # Job unique ID
-        @data["job_name"] = job_class.name # Job Name - eg: Jobs::AboutStats
-        @data["job_type"] = job_class.try(:scheduled?) ? "scheduled" : "regular" # Job Type - either s for scheduled or r for regular
-        @data["opts"] = opts.to_json # Params - json encoded params for the job
-
-        @data["status"] = 'pending'
-        @start_timestamp = Process.clock_gettime(Process::CLOCK_MONOTONIC)
-
-        self.class.ensure_interval_logging!
-        @@active_jobs ||= []
-        @@active_jobs << self
-
-        MethodProfiler.ensure_discourse_instrumentation!
-        MethodProfiler.start
+        @data = {}
+
+        @data["hostname"] = `hostname`.strip # Hostname
+        @data["pid"] = Process.pid # Pid
+        @data["database"] = db # DB name - multisite db name it ran on
+        @data["job_id"] = jid # Job unique ID
+        @data["job_name"] = job_class.name # Job Name - eg: Jobs::AboutStats
+        @data["job_type"] = job_class.try(:scheduled?) ? "scheduled" : "regular" # Job Type - either s for scheduled or r for regular
+        @data["opts"] = opts.to_json # Params - json encoded params for the job
+
+        @data["status"] = 'pending'
+        @start_timestamp = Process.clock_gettime(Process::CLOCK_MONOTONIC)
+
+        # Build @data (incl. the `hostname` spawn) outside the lock: the interval thread only reaches self via @@active_jobs
+        self.class.mutex.synchronize do
+          self.class.ensure_interval_logging!
+          @@active_jobs ||= []
+          @@active_jobs << self
+          MethodProfiler.ensure_discourse_instrumentation!
+          MethodProfiler.start
+        end
       end
 
       def stop(exception:)
         return unless enabled?
+        self.class.mutex.synchronize do # same lock the interval logging thread holds while calling write_to_log
+          profile = MethodProfiler.stop
+
+          @@active_jobs.delete(self)
+
+          @data["duration"] = profile[:total_duration] # Duration - length in seconds it took to run
+          @data["sql_duration"] = profile.dig(:sql, :duration) || 0 # Sql Duration (s)
+          @data["sql_calls"] = profile.dig(:sql, :calls) || 0 # Sql Statements - how many statements ran
+          @data["redis_duration"] = profile.dig(:redis, :duration) || 0 # Redis Duration (s)
+          @data["redis_calls"] = profile.dig(:redis, :calls) || 0 # Redis commands
+          @data["net_duration"] = profile.dig(:net, :duration) || 0 # Net Duration (s)
+          @data["net_calls"] = profile.dig(:net, :calls) || 0 # Net calls
+
+          if exception.present?
+            @data["exception"] = exception # Exception - if job fails a json encoded exception
+            @data["status"] = 'failed'
+          else
+            @data["status"] = 'success' # Status - fail, success, pending
+          end
 
-        profile = MethodProfiler.stop
-
-        @@active_jobs.delete(self)
-
-        @data["duration"] = profile[:total_duration] # Duration - length in seconds it took to run
-        @data["sql_duration"] = profile.dig(:sql, :duration) || 0 # Sql Duration (s)
-        @data["sql_calls"] = profile.dig(:sql, :calls) || 0 # Sql Statements - how many statements ran
-        @data["redis_duration"] = profile.dig(:redis, :duration) || 0 # Redis Duration (s)
-        @data["redis_calls"] = profile.dig(:redis, :calls) || 0 # Redis commands
-        @data["net_duration"] = profile.dig(:net, :duration) || 0 # Redis Duration (s)
-        @data["net_calls"] = profile.dig(:net, :calls) || 0 # Redis commands
-
-        if exception.present?
-          @data["exception"] = exception # Exception - if job fails a json encoded exception
-          @data["status"] = 'failed'
-        else
-          @data["status"] = 'success' # Status - fail, success, pending
+          write_to_log
         end
-
-        write_to_log
       end
 
       def self.raw_log(message)
@@ -97,14 +100,25 @@ module Jobs
         ENV["DISCOURSE_LOG_SIDEKIQ"] == "1"
       end
 
+      # Initialized eagerly when the class body loads: a lazy `||=` is not atomic, so two jobs
+      # starting at once could each create a Mutex and end up synchronizing on different objects.
+      @@mutex = Mutex.new
+
+      def self.mutex
+        @@mutex
+      end
+
       def self.ensure_interval_logging!
         interval = ENV["DISCOURSE_LOG_SIDEKIQ_INTERVAL"]
         return if !interval
+        interval = interval.to_i
         @@interval_thread ||= Thread.new do
           begin
             loop do
-              sleep interval.to_i
-              @@active_jobs.each { |j| j.write_to_log if j.current_duration > interval.to_i }
+              sleep interval
+              mutex.synchronize do
+                @@active_jobs.each { |j| j.write_to_log if j.current_duration > interval }
+              end
             end
           rescue Exception => e
             Discourse.warn_exception(e, message: "Sidekiq interval logging thread terminated unexpectedly")

GitHub sha: e2510d79

1 Like