FEATURE: Add collector and instrumentation for Sidekiq queues (#114)

FEATURE: Add collector and instrumentation for Sidekiq queues (#114)

  • Add sidekiq_queue collector and instrumentation

  • Update README

  • Refactor SidekiqQueue collector to instantiate Gauges when needed

  • No need to add queue information twice

  • Add basic tests for SidekiqQueueCollector

diff --git a/README.md b/README.md
index 99a2cd8..bcbd2b2 100644
--- a/README.md
+++ b/README.md
@@ -279,6 +279,17 @@ Sidekiq.configure_server do |config|
 end
 `‍``
 
+To monitor Sidekiq queue size and latency:
+
+`‍``ruby
+Sidekiq.configure_server do |config|
+  config.on :startup do
+    require 'prometheus_exporter/instrumentation'
+    PrometheusExporter::Instrumentation::SidekiqQueue.start
+  end
+end
+`‍``
+
 To monitor Sidekiq process info:
 
 `‍``ruby
@@ -551,8 +562,8 @@ The following will run the process at
 - Port `8080` (default `9394`)
 - Bind to `0.0.0.0` (default `localhost`)
 - Timeout in `1 second` for metrics endpoint (default `2 seconds`)
-- Metric prefix as `foo_` (default `ruby_`) 
-- Default labels as `{environment: "integration", foo: "bar"}` 
+- Metric prefix as `foo_` (default `ruby_`)
+- Default labels as `{environment: "integration", foo: "bar"}`
 
 `‍``bash
 prometheus_exporter -p 8080 \
diff --git a/lib/prometheus_exporter/instrumentation.rb b/lib/prometheus_exporter/instrumentation.rb
index da457f5..420ed51 100644
--- a/lib/prometheus_exporter/instrumentation.rb
+++ b/lib/prometheus_exporter/instrumentation.rb
@@ -4,6 +4,7 @@ require_relative "client"
 require_relative "instrumentation/process"
 require_relative "instrumentation/method_profiler"
 require_relative "instrumentation/sidekiq"
+require_relative "instrumentation/sidekiq_queue"
 require_relative "instrumentation/delayed_job"
 require_relative "instrumentation/puma"
 require_relative "instrumentation/hutch"
diff --git a/lib/prometheus_exporter/instrumentation/sidekiq_queue.rb b/lib/prometheus_exporter/instrumentation/sidekiq_queue.rb
new file mode 100644
index 0000000..ab4544d
--- /dev/null
+++ b/lib/prometheus_exporter/instrumentation/sidekiq_queue.rb
@@ -0,0 +1,39 @@
+# frozen_string_literal: true
+
+module PrometheusExporter::Instrumentation
+  class SidekiqQueue
+    def self.start(client: nil, frequency: 30)
+      client ||= PrometheusExporter::Client.default
+      sidekiq_queue_collector = new
+
+      Thread.new do
+        loop do
+          begin
+            client.send_json(sidekiq_queue_collector.collect)
+          rescue StandardError => e
+            STDERR.puts("Prometheus Exporter Failed To Collect Sidekiq Queue metrics #{e}")
+          ensure
+            sleep frequency
+          end
+        end
+      end
+    end
+
+    def collect
+      {
+        type: 'sidekiq_queue',
+        queues: collect_queue_stats
+      }
+    end
+
+    def collect_queue_stats
+      ::Sidekiq::Queue.all.map do |queue|
+        {
+          backlog_total:   queue.size,
+          latency_seconds: queue.latency.to_i,
+          labels: { queue: queue.name }
+        }
+      end
+    end
+  end
+end
diff --git a/lib/prometheus_exporter/server.rb b/lib/prometheus_exporter/server.rb
index dde5df0..75b4e6a 100644
--- a/lib/prometheus_exporter/server.rb
+++ b/lib/prometheus_exporter/server.rb
@@ -5,6 +5,7 @@ require_relative "server/type_collector"
 require_relative "server/web_collector"
 require_relative "server/process_collector"
 require_relative "server/sidekiq_collector"
+require_relative "server/sidekiq_queue_collector"
 require_relative "server/delayed_job_collector"
 require_relative "server/collector_base"
 require_relative "server/collector"
diff --git a/lib/prometheus_exporter/server/collector.rb b/lib/prometheus_exporter/server/collector.rb
index 9903869..93755da 100644
--- a/lib/prometheus_exporter/server/collector.rb
+++ b/lib/prometheus_exporter/server/collector.rb
@@ -13,6 +13,7 @@ module PrometheusExporter::Server
       register_collector(WebCollector.new)
       register_collector(ProcessCollector.new)
       register_collector(SidekiqCollector.new)
+      register_collector(SidekiqQueueCollector.new)
       register_collector(DelayedJobCollector.new)
       register_collector(PumaCollector.new)
       register_collector(HutchCollector.new)
diff --git a/lib/prometheus_exporter/server/sidekiq_queue_collector.rb b/lib/prometheus_exporter/server/sidekiq_queue_collector.rb
new file mode 100644
index 0000000..842541f
--- /dev/null
+++ b/lib/prometheus_exporter/server/sidekiq_queue_collector.rb
@@ -0,0 +1,44 @@
+module PrometheusExporter::Server
+  class SidekiqQueueCollector < TypeCollector
+    MAX_SIDEKIQ_METRIC_AGE = 60
+
+    SIDEKIQ_QUEUE_GAUGES = {
+      'backlog_total'   => 'Size of the sidekiq queue.',
+      'latency_seconds' => 'Latency of the sidekiq queue.',
+    }.freeze
+
+    attr_reader :sidekiq_metrics, :gauges
+
+    def initialize
+      @sidekiq_metrics = []
+      @gauges = {}
+    end
+
+    def type
+      'sidekiq_queue'
+    end
+
+    def metrics
+      sidekiq_metrics.map do |metric|
+        labels = metric.fetch("labels", {})
+        SIDEKIQ_QUEUE_GAUGES.map do |name, help|
+          if (value = metric[name])
+            gauge = gauges[name] ||= PrometheusExporter::Metric::Gauge.new("sidekiq_queue_#{name}", help)
+            gauge.observe(value, labels)
+          end
+        end
+      end
+
+      gauges.values
+    end
+
+    def collect(object)
+      now = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
+      object['queues'].each do |queue|
+        queue["created_at"] = now
+        sidekiq_metrics.delete_if { |metric| metric['created_at'] + MAX_SIDEKIQ_METRIC_AGE < now }
+        sidekiq_metrics << queue
+      end
+    end
+  end
+end
diff --git a/test/server/sidekiq_queue_collector_test.rb b/test/server/sidekiq_queue_collector_test.rb
new file mode 100644
index 0000000..fc783a6
--- /dev/null
+++ b/test/server/sidekiq_queue_collector_test.rb
@@ -0,0 +1,60 @@
+# frozen_string_literal: true
+
+require 'test_helper'
+require 'prometheus_exporter/server'
+require 'prometheus_exporter/instrumentation'
+
+class PrometheusSidekiqQueueCollectorTest < Minitest::Test
+  def setup
+    PrometheusExporter::Metric::Base.default_prefix = ''
+  end
+
+  def collector
+    @collector ||= PrometheusExporter::Server::SidekiqQueueCollector.new
+  end
+
+  def test_collecting_metrics
+    collector.collect(
+      'queues' => [
+        'backlog_total' => 16,
+        'latency_seconds' => 7,
+        'labels' => { 'queue' => 'default' }
+      ]
+    )
+
+    metrics = collector.metrics
+
+    expected = [
+      'sidekiq_queue_backlog_total{queue="default"} 16',
+      'sidekiq_queue_latency_seconds{queue="default"} 7',
+    ]
+    assert_equal expected, metrics.map(&:metric_text)
+  end
+
+  def test_only_fresh_metrics_are_collected
+    Process.stub(:clock_gettime, 1.0) do
+      collector.collect(
+        'queues' => [
+          'backlog_total' => 1,
+          'labels' => { 'queue' => 'default' }
+        ]
+      )
+    end
+
+    Process.stub(:clock_gettime, 2.0 + PrometheusExporter::Server::SidekiqQueueCollector::MAX_SIDEKIQ_METRIC_AGE) do
+      collector.collect(
+        'queues' => [
+          'latency_seconds' => 1,
+          'labels' => { 'queue' => 'default' }
+        ]
+      )
+
+      metrics = collector.metrics
+
+      expected = [
+        'sidekiq_queue_latency_seconds{queue="default"} 1',
+      ]
+      assert_equal expected, metrics.map(&:metric_text)
+    end
+  end
+end

GitHub sha: 501b84af

This commit appears in #114 which was merged by SamSaffron.