FIX: DistributedMutex (#7953)

FIX: DistributedMutex (#7953)

diff --git a/lib/distributed_mutex.rb b/lib/distributed_mutex.rb
index e173774..420c5bc 100644
--- a/lib/distributed_mutex.rb
+++ b/lib/distributed_mutex.rb
@@ -24,10 +24,43 @@ class DistributedMutex
 
   # NOTE wrapped in mutex to maintain its semantics
   def synchronize
-    @mutex.lock
+    @mutex.synchronize do
+      expire_time = get_lock
+
+      begin
+        yield
+      ensure
+        current_time = redis.time[0]
+        unless current_time < expire_time
+          warn("held for too long")
+        end
+
+        unless unlock(expire_time)
+          warn("didn't unlock cleanly")
+        end
+      end
+    end
+  end
+
+  private
+
+  attr_reader :key
+  attr_reader :redis
+  attr_reader :validity
+
+  def warn(msg)
+    Rails.logger.warn("DistributedMutex(#{key.inspect}): #{msg}")
+  end
+
+  def get_lock
     attempts = 0
 
-    while !try_to_get_lock
+    while true
+      got_lock, expire_time = try_to_get_lock
+      if got_lock
+        return expire_time
+      end
+
       sleep 0.001
       # in readonly we will never be able to get a lock
       if @using_global_redis && Discourse.recently_readonly?
@@ -38,38 +71,48 @@ class DistributedMutex
         end
       end
     end
-
-    yield
-
-  ensure
-    @redis.del @key
-    @mutex.unlock
   end
 
-  private
-
   def try_to_get_lock
     got_lock = false
 
-    if @redis.setnx @key, Time.now.to_i + @validity
-      @redis.expire @key, @validity
-      got_lock = true
-    else
-      begin
-        @redis.watch @key
-        time = @redis.get @key
+    now = redis.time[0]
+    expire_time = now + validity
 
-        if time && time.to_i < Time.now.to_i
-          got_lock = @redis.multi do
-            @redis.set @key, Time.now.to_i + @validity
-          end
+    redis.watch key
+
+    current_expire_time = redis.get key
+
+    if current_expire_time.present? && current_expire_time.to_i > now
+      redis.unwatch
+
+      got_lock = false
+    else
+      result =
+        redis.multi do
+          redis.set key, expire_time.to_s
+          redis.expire key, validity
         end
-      ensure
-        @redis.unwatch
-      end
+
+      got_lock = result.present?
     end
 
-    got_lock
+    [got_lock, expire_time]
   end
 
+  def unlock(expire_time)
+    redis.watch key
+    current_expire_time = redis.get key
+
+    if current_expire_time == expire_time.to_s
+      result =
+        redis.multi do
+          redis.del key
+        end
+      return result.present?
+    else
+      redis.unwatch
+      return false
+    end
+  end
 end
diff --git a/lib/tasks/scheduler.rake b/lib/tasks/scheduler.rake
index 4b38e4d..6bfb28e 100644
--- a/lib/tasks/scheduler.rake
+++ b/lib/tasks/scheduler.rake
@@ -1,5 +1,7 @@
 # frozen_string_literal: true
 
+require 'benchmark'
+
 desc "This task is called by the Heroku scheduler add-on"
 
 task enqueue_digest_emails: :environment do
@@ -18,16 +20,11 @@ task version_check: :environment do
   Jobs::VersionCheck.new.execute(nil)
 end
 
-def time
-  start = Time.now
-  yield
-  puts "Elapsed #{((Time.now - start) * 1000).to_i}ms"
-end
-
 desc "run every task the scheduler knows about in that order, use only for debugging"
 task 'scheduler:run_all' => :environment do
   MiniScheduler::Manager.discover_schedules.each do |schedule|
     puts "Running #{schedule}"
-    time { schedule.new.execute({}) }
+    elapsed = Benchmark.realtime { schedule.new.execute({}) }
+    puts "Elapsed #{(elapsed * 1000).to_i}ms"
   end
 end
diff --git a/spec/components/distributed_mutex_spec.rb b/spec/components/distributed_mutex_spec.rb
index 569c35e..95588c1 100644
--- a/spec/components/distributed_mutex_spec.rb
+++ b/spec/components/distributed_mutex_spec.rb
@@ -98,4 +98,47 @@ describe DistributedMutex do
     end
   end
 
+  context "executions" do
+    it "should not allow critical sections to overlap" do
+      connections = (0...3).map { DiscourseRedis.new }
+
+      scenario =
+        Concurrency::Scenario.new do |execution|
+          locked = false
+
+          $redis.del('mutex_key')
+
+          connections.each do |connection|
+            connection.unwatch
+          end
+
+          3.times do |i|
+            execution.spawn do
+              begin
+                redis =
+                  Concurrency::RedisWrapper.new(
+                    connections[i],
+                    execution
+                  )
+
+                2.times do
+                  DistributedMutex.synchronize('mutex_key', redis: redis) do
+                    raise "already locked #{execution.path}" if locked
+                    locked = true
+
+                    execution.yield
+
+                    raise "already unlocked #{execution.path}" unless locked
+                    locked = false
+                  end
+                end
+              rescue Redis::ConnectionError
+              end
+            end
+          end
+        end
+
+      scenario.run(runs: 10)
+    end
+  end
 end
diff --git a/spec/support/concurrency.rb b/spec/support/concurrency.rb
new file mode 100644
index 0000000..fdc0f79
--- /dev/null
+++ b/spec/support/concurrency.rb
@@ -0,0 +1,253 @@
+# frozen_string_literal: true
+
+require 'fiber'
+
+module Concurrency
+  module Logic
+    class DeadEnd < StandardError; end
+
+    module Complete
+      class Path
+        def initialize
+          @path = []
+          @index = 0
+        end
+
+        def to_s
+          "#<Logic::Complete::Path path=#{@path}>"
+        end
+
+        def choose(*options)
+          raise DeadEnd if options.empty?
+
+          @path << [options.size, 0] unless @index < @path.size
+
+          pair = @path[@index]
+          raise "non-determinism" unless pair[0] == options.size
+
+          @index += 1
+          options[pair[1]]
+        end
+
+        def choose_with_weights(*options)
+          choose(options.map(&:first))
+        end
+
+        def dead_end
+          raise DeadEnd
+        end
+
+        def guard(condition)
+          dead_end unless condition
+        end
+
+        def next
+          @index = 0
+
+          until @path.empty?
+            pair = @path.last
+            pair[1] += 1
+            if pair[1] < pair[0]
+              break
+            else
+              @path.pop
+            end
+          end
+
+          !@path.empty?
+        end
+      end
+
+      def self.run(&blk)
+        path = Path.new
+        possibilities = []
+
+        while true
+          begin
+            possibilities << blk.call(path)
+          rescue DeadEnd
+          end
+
+          break unless path.next
+        end
+
+        possibilities
+      end
+    end
+
+    module Sampling
+      class Path
+        def initialize(random)
+          @random = random
+        end
+
+        def to_s
+          "#<Logic::Sampling::Path seed=#{@random.seed}>"
+        end
+
+        def choose(*options)
+          options.sample(random: @random)
+        end
+
+        def choose_with_weights(*options)
+          position = @random.rand
+          options.each do |(option, weight)|
+            if position <= weight
+              return option
+            else
+              position -= weight
+            end
+          end
+          raise "weights don't add up"
+        end
+
+        def dead_end
+          raise DeadEnd
+        end
+
+        def guard(condition)
+          dead_end unless condition
+        end
+      end
+
+      def self.run(seed, runs, &blk)
+        seed = seed.to_i
+        possibilities = []
+
+        runs.times do |i|
+          path = Path.new(Random.new(seed + i))
+
+          begin
+            possibilities << blk.call(path)
+          rescue DeadEnd
+          end
+        end
+
+        possibilities
+      end
+    end
+
+    def self.run(seed: nil, runs: nil, &blk)
+      if runs.present?
+        Sampling.run(seed, runs, &blk)
+      else
+        Complete.run(&blk)
+      end
+    end
+  end
+
+  class Scenario
+    def initialize(&blk)
+      @blk = blk
+    end
+

[... diff too long, it was truncated ...]

GitHub sha: 20bc4a38

2 Likes

Grrr! GitHub lost my commit messages. I selected “rebase and merge” and it said there was a problem. It seems to have squashed the commits and discarded their messages.

For posterity:

DEV: Removed top level time method from rake task

Top-level methods pollute the method namespace. In this case, the rake task's top-level time method was causing problems because it conflicted with the Redis TIME command (redis.time).

DEV: Concurrent fuzz testing

When testing concurrent code, it is often useful to be able to sample from or iterate through all possible interleavings and eventualities in a deterministic manner.

These helpers allow you to do just that. Here’s an example:

# Build a scenario; the block receives the execution object used to spawn tasks.
scenario =
  Concurrency::Scenario.new do |execution|
    output = []

    # First task: appends 1, hands control back to the scheduler, then appends 2.
    execution.spawn do
      output << 1
      execution.yield
      output << 2
    end

    # Second task: same shape as the first, appending 3 and then 4.
    execution.spawn do
      output << 3
      execution.yield
      output << 4
    end

    # The block's value; the result below lists one such array per interleaving.
    output
  end

# Within every result, 1 precedes 2 and 3 precedes 4; only the interleaving differs.
possibilities = scenario.run
# [
#   [1, 2, 3, 4],
#   [1, 3, 2, 4],
#   [1, 3, 4, 2],
#   [3, 1, 2, 4],
#   [3, 1, 4, 2],
#   [3, 4, 1, 2]
# ]

execution.spawn starts a new task. execution.yield hands off control to the scheduler to resume another (or the same) task. The scheduler is deterministic, which means that if you find a problematic execution, you can replay it and inspect it.

FIX: DistributedMutex

DistributedMutex would unlock when it shouldn’t in two cases:

  • when the mutex was held for too long, and
  • when there was an error getting the mutex.

DEV: DistributedMutex tests

2 Likes