Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
leopard (0.2.3)
leopard (0.2.4)
concurrent-ruby (~> 1.1)
dry-configurable (~> 1.3)
dry-monads (~> 1.9)
Expand Down
90 changes: 90 additions & 0 deletions lib/leopard/metrics_server.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# frozen_string_literal: true

require 'socket'
require 'erb'

module Rubyists
module Leopard
module MetricsServer
private

def start_metrics_server(workers)
port = ENV.fetch('LEOPARD_METRICS_PORT', '9394').to_i
Thread.new do
server = TCPServer.new(port)
logger.info "Metrics server listening on :#{port}"
loop { Thread.new(server.accept) { |client| handle_metrics_client(client, workers) } }
rescue StandardError => e
logger.error "Metrics server error: #{e.message}"
end
end

def handle_metrics_client(client, workers)
request_line = client.gets
loop { break if (client.gets || '').chomp.empty? }
write_metrics_response(client, request_line, workers)
rescue StandardError => e
logger.warn "Metrics request error: #{e.message}"
ensure
close_client(client)
end

def close_client(client)
client.close
rescue StandardError
nil
end

def write_metrics_response(client, request_line, workers)
if request_line&.start_with?('GET /metrics')
body = prometheus_metrics(workers)
client.write "HTTP/1.1 200 OK\r\n" \
"Content-Type: text/plain; version=0.0.4\r\n" \
"Content-Length: #{body.bytesize}\r\n\r\n#{body}"
else
client.write "HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\n\r\n"
end
end

def prometheus_metrics(workers)
metrics = collect_prometheus_metrics(workers)
render_metrics_template(metrics)
end

def collect_prometheus_metrics(workers)
busy = Hash.new(0)
pending = Hash.new(0)
workers.each { |w| accumulate_worker_metrics(w, busy, pending) }
{
busy:,
pending:,
subjects: (busy.keys | pending.keys).sort,
total: workers.size,
}
end

def accumulate_worker_metrics(worker, busy, pending)
service = worker.instance_variable_get(:@service)
return unless service

service.endpoints.each do |ep|
# TODO: use ep.handler once nats-pure.rb adds attr_reader :handler to NATS::Service::Endpoint
sub = ep.instance_variable_get(:@handler)
next unless sub

subj = ep.subject.to_s
busy[subj] += sub.concurrency_semaphore.available_permits.zero? ? 1 : 0
pending[subj] += sub.pending_queue&.size.to_i
end
end

def render_metrics_template(metrics)
ERB.new(File.read(metrics_template_path), trim_mode: '-').result_with_hash(metrics)
end

def metrics_template_path
File.expand_path('templates/prometheus_metrics.erb', __dir__)
end
end
end
end
4 changes: 4 additions & 0 deletions lib/leopard/nats_api_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require 'concurrent'
require_relative '../leopard'
require_relative 'message_wrapper'
require_relative 'metrics_server'

module Rubyists
module Leopard
Expand All @@ -24,6 +25,8 @@ def self.included(base)
Endpoint = Struct.new(:name, :subject, :queue, :group, :handler)

module ClassMethods
include MetricsServer

def endpoints = @endpoints ||= []
def groups = @groups ||= {}
def middleware = @middleware ||= []
Expand Down Expand Up @@ -78,6 +81,7 @@ def run(nats_url:, service_opts:, instances: 1, blocking: true)
pool = spawn_instances(nats_url, service_opts, instances, workers, blocking)
logger.info 'Setting up signal trap...'
trap_signals(workers, pool)
start_metrics_server(workers) if ENV['LEOPARD_METRICS_PORT']
return pool unless blocking

sleep
Expand Down
17 changes: 17 additions & 0 deletions lib/leopard/templates/prometheus_metrics.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# HELP leopard_subject_busy_instances Instances currently processing a message on this subject
# TYPE leopard_subject_busy_instances gauge
<% subjects.each do |subject| -%>
leopard_subject_busy_instances{subject="<%= subject %>"} <%= busy[subject] %>
<% end -%>

# HELP leopard_subject_total_instances Total Leopard instances in this process
# TYPE leopard_subject_total_instances gauge
<% subjects.each do |subject| -%>
leopard_subject_total_instances{subject="<%= subject %>"} <%= total %>
<% end -%>

# HELP leopard_subject_pending_messages Messages pending processing across all instances
# TYPE leopard_subject_pending_messages gauge
<% subjects.each do |subject| -%>
leopard_subject_pending_messages{subject="<%= subject %>"} <%= pending[subject] %>
<% end -%>
56 changes: 56 additions & 0 deletions test/lib/nats_api_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,60 @@ def call(wrapper)
@instance.send(:process_result, wrapper, result)
wrapper.verify
end

describe 'prometheus metrics' do # rubocop:disable Metrics/BlockLength
let(:available_struct) { Struct.new(:zero?) { def available_permits = self } }
let(:queue_struct) { Struct.new(:pending_size) { def size = pending_size } }
let(:handler_struct) { Struct.new(:concurrency_semaphore, :pending_queue) }
let(:endpoint_struct) do
Struct.new(:subject) do
def initialize(subject, handler)
super(subject)
@handler = handler
end
end
end
let(:service_struct) { Struct.new(:endpoints) }
let(:worker_struct) do
Struct.new(:service) do
def instance_variable_get(name)
return service if name == :@service

super
end
end
end
let(:expected_metrics) do
<<~METRICS
# HELP leopard_subject_busy_instances Instances currently processing a message on this subject
# TYPE leopard_subject_busy_instances gauge
leopard_subject_busy_instances{subject="alpha"} 1
leopard_subject_busy_instances{subject="beta"} 0

# HELP leopard_subject_total_instances Total Leopard instances in this process
# TYPE leopard_subject_total_instances gauge
leopard_subject_total_instances{subject="alpha"} 2
leopard_subject_total_instances{subject="beta"} 2

# HELP leopard_subject_pending_messages Messages pending processing across all instances
# TYPE leopard_subject_pending_messages gauge
leopard_subject_pending_messages{subject="alpha"} 5
leopard_subject_pending_messages{subject="beta"} 1
METRICS
end

it 'renders prometheus metrics from the erb template' do
workers = [
worker_struct.new(service_struct.new([
endpoint_struct.new('alpha', handler_struct.new(available_struct.new(true), queue_struct.new(3))),
endpoint_struct.new('beta', handler_struct.new(available_struct.new(false), queue_struct.new(1))),
])),
worker_struct.new(service_struct.new([
endpoint_struct.new('alpha', handler_struct.new(available_struct.new(false), queue_struct.new(2))),
])),
]

assert_equal expected_metrics, @klass.send(:prometheus_metrics, workers)
end
end
end
Loading