Correct flaky distributed cache test and make distributed cache more testable

Correct flaky distributed cache test and make distributed cache more testable

diff --git a/lib/distributed_cache.rb b/lib/distributed_cache.rb
index 2b08e64..b88e467 100644
--- a/lib/distributed_cache.rb
+++ b/lib/distributed_cache.rb
@@ -1,3 +1,5 @@
+# frozen_string_literal: true
+
 # Like a hash, just does its best to stay in sync across the farm
 # On boot all instances are blank, but they populate as various processes
 # fill it up
@@ -6,96 +8,110 @@ require 'weakref'
 require 'base64'
 
 class DistributedCache
-  @subscribers = []
-  @subscribed = false
-  @lock = Mutex.new
 
-  attr_reader :key
+  class Manager
 
-  def self.subscribers
-    @subscribers
-  end
+    def initialize(message_bus = nil)
+      @subscribers = []
+      @subscribed = false
+      @lock = Mutex.new
+      @message_bus = message_bus || MessageBus
+    end
 
-  def self.process_message(message)
-    i = @subscribers.length - 1
+    def subscribers
+      @subscribers
+    end
 
-    payload = message.data
+    def process_message(message)
+      i = @subscribers.length - 1
 
-    while i >= 0
-      begin
-        current = @subscribers[i]
+      payload = message.data
 
-        next if payload["origin"] == current.identity
-        next if current.key != payload["hash_key"]
-        next if payload["discourse_version"] != Discourse.git_version
+      while i >= 0
+        begin
+          current = @subscribers[i]
 
-        hash = current.hash(message.site_id)
+          next if payload["origin"] == current.identity
+          next if current.key != payload["hash_key"]
+          next if payload["discourse_version"] != Discourse.git_version
 
-        case payload["op"]
-        when "set" then hash[payload["key"]] = payload["marshalled"] ? Marshal.load(Base64.decode64(payload["value"])) : payload["value"]
-        when "delete" then hash.delete(payload["key"])
-        when "clear"  then hash.clear
-        end
+          hash = current.hash(message.site_id)
+
+          case payload["op"]
+          when "set" then hash[payload["key"]] = payload["marshalled"] ? Marshal.load(Base64.decode64(payload["value"])) : payload["value"]
+          when "delete" then hash.delete(payload["key"])
+          when "clear"  then hash.clear
+          end
 
-      rescue WeakRef::RefError
-        @subscribers.delete_at(i)
-      ensure
-        i -= 1
+        rescue WeakRef::RefError
+          @subscribers.delete_at(i)
+        ensure
+          i -= 1
+        end
       end
     end
-  end
 
-  def self.channel_name
-    "/distributed_hash".freeze
-  end
+    def channel_name
+      "/distributed_hash".freeze
+    end
 
-  def self.ensure_subscribe!
-    return if @subscribed
-    @lock.synchronize do
+    def ensure_subscribe!
       return if @subscribed
-      MessageBus.subscribe(channel_name) do |message|
-        @lock.synchronize do
-          process_message(message)
+      @lock.synchronize do
+        return if @subscribed
+        @message_bus.subscribe(channel_name) do |message|
+          @lock.synchronize do
+            process_message(message)
+          end
         end
+        @subscribed = true
       end
-      @subscribed = true
     end
-  end
 
-  def self.publish(hash, message)
-    message[:origin] = hash.identity
-    message[:hash_key] = hash.key
-    message[:discourse_version] = Discourse.git_version
-    MessageBus.publish(channel_name, message, user_ids: [-1])
-  end
+    def publish(hash, message)
+      message[:origin] = hash.identity
+      message[:hash_key] = hash.key
+      message[:discourse_version] = Discourse.git_version
+      @message_bus.publish(channel_name, message, user_ids: [-1])
+    end
 
-  def self.set(hash, key, value)
-    # special support for set
-    marshal = (Set === value || Hash === value)
-    value = Base64.encode64(Marshal.dump(value)) if marshal
-    publish(hash, op: :set, key: key, value: value, marshalled: marshal)
-  end
+    def set(hash, key, value)
+      # special support for set
+      marshal = (Set === value || Hash === value)
+      value = Base64.encode64(Marshal.dump(value)) if marshal
+      publish(hash, op: :set, key: key, value: value, marshalled: marshal)
+    end
 
-  def self.delete(hash, key)
-    publish(hash, op: :delete, key: key)
-  end
+    def delete(hash, key)
+      publish(hash, op: :delete, key: key)
+    end
 
-  def self.clear(hash)
-    publish(hash, op: :clear)
-  end
+    def clear(hash)
+      publish(hash, op: :clear)
+    end
 
-  def self.register(hash)
-    @lock.synchronize do
-      @subscribers << WeakRef.new(hash)
+    def register(hash)
+      @lock.synchronize do
+        @subscribers << WeakRef.new(hash)
+      end
     end
   end
 
-  def initialize(key)
-    DistributedCache.ensure_subscribe!
-    DistributedCache.register(self)
+  @default_manager = Manager.new
+
+  def self.default_manager
+    @default_manager
+  end
+
+  attr_reader :key
 
+  def initialize(key, manager = nil)
     @key = key
     @data = {}
+    @manager = manager || DistributedCache.default_manager
+
+    @manager.ensure_subscribe!
+    @manager.register(self)
   end
 
   def identity
@@ -105,7 +121,7 @@ class DistributedCache
 
   def []=(k, v)
     k = k.to_s if Symbol === k
-    DistributedCache.set(self, k, v)
+    @manager.set(self, k, v)
     hash[k] = v
   end
 
@@ -116,12 +132,12 @@ class DistributedCache
 
   def delete(k)
     k = k.to_s if Symbol === k
-    DistributedCache.delete(self, k)
+    @manager.delete(self, k)
     hash.delete(k)
   end
 
   def clear
-    DistributedCache.clear(self)
+    @manager.clear(self)
     hash.clear
   end
 
diff --git a/spec/components/distributed_cache_spec.rb b/spec/components/distributed_cache_spec.rb
index c0b5c29..162e553 100644
--- a/spec/components/distributed_cache_spec.rb
+++ b/spec/components/distributed_cache_spec.rb
@@ -4,20 +4,30 @@ require 'distributed_cache'
 describe DistributedCache do
 
   before :all do
-    $redis.flushall
+    @bus = MessageBus::Instance.new
+    @bus.configure(backend: :memory)
+    @manager = DistributedCache::Manager.new(@bus)
+  end
+
+  after :all do
+    @bus.destroy
+  end
+
+  def cache(name)
+    DistributedCache.new(name, @manager)
   end
 
   let! :cache1 do
-    DistributedCache.new("test")
+    cache("test")
   end
 
   let! :cache2 do
-    DistributedCache.new("test")
+    cache("test")
   end
 
   it 'allows us to store Set' do
-    c1 = DistributedCache.new("test1")
-    c2 = DistributedCache.new("test1")
+    c1 = cache("test1")
+    c2 = cache("test1")
 
     set = Set.new
     set << 1
@@ -45,8 +55,8 @@ describe DistributedCache do
   end
 
   it 'does not leak state across caches' do
-    c2 = DistributedCache.new("test1")
-    c3 = DistributedCache.new("test1")
+    c2 = cache("test1")
+    c3 = cache("test1")
     c2["hi"] = "hi"
     wait_for do
       c3["hi"] == "hi"

GitHub sha: 71ad3a48