PERF: message_bus will be deferred by server when flooded

The message_bus performs a fair amount of work prior to hijacking requests. This change ensures that if the server is flooded, message_bus will tell clients to back off for 30 seconds plus a random delay of up to 120 seconds.
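
The back-off window is a fixed 30 second floor plus random jitter, so clients rejected together do not all come back together. A minimal Ruby sketch of the same arithmetic the initializer uses (`30 + (rand * 120).to_i`). The method and constant names are hypothetical, and the optional `rng` argument exists only to make the sketch testable:

```ruby
# Hypothetical names; the arithmetic mirrors `30 + (rand * 120).to_i`
# from config/initializers/004-message_bus.rb.
BACKOFF_FLOOR_SECONDS = 30
BACKOFF_JITTER_SECONDS = 120

# Returns an Integer number of seconds in 30..149.
# Random#rand with no argument yields a Float in [0.0, 1.0), so after
# truncation with to_i the jitter is at most 119 seconds.
def message_bus_backoff_seconds(rng = Random.new)
  BACKOFF_FLOOR_SECONDS + (rng.rand * BACKOFF_JITTER_SECONDS).to_i
end

samples = Array.new(10_000) { message_bus_backoff_seconds }
puts "observed range: #{samples.min}..#{samples.max}"
```

Spreading retries over a two minute window keeps a flood of rejected clients from turning into a second, synchronized flood.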

This back-off is ultra cheap and happens very early in the middleware.
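
Rejecting early is cheap because a 429 skips the database, Redis and hijack work message_bus would otherwise do. The commit does this by raising `RateLimiter::LimitExceeded` at the top of `setup_message_bus_env`. What follows is only a standalone, Rack-style sketch of the same idea (a `call(env)` returning `[status, headers, body]`). The class name, the path-prefix filter, and the assumption that `REQUEST_QUEUE_SECONDS` was already populated upstream are all hypothetical:

```ruby
# Hypothetical Rack-style middleware in plain Ruby (no gems required).
# Assumes env["REQUEST_QUEUE_SECONDS"] was already filled in upstream,
# e.g. from the X-Request-Start header.
class EarlyMessageBusRejector
  def initialize(app, max_queue_seconds: 0.1, path_prefix: "/message-bus/")
    @app = app
    @max_queue_seconds = max_queue_seconds
    @path_prefix = path_prefix
  end

  def call(env)
    queue_time = env["REQUEST_QUEUE_SECONDS"]

    if queue_time && queue_time > @max_queue_seconds &&
        env["PATH_INFO"].to_s.start_with?(@path_prefix)
      # Cheap rejection: no DB, no Redis, no hijack; just a status and a header.
      retry_after = 30 + (rand * 120).to_i
      return [429, { "Retry-After" => retry_after.to_s }, ["Too Many Requests"]]
    end

    @app.call(env)
  end
end

downstream = ->(_env) { [200, {}, ["ok"]] }
middleware = EarlyMessageBusRejector.new(downstream)

status, headers, _body =
  middleware.call({ "PATH_INFO" => "/message-bus/abc/poll", "REQUEST_QUEUE_SECONDS" => 0.2 })
puts "#{status} Retry-After: #{headers["Retry-After"]}"
```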

It corrects a situation where a flood of message bus requests could cause the app to become unresponsive.

The MessageBus update (2.2.0 to 2.2.2) ensures the message_bus gem properly respects the Retry-After header and status 429.
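
On the client side, honouring the response means waiting for the number of seconds in `Retry-After` before polling again after a 429. The real message_bus client is JavaScript shipped with the gem; this Ruby sketch only illustrates the logic. The helper name is hypothetical. It also accepts the HTTP-date form of `Retry-After`, which the header permits even though this commit only ever sends a number of seconds:

```ruby
require "time"

# Hypothetical helper: seconds a polling client should wait before the
# next request. Header names are matched case-insensitively.
def next_poll_delay(status, headers, default_delay: 0, now: Time.now)
  return default_delay unless status == 429

  _, raw = headers.find { |name, _| name.to_s.casecmp?("retry-after") }
  value = raw.to_s.strip
  return default_delay if value.empty?

  # Retry-After is either delta-seconds ("120") or an HTTP-date.
  return value.to_i if value.match?(/\A\d+\z/)

  [(Time.httpdate(value) - now).ceil, 0].max
rescue ArgumentError
  # Unparseable date: fall back to the normal polling cadence.
  default_delay
end

puts next_poll_delay(429, { "Retry-After" => "95" }) # prints 95
```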

Under normal conditions this code should never trigger. To disable it, raise the value of DISCOURSE_REJECT_MESSAGE_BUS_QUEUE_SECONDS; by default, message bus requests are told to go away once they have been queueing for longer than 100ms.
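
The threshold is `reject_message_bus_queue_seconds` in `discourse_defaults.conf` (default `0.1`), which can be overridden through the `DISCOURSE_REJECT_MESSAGE_BUS_QUEUE_SECONDS` environment variable. A minimal sketch of the lookup and the comparison, assuming a plain `ENV`-style hash rather than Discourse's actual `GlobalSetting` machinery. Both method names are hypothetical:

```ruby
# Hypothetical stand-ins for GlobalSetting.reject_message_bus_queue_seconds
# and the check in setup_message_bus_env.
DEFAULT_REJECT_QUEUE_SECONDS = 0.1 # value from discourse_defaults.conf

def reject_message_bus_queue_seconds(env = ENV)
  raw = env["DISCOURSE_REJECT_MESSAGE_BUS_QUEUE_SECONDS"]
  # Unlike String#to_f, Float() raises on a malformed value instead of
  # silently turning it into 0.0 (which would reject every request).
  raw.nil? ? DEFAULT_REJECT_QUEUE_SECONDS : Float(raw)
end

# Mirrors `queue_time > threshold`: strictly greater, and no rejection
# when the queue time is unknown (no X-Request-Start header).
def reject_message_bus?(queue_seconds, threshold)
  !queue_seconds.nil? && queue_seconds > threshold
end

threshold = reject_message_bus_queue_seconds({})
puts reject_message_bus?(0.2, threshold)  # prints true
puts reject_message_bus?(0.05, threshold) # prints false
```

Setting the variable to a large value, such as `1000`, effectively disables the check.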

diff --git a/Gemfile.lock b/Gemfile.lock
index 19fa675..741e307 100644
--- a/Gemfile.lock
+++ b/Gemfile.lock
@@ -178,7 +178,7 @@ GEM
       mini_mime (>= 0.1.1)
     maxminddb (0.1.22)
     memory_profiler (0.9.13)
-    message_bus (2.2.0)
+    message_bus (2.2.2)
       rack (>= 1.1.3)
     metaclass (0.0.4)
     method_source (0.9.2)
diff --git a/config/discourse_defaults.conf b/config/discourse_defaults.conf
index 4f409f5..dd4587a 100644
--- a/config/discourse_defaults.conf
+++ b/config/discourse_defaults.conf
@@ -224,6 +224,10 @@ force_anonymous_min_queue_seconds = 1
 # only trigger anon if we see more than N requests for this path in last 10 seconds
 force_anonymous_min_per_10_seconds = 3
 
+# if a message bus request queues for 100ms or longer, we will reject it and ask consumer
+# to back off
+reject_message_bus_queue_seconds = 0.1
+
 # disable search if app server is queueing for longer than this (in seconds)
 disable_search_queue_threshold = 1
 
diff --git a/config/initializers/004-message_bus.rb b/config/initializers/004-message_bus.rb
index a39b6ee..6897ba4 100644
--- a/config/initializers/004-message_bus.rb
+++ b/config/initializers/004-message_bus.rb
@@ -17,6 +17,14 @@ end
 def setup_message_bus_env(env)
   return if env["__mb"]
 
+  ::Middleware::RequestTracker.populate_request_queue_seconds!(env)
+
+  if queue_time = env["REQUEST_QUEUE_SECONDS"]
+    if queue_time > (GlobalSetting.reject_message_bus_queue_seconds).to_f
+      raise RateLimiter::LimitExceeded, 30 + (rand * 120).to_i
+    end
+  end
+
   host = RailsMultisite::ConnectionManagement.host(env)
   RailsMultisite::ConnectionManagement.with_hostname(host) do
     extra_headers = {
diff --git a/lib/middleware/request_tracker.rb b/lib/middleware/request_tracker.rb
index 470ab5f..2a75ada 100644
--- a/lib/middleware/request_tracker.rb
+++ b/lib/middleware/request_tracker.rb
@@ -139,17 +139,23 @@ class Middleware::RequestTracker
 
   end
 
+  def self.populate_request_queue_seconds!(env)
+    if !env['REQUEST_QUEUE_SECONDS']
+      if queue_start = env['HTTP_X_REQUEST_START']
+        queue_start = queue_start.split("t=")[1].to_f
+        queue_time = (Time.now.to_f - queue_start)
+        env['REQUEST_QUEUE_SECONDS'] = queue_time
+      end
+    end
+  end
+
   def call(env)
     result = nil
     log_request = true
 
     # doing this as early as possible so we have an
     # accurate counter
-    if queue_start = env['HTTP_X_REQUEST_START']
-      queue_start = queue_start.split("t=")[1].to_f
-      queue_time = (Time.now.to_f - queue_start)
-      env['REQUEST_QUEUE_SECONDS'] = queue_time
-    end
+    ::Middleware::RequestTracker.populate_request_queue_seconds!(env)
 
     request = Rack::Request.new(env)
 
diff --git a/spec/integration/rate_limiting_spec.rb b/spec/integration/rate_limiting_spec.rb
index 87a5849..5f609cf 100644
--- a/spec/integration/rate_limiting_spec.rb
+++ b/spec/integration/rate_limiting_spec.rb
@@ -14,6 +14,31 @@ describe 'rate limiter integration' do
     RateLimiter.disable
   end
 
+  it "will rate limit message bus requests once queueing" do
+    freeze_time
+
+    global_setting :reject_message_bus_queue_seconds, 0.1
+
+    post "/message-bus/#{SecureRandom.hex}/poll", headers: {
+      "HTTP_X_REQUEST_START" => "t=#{Time.now.to_f - 0.2}"
+    }
+
+    expect(response.status).to eq(429)
+    expect(response.headers['Retry-After']).to be > 29
+  end
+
+  it "will not rate limit when all is good" do
+    freeze_time
+
+    global_setting :reject_message_bus_queue_seconds, 0.1
+
+    post "/message-bus/#{SecureRandom.hex}/poll", headers: {
+      "HTTP_X_REQUEST_START" => "t=#{Time.now.to_f - 0.05}"
+    }
+
+    expect(response.status).to eq(200)
+  end
+
   it "will clear the token cookie if invalid" do
     name = Auth::DefaultCurrentUserProvider::TOKEN_COOKIE

GitHub sha: 1f47ed1e
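
`populate_request_queue_seconds!` in `lib/middleware/request_tracker.rb` trusts the front-end proxy to send `X-Request-Start: t=<epoch seconds>`. With the `split("t=")[1].to_f` parsing, a header in any other shape gives `nil.to_f`, which is `0.0`. The computed queue time then becomes the full time since the Unix epoch, and the new check would reject the request. A hypothetical defensive variant that records nothing for a malformed header could look like this. The method name and the regular expression are assumptions, not Discourse code:

```ruby
# Hypothetical, defensive variant of populate_request_queue_seconds!.
# Only a header of the form "t=<seconds since the epoch>" is trusted.
def safe_populate_request_queue_seconds!(env, now: Time.now.to_f)
  # Never recompute a value an earlier middleware already stored.
  return env["REQUEST_QUEUE_SECONDS"] if env["REQUEST_QUEUE_SECONDS"]

  header = env["HTTP_X_REQUEST_START"]
  return nil unless header

  match = header.match(/t=(\d+(?:\.\d+)?)/)
  # Malformed header: store nothing rather than a bogus epoch-sized value.
  return nil unless match

  env["REQUEST_QUEUE_SECONDS"] = now - match[1].to_f
end

env = { "HTTP_X_REQUEST_START" => "t=#{Time.now.to_f - 0.2}" }
puts safe_populate_request_queue_seconds!(env).round(3)
```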
