FIX: stop leaking redis keys used for tracking per channel id

FIX: stop leaking redis keys used for tracking per channel id

From 53e6d747826e3efdeb09395564556e7e2dc30931 Mon Sep 17 00:00:00 2001
From: Sam <sam.saffron@gmail.com>
Date: Fri, 30 Nov 2018 14:13:23 +1100
Subject: [PATCH] FIX: stop leaking redis keys used for tracking per channel id


diff --git a/.rubocop.yml b/.rubocop.yml
index 603d39b..a0c6e56 100644
--- a/.rubocop.yml
+++ b/.rubocop.yml
@@ -15,6 +15,10 @@ Layout/MultilineMethodCallIndentation:
 Bundler/OrderedGems:
   Enabled: false
 
+# Redis backend is quite big
+Metrics/ClassLength:
+  Max: 310
+
 Metrics/BlockLength:
   Exclude:
     - '**/spec_helper.rb'
diff --git a/CHANGELOG b/CHANGELOG
index a25fb0d..fb55d13 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,5 +1,7 @@
 Unreleased
 
+- FIX: In the redis backend we now expire the key used to track the channel id; previously this could
+    leak redis keys when large numbers of subscriptions go away
 - FEATURE: Much extra implementation documentation, and some improvements to usage documentation.
 - FEATURE: Improvements to development workflow:
   - Fully docker-based development and testing, with no other dependencies.
diff --git a/lib/message_bus/backends/redis.rb b/lib/message_bus/backends/redis.rb
index 1ee98a6..cfb2154 100644
--- a/lib/message_bus/backends/redis.rb
+++ b/lib/message_bus/backends/redis.rb
@@ -85,6 +85,9 @@ module MessageBus
         end
       end
 
+      # Note: the script takes care of expiring all keys, with one exception:
+      # we do not expire the global backlog key, because we have no simple way to determine what its TTL should be on publish;
+      # we only provide a per-channel max backlog age (which we can override on publish), not a global one
       LUA_PUBLISH = <<LUA
 
       local start_payload = ARGV[1]
@@ -106,12 +109,12 @@ module MessageBus
 
       redis.call("ZADD", backlog_key, backlog_id, payload)
       redis.call("EXPIRE", backlog_key, max_backlog_age)
-
       redis.call("ZADD", global_backlog_key, global_id, global_backlog_message)
       redis.call("EXPIRE", global_backlog_key, max_backlog_age)
-
       redis.call("PUBLISH", redis_channel_name, payload)
 
+      redis.call("EXPIRE", backlog_id_key, max_backlog_age)
+
       if backlog_id > max_backlog_size then
         redis.call("ZREMRANGEBYSCORE", backlog_key, 1, backlog_id - max_backlog_size)
       end
@@ -121,7 +124,6 @@ module MessageBus
       end
 
       return backlog_id
-
 LUA
 
       LUA_PUBLISH_SHA1 = Digest::SHA1.hexdigest(LUA_PUBLISH)
diff --git a/spec/lib/message_bus/backend_spec.rb b/spec/lib/message_bus/backend_spec.rb
index 4df8a66..ec88e2b 100644
--- a/spec/lib/message_bus/backend_spec.rb
+++ b/spec/lib/message_bus/backend_spec.rb
@@ -175,6 +175,8 @@ describe PUB_SUB_CLASS do
 
     expected_backlog_size = 0
 
+    initial_id = @bus.last_id("/foo")
+
     # Start at time = 0s
     @bus.publish "/foo", "bar", max_backlog_age: 1
     expected_backlog_size += 1
@@ -198,6 +200,15 @@ describe PUB_SUB_CLASS do
     @bus.global_backlog.length.must_equal expected_backlog_size
     @bus.backlog("/foo", 0).length.must_equal expected_backlog_size
 
+    # for the time being we can give pg a pass here
+    # TODO: make the implementation here consistent
+    if MESSAGE_BUS_CONFIG[:backend] != :postgres
+      # ids are not opaque; we expect them to be reset on our channel if it
+      # got cleared due to an expiry, because otherwise we would leak tracking entries,
+      # which in turn can bloat storage for the backend
+      @bus.last_id("/foo").must_equal initial_id
+    end
+
     sleep 0.75 # Should now be at time =~ 2s
 
     @bus.publish "/foo", "baz", max_backlog_age: 1 # Publish something else before another expiry

GitHub