From 8133710a28d41f27244e349834e54618b02e5ab3 Mon Sep 17 00:00:00 2001 From: pmcclory Date: Wed, 15 Apr 2026 15:08:26 -0400 Subject: [PATCH] feat: add optional prometheus metrics endpoint with saturation metrics for each subject (unique endpoint) expose: * busy instances - the number of instances that are either actively processing a request or have been enqueued for processing in the nats-pure worker thread pool * the total number of instances for a subject (this is equal to the number of leopard instances) * pending requests - the number of requests that have been read from NATS but are waiting to be processed note that this currently accesses the private `handler` attribute of `NATS::Service::Endpoint` which is not part of the public API. a follow-up PR to nats-pure.rb will add `attr_reader :handler` to make this public. Co-authored-by: bougyman --- Gemfile.lock | 2 +- lib/leopard/metrics_server.rb | 90 ++++++++++++++++++++ lib/leopard/nats_api_server.rb | 4 + lib/leopard/templates/prometheus_metrics.erb | 17 ++++ test/lib/nats_api_server.rb | 56 ++++++++++++ 5 files changed, 168 insertions(+), 1 deletion(-) create mode 100644 lib/leopard/metrics_server.rb create mode 100644 lib/leopard/templates/prometheus_metrics.erb diff --git a/Gemfile.lock b/Gemfile.lock index 68b4566..84163b0 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - leopard (0.2.3) + leopard (0.2.4) concurrent-ruby (~> 1.1) dry-configurable (~> 1.3) dry-monads (~> 1.9) diff --git a/lib/leopard/metrics_server.rb b/lib/leopard/metrics_server.rb new file mode 100644 index 0000000..4c30b54 --- /dev/null +++ b/lib/leopard/metrics_server.rb @@ -0,0 +1,90 @@ +# frozen_string_literal: true + +require 'socket' +require 'erb' + +module Rubyists + module Leopard + module MetricsServer + private + + def start_metrics_server(workers) + port = ENV.fetch('LEOPARD_METRICS_PORT', '9394').to_i + Thread.new do + server = TCPServer.new(port) + logger.info "Metrics server listening on :#{port}" + loop { Thread.new(server.accept) { |client| handle_metrics_client(client, workers) } } + rescue StandardError => e + logger.error "Metrics server error: #{e.message}" + end + end + + def handle_metrics_client(client, workers) + request_line = client.gets + loop { break if (client.gets || '').chomp.empty? } + write_metrics_response(client, request_line, workers) + rescue StandardError => e + logger.warn "Metrics request error: #{e.message}" + ensure + close_client(client) + end + + def close_client(client) + client.close + rescue StandardError + nil + end + + def write_metrics_response(client, request_line, workers) + if request_line&.start_with?('GET /metrics') + body = prometheus_metrics(workers) + client.write "HTTP/1.1 200 OK\r\n" \ + "Content-Type: text/plain; version=0.0.4\r\n" \ + "Content-Length: #{body.bytesize}\r\n\r\n#{body}" + else + client.write "HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\n\r\n" + end + end + + def prometheus_metrics(workers) + metrics = collect_prometheus_metrics(workers) + render_metrics_template(metrics) + end + + def collect_prometheus_metrics(workers) + busy = Hash.new(0) + pending = Hash.new(0) + workers.each { |w| accumulate_worker_metrics(w, busy, pending) } + { + busy:, + pending:, + subjects: (busy.keys | pending.keys).sort, + total: workers.size, + } + end + + def accumulate_worker_metrics(worker, busy, pending) + service = worker.instance_variable_get(:@service) + return unless service + + service.endpoints.each do |ep| + # TODO: use ep.handler once nats-pure.rb adds attr_reader :handler to NATS::Service::Endpoint + sub = ep.instance_variable_get(:@handler) + next unless sub + + subj = ep.subject.to_s + busy[subj] += sub.concurrency_semaphore.available_permits.zero? ? 1 : 0 + pending[subj] += sub.pending_queue&.size.to_i + end + end + + def render_metrics_template(metrics) + ERB.new(File.read(metrics_template_path), trim_mode: '-').result_with_hash(metrics) + end + + def metrics_template_path + File.expand_path('templates/prometheus_metrics.erb', __dir__) + end + end + end +end diff --git a/lib/leopard/nats_api_server.rb b/lib/leopard/nats_api_server.rb index 374165d..2209ce2 100644 --- a/lib/leopard/nats_api_server.rb +++ b/lib/leopard/nats_api_server.rb @@ -6,6 +6,7 @@ require 'concurrent' require_relative '../leopard' require_relative 'message_wrapper' +require_relative 'metrics_server' module Rubyists module Leopard @@ -24,6 +25,8 @@ def self.included(base) Endpoint = Struct.new(:name, :subject, :queue, :group, :handler) module ClassMethods + include MetricsServer + def endpoints = @endpoints ||= [] def groups = @groups ||= {} def middleware = @middleware ||= [] @@ -78,6 +81,7 @@ def run(nats_url:, service_opts:, instances: 1, blocking: true) pool = spawn_instances(nats_url, service_opts, instances, workers, blocking) logger.info 'Setting up signal trap...' trap_signals(workers, pool) + start_metrics_server(workers) if ENV['LEOPARD_METRICS_PORT'] return pool unless blocking sleep diff --git a/lib/leopard/templates/prometheus_metrics.erb b/lib/leopard/templates/prometheus_metrics.erb new file mode 100644 index 0000000..ee798a9 --- /dev/null +++ b/lib/leopard/templates/prometheus_metrics.erb @@ -0,0 +1,17 @@ +# HELP leopard_subject_busy_instances Instances currently processing a message on this subject +# TYPE leopard_subject_busy_instances gauge +<% subjects.each do |subject| -%> +leopard_subject_busy_instances{subject="<%= subject %>"} <%= busy[subject] %> +<% end -%> + +# HELP leopard_subject_total_instances Total Leopard instances in this process +# TYPE leopard_subject_total_instances gauge +<% subjects.each do |subject| -%> +leopard_subject_total_instances{subject="<%= subject %>"} <%= total %> +<% end -%> + +# HELP leopard_subject_pending_messages Messages pending processing across all instances +# TYPE leopard_subject_pending_messages gauge +<% subjects.each do |subject| -%> +leopard_subject_pending_messages{subject="<%= subject %>"} <%= pending[subject] %> +<% end -%> diff --git a/test/lib/nats_api_server.rb b/test/lib/nats_api_server.rb index 3d00a5a..7d4d5a1 100755 --- a/test/lib/nats_api_server.rb +++ b/test/lib/nats_api_server.rb @@ -174,4 +174,60 @@ def call(wrapper) @instance.send(:process_result, wrapper, result) wrapper.verify end + + describe 'prometheus metrics' do # rubocop:disable Metrics/BlockLength + let(:available_struct) { Struct.new(:zero?) { def available_permits = self } } + let(:queue_struct) { Struct.new(:pending_size) { def size = pending_size } } + let(:handler_struct) { Struct.new(:concurrency_semaphore, :pending_queue) } + let(:endpoint_struct) do + Struct.new(:subject) do + def initialize(subject, handler) + super(subject) + @handler = handler + end + end + end + let(:service_struct) { Struct.new(:endpoints) } + let(:worker_struct) do + Struct.new(:service) do + def instance_variable_get(name) + return service if name == :@service + + super + end + end + end + let(:expected_metrics) do + <<~METRICS + # HELP leopard_subject_busy_instances Instances currently processing a message on this subject + # TYPE leopard_subject_busy_instances gauge + leopard_subject_busy_instances{subject="alpha"} 1 + leopard_subject_busy_instances{subject="beta"} 0 + + # HELP leopard_subject_total_instances Total Leopard instances in this process + # TYPE leopard_subject_total_instances gauge + leopard_subject_total_instances{subject="alpha"} 2 + leopard_subject_total_instances{subject="beta"} 2 + + # HELP leopard_subject_pending_messages Messages pending processing across all instances + # TYPE leopard_subject_pending_messages gauge + leopard_subject_pending_messages{subject="alpha"} 5 + leopard_subject_pending_messages{subject="beta"} 1 + METRICS + end + + it 'renders prometheus metrics from the erb template' do + workers = [ + worker_struct.new(service_struct.new([ + endpoint_struct.new('alpha', handler_struct.new(available_struct.new(true), queue_struct.new(3))), + endpoint_struct.new('beta', handler_struct.new(available_struct.new(false), queue_struct.new(1))), + ])), + worker_struct.new(service_struct.new([ + endpoint_struct.new('alpha', handler_struct.new(available_struct.new(false), queue_struct.new(2))), + ])), + ] + + assert_equal expected_metrics, @klass.send(:prometheus_metrics, workers) + end + end end