diff --git a/.github/workflows/container.yaml b/.github/workflows/container.yaml index 22d9669..07bce5c 100644 --- a/.github/workflows/container.yaml +++ b/.github/workflows/container.yaml @@ -22,7 +22,7 @@ jobs: max-parallel: 5 matrix: alpine-version: ['3.20'] - ruby-version: ['3.4.1'] + ruby-version: ['4.0.2'] steps: - name: Checkout repository diff --git a/.github/workflows/gem.yaml b/.github/workflows/gem.yaml index a2b161e..b81461c 100644 --- a/.github/workflows/gem.yaml +++ b/.github/workflows/gem.yaml @@ -20,7 +20,7 @@ jobs: name: Set up Ruby uses: ruby/setup-ruby@v1 with: - ruby-version: 3.4.1 + ruby-version: 4.0.2 bundler-cache: false - name: Publish to ${{ matrix.registry }} diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index fc8098c..e4fe55f 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -13,10 +13,7 @@ jobs: matrix: ruby: - '3.4.5' - services: - nats: - image: nats:latest - ports: ["4222:4222", "6222:6222", "8222:8222"] + - '4.0.2' steps: - uses: actions/checkout@v4 - name: Set up Ruby @@ -26,7 +23,9 @@ jobs: bundler-cache: false - name: Install dependencies run: bundle install --jobs 4 --retry 3 - - name: Run the default task - run: bundle exec rake + - name: Run CI task + run: bundle exec rake ci env: - NATS_URI: nats://nats:4222 + NATS_URI: nats://127.0.0.1:4222 + LEOPARD_NATS_URL: nats://127.0.0.1:4222 + NATS_NAME: leopard-nats diff --git a/.gitignore b/.gitignore index cd5c3a1..219c2b6 100644 --- a/.gitignore +++ b/.gitignore @@ -54,4 +54,5 @@ Gemfile.lock .rvmrc # Used by RuboCop. Remote config files pulled in from inherit_from directive. +.rubocop_cache # .rubocop-https?--* diff --git a/.yardopts b/.yardopts new file mode 100644 index 0000000..e639453 --- /dev/null +++ b/.yardopts @@ -0,0 +1,5 @@ +--readme Readme.adoc +--protected +--private +--output-dir doc/yard +lib/**/*.rb diff --git a/Gemfile b/Gemfile index 2a624dc..958fb8d 100644 --- a/Gemfile +++ b/Gemfile @@ -6,6 +6,8 @@ source 'https://rubygems.org' gemspec group :development, :test do + gem 'asciidoctor' + gem 'irb' gem 'minitest' gem 'minitest-global_expectations' gem 'pry' @@ -16,4 +18,5 @@ group :development, :test do gem 'rubocop-performance' gem 'rubocop-rake' gem 'simplecov' + gem 'yard' end diff --git a/Gemfile.lock b/Gemfile.lock index 84163b0..411d7a2 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - leopard (0.2.4) + leopard (0.2.5) concurrent-ruby (~> 1.1) dry-configurable (~> 1.3) dry-monads (~> 1.9) @@ -11,11 +11,14 @@ PATH GEM remote: https://rubygems.org/ specs: + asciidoctor (2.0.26) ast (2.4.3) base64 (0.3.0) coderay (1.1.3) concurrent-ruby (1.3.5) + date (3.5.1) docile (1.4.1) + drb (2.2.3) dry-configurable (1.3.0) dry-core (~> 1.1) zeitwerk (~> 2.6) @@ -27,13 +30,21 @@ GEM concurrent-ruby (~> 1.0) dry-core (~> 1.1) zeitwerk (~> 2.6) + erb (6.0.3) io-console (0.8.1) + irb (1.17.0) + pp (>= 0.6.0) + prism (>= 1.3.0) + rdoc (>= 4.0.0) + reline (>= 0.4.2) json (2.13.2) language_server-protocol (3.17.0.5) lint_roller (1.1.0) logger (1.7.0) method_source (1.1.0) - minitest (5.25.5) + minitest (6.0.4) + drb (~> 2.0) + prism (~> 1.5) minitest-global_expectations (1.0.1) minitest (> 5) nats-pure (2.5.0) @@ -46,13 +57,23 @@ GEM parser (3.3.9.0) ast (~> 2.4.1) racc - prism (1.4.0) + pp (0.6.3) + prettyprint + prettyprint (0.2.0) + prism (1.9.0) pry (0.15.2) coderay (~> 1.1) method_source (~> 1.0) + psych (5.3.1) + date + stringio racc (1.8.1) rainbow (3.1.1) rake (13.3.0) + rdoc (7.2.0) + erb + psych (>= 4.0.0) + tsort regexp_parser (2.11.2) reline (0.6.2) io-console (~> 0.5) @@ -91,10 +112,13 @@ GEM simplecov_json_formatter (~> 0.1) simplecov-html (0.13.2) simplecov_json_formatter (0.1.4) + stringio (3.2.0) + tsort (0.2.0) unicode-display_width (3.1.5) unicode-emoji (~> 4.0, >= 4.0.4) - unicode-emoji (4.0.4) + unicode-emoji (4.2.0) uri (1.0.3) + yard (0.9.41) zeitwerk (2.7.3) PLATFORMS @@ -102,6 +126,8 @@ PLATFORMS x86_64-linux DEPENDENCIES + asciidoctor + irb leopard! minitest minitest-global_expectations @@ -113,6 +139,7 @@ DEPENDENCIES rubocop-performance rubocop-rake simplecov + yard BUNDLED WITH 2.6.9 diff --git a/Rakefile b/Rakefile index 91331ca..afd2395 100644 --- a/Rakefile +++ b/Rakefile @@ -4,8 +4,15 @@ require 'rake' require 'minitest/test_task' require 'bundler/gem_tasks' require 'rubocop/rake_task' +require 'net/http' +require 'open3' +require 'shellwords' +require 'timeout' +require 'yard' +require 'yard/rake/yardoc_task' RuboCop::RakeTask.new +YARD::Rake::YardocTask.new(:yard) Minitest::TestTask.create(:test) do |task| task.libs << 'lib' @@ -14,4 +21,106 @@ Minitest::TestTask.create(:test) do |task| task.warning = true end -task default: %i[rubocop test] +QUICK_TEST_FILES = Dir['test/*/**/*.rb'].reject { |file| file.start_with?('test/integration/') }.sort.freeze + +# Returns the local NATS JetStream health endpoint used by the CI helpers. +# +# @return [URI::HTTP] The health endpoint URI. +def nats_health_uri = URI('http://127.0.0.1:8222/healthz') + +# Reports whether the local NATS JetStream health endpoint is currently reachable. +# +# @return [Boolean] `true` when the broker responds successfully, otherwise `false`. +def nats_ready? + Net::HTTP.get_response(nats_health_uri).is_a?(Net::HTTPSuccess) +rescue Errno::ECONNREFUSED, Errno::EHOSTUNREACH, Errno::ECONNRESET + false +end + +# Waits for the local NATS JetStream broker to report healthy. +# +# @return [void] +# @raise [RuntimeError] If the broker does not become healthy within 30 seconds. +def wait_for_nats! + Timeout.timeout(30) do + sleep 1 until nats_ready? + end +rescue Timeout::Error + raise 'Timed out waiting for NATS JetStream health endpoint on http://127.0.0.1:8222/healthz' +end + +# Detects the container runtime used to manage the local NATS broker. +# +# @return [String] `podman` when available, otherwise `docker`. +def container_runtime + File.executable?('/usr/bin/podman') || system('command -v podman > /dev/null 2>&1', exception: false) ? 'podman' : 'docker' +end + +# Runs the non-integration test files directly for a fast local feedback loop. +# +# @return [void] +def run_quick_tests! + sh "ruby -w -Ilib -Itest #{QUICK_TEST_FILES.shelljoin}" +end + +# Verifies that the current YARD coverage is complete. +# +# @return [void] +# @raise [RuntimeError] If YARD reports anything less than 100% documentation coverage. +def verify_yard_coverage! + output, status = Open3.capture2e('bundle', 'exec', 'yard', 'stats', '--list-undoc') + puts output + raise 'yard stats failed' unless status.success? + return if output.include?('100.00% documented') + + raise 'YARD documentation coverage is incomplete' +end + +namespace :nats do + desc 'Start the local NATS JetStream broker via ./ci/nats/start.sh' + task :start do + sh({ 'NATS_DETACH' => '1' }, './ci/nats/start.sh') + end + + desc 'Wait for the local NATS JetStream broker health endpoint' + task :wait do + wait_for_nats! + end + + desc 'Stop the local NATS JetStream broker container' + task :stop do + name = ENV.fetch('NATS_NAME', 'leopard-nats') + sh(container_runtime, 'rm', '-f', name, verbose: false) + rescue RuntimeError + nil + end +end + +namespace :ci do + desc 'Run RuboCop, YARD verification, and the non-integration test suite without managing NATS' + task quick: %i[rubocop yard:verify] do + run_quick_tests! + end + + desc 'Run the full test suite against a managed local NATS JetStream broker' + task :test do + Rake::Task['nats:start'].invoke + Rake::Task['nats:wait'].invoke + Rake::Task['test'].invoke + ensure + Rake::Task['nats:stop'].reenable + Rake::Task['nats:stop'].invoke + end +end + +desc 'Run RuboCop and the full test suite against a managed local NATS JetStream broker' +task ci: %w[rubocop yard:verify ci:test] + +namespace :yard do + desc 'Fail if YARD reports incomplete documentation coverage' + task :verify do + verify_yard_coverage! + end +end + +task default: :ci diff --git a/Readme.adoc b/Readme.adoc index 69e4494..fe4f220 100644 --- a/Readme.adoc +++ b/Readme.adoc @@ -12,6 +12,7 @@ minimal DSL for defining endpoints and middleware. == Features * Declarative endpoint definitions with `#endpoint`. +* Declarative JetStream pull consumers with `#jetstream_endpoint`. * Grouping of endpoints with `#group` * Simple concurrency via `#run` with a configurable number of instances. * JSON aware message wrapper that gracefully handles parse errors. @@ -90,13 +91,72 @@ end EchoService.use LoggerMiddleware ---- +== JetStream Pull Consumers + +Leopard can also bind JetStream pull consumers through the same middleware and `Dry::Monads::Result` +handler contract used by request/reply endpoints. + +[source,ruby] +---- +class EventConsumer + include Rubyists::Leopard::NatsApiServer + + jetstream_endpoint( + :events, + stream: 'EVENTS', + subject: 'events.created', + durable: 'events-created-worker', + consumer: { max_deliver: 5 }, + batch: 5, + fetch_timeout: 1, + nak_delay: 2, + ) do |msg| + Success(msg.data) + end +end +---- + +JetStream handlers receive the same `Rubyists::Leopard::MessageWrapper` as service endpoints. +Leopard will: + +* `ack` on `Success` +* `nak` on `Failure` (`nak_delay:` is optional) +* `term` on unhandled exceptions + +Each Leopard `instances:` worker creates its own pull subscription loop, so JetStream consumers +scale with the same process-local concurrency model as the rest of the framework. + == Development The project uses Minitest and RuboCop. Run tests with Rake: [source,bash] ---- -$ bundle exec rake +$ bundle exec rake ci +---- + +This task starts NATS JetStream through `./ci/nats/start.sh`, waits for broker health, +runs RuboCop and the test suite, and then stops the broker. + +API documentation can be generated with: + +[source,bash] +---- +$ bundle exec rake yard +---- + +Documentation coverage is enforced with: + +[source,bash] +---- +$ bundle exec rake yard:verify +---- + +If you want to run the broker yourself, the same script can still be used directly: + +[source,bash] +---- +$ ./ci/nats/start.sh ---- === Conventional Commits (semantic commit messages) diff --git a/ci/nats/start.sh b/ci/nats/start.sh index 0b237b4..a39aa59 100755 --- a/ci/nats/start.sh +++ b/ci/nats/start.sh @@ -1,6 +1,8 @@ #!/usr/bin/env bash NATS_VERSION=2 +NATS_NAME=${NATS_NAME:-leopard-nats} +NATS_DETACH=${NATS_DETACH:-0} if readlink -f . >/dev/null 2>&1 # {{{ makes readlink work on mac then @@ -29,5 +31,28 @@ else runtime=docker fi +args=( + run + --rm + --name "$NATS_NAME" + -p 4222:4222 + -p 6222:6222 + -p 8222:8222 + -v ./accounts.txt:/accounts.txt +) + +if [ "$NATS_DETACH" = "1" ] +then + args+=(-d) +else + args+=(-it) +fi + +args+=( + "nats:$NATS_VERSION" + -js + -c /accounts.txt +) + set -x -exec "$runtime" run --rm -it -p 4222:4222 -p 6222:6222 -p 8222:8222 -v ./accounts.txt:/accounts.txt nats:"$NATS_VERSION" -js -c /accounts.txt "$@" +exec "$runtime" "${args[@]}" "$@" diff --git a/examples/echo_endpoint.rb b/examples/echo_endpoint.rb index 261b0b3..51ac12b 100755 --- a/examples/echo_endpoint.rb +++ b/examples/echo_endpoint.rb @@ -8,6 +8,12 @@ class EchoService include Rubyists::Leopard::NatsApiServer config.logger = SemanticLogger[:EchoService] + + # Builds the example service instance. + # + # @param a_var [Integer] Example initializer state passed through `instance_args`. + # + # @return [void] def initialize(a_var: 1) logger.info "EchoService initialized with a_var: #{a_var}" end diff --git a/examples/jetstream_endpoint.rb b/examples/jetstream_endpoint.rb new file mode 100755 index 0000000..70a725b --- /dev/null +++ b/examples/jetstream_endpoint.rb @@ -0,0 +1,44 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +require_relative '../lib/leopard/nats_api_server' + +# Example JetStream worker for async event processing. +class EventConsumer + include Rubyists::Leopard::NatsApiServer + + config.logger = SemanticLogger[:EventConsumer] + + jetstream_endpoint( + :events, + stream: 'EVENTS', + subject: 'events.created', + durable: 'events-created-worker', + consumer: { + ack_wait: 30, + max_deliver: 5, + }, + batch: 5, + fetch_timeout: 1, + nak_delay: 2, + ) do |msg| + logger.info 'Processing event', data: msg.data + Success(msg.data) + rescue StandardError => e + Failure(error: e.message, data: msg.data) + end +end + +if __FILE__ == $PROGRAM_NAME + SemanticLogger.default_level = :info + SemanticLogger.add_signal_handler + SemanticLogger.add_appender(io: $stdout, formatter: :color) + EventConsumer.run( + nats_url: 'nats://localhost:4222', + service_opts: { + name: 'example.event_consumer', + version: '1.0.0', + }, + instances: ENV.fetch('EVENT_CONSUMER_INSTANCES', '1').to_i, + ) +end diff --git a/lib/leopard.rb b/lib/leopard.rb index 777f526..c9453e3 100644 --- a/lib/leopard.rb +++ b/lib/leopard.rb @@ -1,16 +1,28 @@ # frozen_string_literal: true require 'dry/configurable' +require 'dry/monads' require 'pathname' require 'semantic_logger' +## +# Namespace for Leopard and related helper extensions. class Pathname + # Joins the receiver with another path fragment. + # + # @param other [#to_s] The path fragment to append. + # + # @return [Pathname] The combined path. def /(other) join other.to_s end end +## +# Top-level namespace for Rubyists gems. module Rubyists + ## + # Namespace for Leopard runtime, DSL, and support classes. module Leopard end end @@ -18,3 +30,8 @@ module Leopard require_relative 'leopard/settings' require_relative 'leopard/version' require_relative 'leopard/errors' +require_relative 'leopard/message_processor' +require_relative 'leopard/nats_jetstream_endpoint' +require_relative 'leopard/nats_jetstream_callbacks' +require_relative 'leopard/nats_jetstream_consumer' +require_relative 'leopard/nats_request_reply_callbacks' diff --git a/lib/leopard/errors.rb b/lib/leopard/errors.rb index 985a373..706d0a6 100644 --- a/lib/leopard/errors.rb +++ b/lib/leopard/errors.rb @@ -2,12 +2,19 @@ module Rubyists module Leopard + # Base Leopard exception that truncates backtraces for cleaner logs. class LeopardError < StandardError + # Captures the original exception state while replacing the backtrace with the current call stack. + # + # @return [void] def initialize(...) super set_backtrace(caller) end + # Returns a Leopard-truncated backtrace. + # + # @return [Array] Up to the first four backtrace entries, plus a truncation marker when applicable. def backtrace # If the backtrace is nil, return an empty array orig = (super || [])[0..3] @@ -19,8 +26,11 @@ def backtrace end end + # Generic Leopard error superclass. class Error < LeopardError; end + # Raised when Leopard configuration is invalid. class ConfigurationError < Error; end + # Raised when a handler returns an unsupported object instead of a result monad. class ResultError < Error; end end end diff --git a/lib/leopard/message_processor.rb b/lib/leopard/message_processor.rb new file mode 100644 index 0000000..fad9bfa --- /dev/null +++ b/lib/leopard/message_processor.rb @@ -0,0 +1,86 @@ +# frozen_string_literal: true + +module Rubyists + module Leopard + # Composes middleware around Leopard handlers and routes their results to transport callbacks. + class MessageProcessor + private attr_reader :execute_handler, :logger, :middleware, :wrapper_factory + + # Builds a reusable processor for request/reply and JetStream transports. + # + # @param wrapper_factory [#call] Callable that wraps a raw transport message in a {MessageWrapper}-compatible object. + # @param middleware [#call] Callable returning the current middleware stack. + # @param execute_handler [#call] Callable that executes the endpoint handler with the wrapped message. + # @param logger [#error] Logger used for processing failures. + # + # @return [void] + def initialize(wrapper_factory:, middleware:, execute_handler:, logger:) + @wrapper_factory = wrapper_factory + @middleware = middleware + @execute_handler = execute_handler + @logger = logger + end + + # Processes a raw transport message through middleware and terminal callbacks. + # + # @param raw_msg [Object] The raw transport message from NATS. + # @param handler [Proc] The endpoint handler to execute. + # @param callbacks [Hash{Symbol => #call}] Success, failure, and error callbacks for the transport. + # + # @return [Object] The transport-specific callback result. + def process(raw_msg, handler, callbacks) + app(callbacks, handler).call(wrapper_factory.call(raw_msg)) + end + + private + + # Builds the middleware stack around the terminal application. + # + # @param callbacks [Hash{Symbol => #call}] Transport callbacks keyed by outcome. + # @param handler [Proc] The endpoint handler to execute at the core of the stack. + # + # @return [#call] The composed middleware application. + def app(callbacks, handler) + middleware.call.reverse_each.reduce(base_app(handler, callbacks)) do |current, (klass, args, blk)| + klass.new(current, *args, &blk) + end + end + + # Builds the terminal application that runs the handler and dispatches transport callbacks. + # + # @param handler [Proc] The endpoint handler to execute. + # @param callbacks [Hash{Symbol => #call}] Transport callbacks keyed by outcome. + # + # @return [Proc] The terminal application for the middleware chain. + def base_app(handler, callbacks) + lambda do |wrapper| + result = execute_handler.call(wrapper, handler) + process_result(wrapper, result, callbacks) + rescue StandardError => e + logger.error 'Error processing message: ', e + callbacks[:on_error].call(wrapper, e) + end + end + + # Routes a {Dry::Monads::Result} to the appropriate transport callback. + # + # @param wrapper [MessageWrapper] The wrapped transport message. + # @param result [Dry::Monads::Result] The handler result to route. + # @param callbacks [Hash{Symbol => #call}] Transport callbacks keyed by outcome. + # + # @return [Object] The callback return value for the routed result. + # @raise [ResultError] If the handler returned a non-result object. + def process_result(wrapper, result, callbacks) + case result + in Dry::Monads::Success + callbacks[:on_success].call(wrapper, result) + in Dry::Monads::Failure + callbacks[:on_failure].call(wrapper, result) + else + logger.error('Unexpected result: ', result:) + raise ResultError, "Unexpected Response from Handler, must respond with a Success or Failure monad: #{result}" + end + end + end + end +end diff --git a/lib/leopard/message_wrapper.rb b/lib/leopard/message_wrapper.rb index 793f5a9..6dd1426 100644 --- a/lib/leopard/message_wrapper.rb +++ b/lib/leopard/message_wrapper.rb @@ -4,15 +4,19 @@ module Rubyists module Leopard + # Wraps a raw NATS message with parsed payload and convenience response helpers. class MessageWrapper # @!attribute [r] raw + # # @return [NATS::Message] The original NATS message. # # @!attribute [r] data + # # @return [Object] The parsed data from the NATS message. attr_reader :raw, :data # # @!attribute [w] headers + # # @return [Hash] The headers from the NATS message. attr_accessor :headers diff --git a/lib/leopard/metrics_server.rb b/lib/leopard/metrics_server.rb index 4c30b54..36d97bc 100644 --- a/lib/leopard/metrics_server.rb +++ b/lib/leopard/metrics_server.rb @@ -5,9 +5,15 @@ module Rubyists module Leopard + # Adds a minimal Prometheus HTTP endpoint for Leopard worker metrics. module MetricsServer private + # Starts a lightweight HTTP server that exposes Leopard Prometheus metrics. + # + # @param workers [Array] Active Leopard worker instances to observe. + # + # @return [Thread] The server thread. def start_metrics_server(workers) port = ENV.fetch('LEOPARD_METRICS_PORT', '9394').to_i Thread.new do @@ -19,6 +25,12 @@ def start_metrics_server(workers) end end + # Handles an individual metrics HTTP client connection. + # + # @param client [TCPSocket] The connected HTTP client. + # @param workers [Array] Active Leopard worker instances to observe. + # + # @return [void] def handle_metrics_client(client, workers) request_line = client.gets loop { break if (client.gets || '').chomp.empty? } @@ -29,12 +41,24 @@ def handle_metrics_client(client, workers) close_client(client) end + # Closes a metrics client socket, ignoring cleanup failures. + # + # @param client [TCPSocket] The connected HTTP client. + # + # @return [void] def close_client(client) client.close rescue StandardError nil end + # Writes the HTTP response for a metrics request. + # + # @param client [TCPSocket] The connected HTTP client. + # @param request_line [String, nil] The first line of the HTTP request. + # @param workers [Array] Active Leopard worker instances to observe. + # + # @return [void] def write_metrics_response(client, request_line, workers) if request_line&.start_with?('GET /metrics') body = prometheus_metrics(workers) @@ -46,11 +70,21 @@ def write_metrics_response(client, request_line, workers) end end + # Builds the Prometheus metrics payload for the current worker state. + # + # @param workers [Array] Active Leopard worker instances to observe. + # + # @return [String] Rendered Prometheus text exposition output. def prometheus_metrics(workers) metrics = collect_prometheus_metrics(workers) render_metrics_template(metrics) end + # Aggregates per-subject worker utilization metrics. + # + # @param workers [Array] Active Leopard worker instances to observe. + # + # @return [Hash{Symbol => Object}] Metric hashes for the Prometheus template. def collect_prometheus_metrics(workers) busy = Hash.new(0) pending = Hash.new(0) @@ -63,6 +97,13 @@ def collect_prometheus_metrics(workers) } end + # Adds one worker's endpoint saturation metrics to the aggregate hashes. + # + # @param worker [Object] A Leopard worker instance. + # @param busy [Hash{String => Integer}] Subject-to-busy-worker counts. + # @param pending [Hash{String => Integer}] Subject-to-pending-message counts. + # + # @return [void] def accumulate_worker_metrics(worker, busy, pending) service = worker.instance_variable_get(:@service) return unless service @@ -78,10 +119,18 @@ def accumulate_worker_metrics(worker, busy, pending) end end + # Renders the metrics ERB template with aggregated metric data. + # + # @param metrics [Hash{Symbol => Object}] Aggregated metric data for template rendering. + # + # @return [String] The rendered Prometheus payload. def render_metrics_template(metrics) ERB.new(File.read(metrics_template_path), trim_mode: '-').result_with_hash(metrics) end + # Returns the absolute path to the Prometheus metrics template. + # + # @return [String] The metrics template path. def metrics_template_path File.expand_path('templates/prometheus_metrics.erb', __dir__) end diff --git a/lib/leopard/nats_api_server.rb b/lib/leopard/nats_api_server.rb index 2209ce2..69a74f3 100644 --- a/lib/leopard/nats_api_server.rb +++ b/lib/leopard/nats_api_server.rb @@ -6,29 +6,55 @@ require 'concurrent' require_relative '../leopard' require_relative 'message_wrapper' +require_relative 'message_processor' require_relative 'metrics_server' +require_relative 'nats_jetstream_endpoint' +require_relative 'nats_jetstream_consumer' +require_relative 'nats_request_reply_callbacks' module Rubyists module Leopard + # DSL and runtime integration for Leopard request/reply and JetStream workers. module NatsApiServer include Dry::Monads[:result] extend Dry::Monads[:result] + # Extends an including class with Leopard's DSL and worker lifecycle behavior. + # + # @param base [Class] The class including this module. + # + # @return [void] def self.included(base) base.extend(ClassMethods) - base.include(InstanceMethods) + base.include(WorkerLifecycle) + base.include(MessageHandling) base.extend(Dry::Monads[:result]) base.extend(Dry::Configurable) base.setting :logger, default: Rubyists::Leopard.logger, reader: true end - Endpoint = Struct.new(:name, :subject, :queue, :group, :handler) + # Configuration for a request/reply endpoint declared with {.endpoint}. + Endpoint = Struct.new(:name, :subject, :queue, :group, :handler, keyword_init: true) + # Class-level DSL for defining Leopard endpoints, middleware, and worker startup. module ClassMethods include MetricsServer + # Returns the configured request/reply endpoints for the service class. + # + # @return [Array] Declared request/reply endpoints. def endpoints = @endpoints ||= [] + # Returns the configured JetStream endpoints for the service class. + # + # @return [Array] Declared JetStream pull-consumer endpoints. + def jetstream_endpoints = @jetstream_endpoints ||= [] + # Returns the configured endpoint groups for the service class. + # + # @return [Hash{Symbol,String => Hash}] Declared group definitions. def groups = @groups ||= {} + # Returns the configured middleware stack for the service class. + # + # @return [Array] Middleware declarations in registration order. def middleware = @middleware ||= [] # Define an endpoint for the NATS API server. @@ -38,12 +64,36 @@ def middleware = @middleware ||= [] # @param queue [String, nil] The NATS queue group to use. Defaults to nil. # @param group [String, nil] The group this endpoint belongs to. Defaults to nil. # @param handler [Proc] The block that will handle incoming messages. + # @yield [wrapper] Handles the wrapped request message. + # @yieldparam wrapper [MessageWrapper] The wrapped inbound NATS message. + # @yieldreturn [Dry::Monads::Result] The handler result. # # @return [void] def endpoint(name, subject: nil, queue: nil, group: nil, &handler) endpoints << Endpoint.new(name:, subject: subject || name, queue:, group:, handler:) end + # Define a JetStream pull consumer endpoint. + # + # @param name [String] The name of the endpoint. + # @param options [Hash] JetStream endpoint configuration. + # @option options [String] :stream The JetStream stream name. + # @option options [String] :subject The JetStream subject filter. + # @option options [String] :durable The durable consumer name. + # @option options [Hash, NATS::JetStream::API::ConsumerConfig, nil] :consumer Optional consumer config. + # @option options [Integer] :batch (1) Number of messages to fetch per pull request. + # @option options [Numeric] :fetch_timeout (5) Maximum time to wait for fetched messages. + # @option options [Numeric, nil] :nak_delay Optional delayed redelivery value for `nak`. + # @param handler [Proc] The block that will handle incoming messages. + # @yield [wrapper] Handles the wrapped JetStream message. + # @yieldparam wrapper [MessageWrapper] The wrapped inbound JetStream message. + # @yieldreturn [Dry::Monads::Result] The handler result. + # + # @return [void] + def jetstream_endpoint(name, **options, &handler) + jetstream_endpoints << build_jetstream_endpoint(name, options, handler) + end + # Define a group for organizing endpoints. # # @param name [String] The name of the group. @@ -74,7 +124,7 @@ def use(klass, *args, &block) # @param instances [Integer] The number of instances to spawn. Defaults to 1. # @param blocking [Boolean] If false, does not block current thread after starting the server. Defaults to true. # - # @return [void] + # @return [Concurrent::FixedThreadPool, void] The worker pool for non-blocking runs, otherwise blocks forever. def run(nats_url:, service_opts:, instances: 1, blocking: true) logger.info 'Booting NATS API server...' workers = Concurrent::Array.new @@ -98,6 +148,7 @@ def run(nats_url:, service_opts:, instances: 1, blocking: true) # @param blocking [Boolean] If false, does not block current thread after starting the server. # # @return [Concurrent::FixedThreadPool] The thread pool managing the worker threads. + # @raise [ArgumentError] If `instance_args` was provided but is not a hash. def spawn_instances(url, opts, count, workers, blocking) pool = Concurrent::FixedThreadPool.new(count) @instance_args = opts.delete(:instance_args) || nil @@ -177,50 +228,154 @@ def wake_main_thread_and_exit! rescue StandardError exit 1 end + + # Builds a JetStream endpoint struct with Leopard defaults applied. + # + # @param name [String, Symbol] Endpoint name. + # @param options [Hash] JetStream endpoint options. + # @param handler [Proc] Endpoint handler block. + # + # @return [NatsJetstreamEndpoint] The configured JetStream endpoint definition. + def build_jetstream_endpoint(name, options, handler) + NatsJetstreamEndpoint.new( + name:, + handler:, + consumer: nil, + batch: 1, + fetch_timeout: 5, + nak_delay: nil, + **options, + ) + end end - module InstanceMethods + # Instance-side worker boot and shutdown helpers. + module WorkerLifecycle # Returns the logger configured for the NATS API server. + # + # @return [Object] The configured logger. def logger = self.class.logger # Sets up a worker thread for the NATS API server. # This method connects to the NATS server, adds the service, groups, and endpoints, # - # @param url [String] The URL of the NATS server. - # @param opts [Hash] Options for the NATS service. - # @param eps [Array] The list of endpoints to add. - # @param gps [Hash] The groups to add. + # @param nats_url [String] The URL of the NATS server. + # @param service_opts [Hash] Options for the NATS service. # # @return [void] def setup_worker(nats_url: 'nats://localhost:4222', service_opts: {}) - @thread = Thread.current - @client = NATS.connect nats_url - @service = @client.services.add(build_service_opts(service_opts:)) - gps = self.class.groups.dup - eps = self.class.endpoints.dup - group_map = add_groups(gps) - add_endpoints eps, group_map + initialize_worker_state + connect_client(nats_url) + initialize_service(service_opts) + add_endpoints(self.class.endpoints.dup, add_groups(self.class.groups.dup)) + start_jetstream_consumer(self.class.jetstream_endpoints.dup) end # Sets up a worker thread for the NATS API server and blocks the current thread. # # @see #setup_worker + # @param nats_url [String] The URL of the NATS server. + # @param service_opts [Hash] Options for the NATS service. + # + # @return [void] def setup_worker!(nats_url: 'nats://localhost:4222', service_opts: {}) setup_worker(nats_url:, service_opts:) sleep end # Stops the NATS API server worker. + # + # @return [void] def stop - @service&.stop - @client&.close - @thread&.wakeup + @running = false + stop_jetstream + stop_service + wake_worker rescue ThreadError nil end private + # Captures the current thread for later wakeup during shutdown. + # + # @return [Thread] The current worker thread. + def initialize_worker_state + @thread = Thread.current + end + + # Opens the NATS client connection for this worker. + # + # @param nats_url [String] The URL of the NATS server. + # + # @return [Object] The connected NATS client. + def connect_client(nats_url) + @client = NATS.connect(nats_url) + end + + # Registers the NATS service for this worker. + # + # @param service_opts [Hash] Options for the NATS service. + # + # @return [Object] The created NATS service. + def initialize_service(service_opts) + @service = @client.services.add(build_service_opts(service_opts:)) + end + + # Starts the JetStream consumer coordinator when JetStream endpoints are present. + # + # @param endpoints [Array] JetStream endpoints for this worker. + # + # @return [void] + def start_jetstream_consumer(endpoints) + return if endpoints.empty? + + @jetstream_consumer = jetstream_consumer_class.new( + jetstream: @client.jetstream, + endpoints:, + logger:, + process_message: method(:process_transport_message), + thread_factory:, + ) + @jetstream_consumer.start + end + + # Stops the JetStream consumer coordinator if one was started. + # + # @return [void] + def stop_jetstream + @jetstream_consumer&.stop + end + + # Stops the registered NATS service and closes the client connection. + # + # @return [void] + def stop_service + @service&.stop + @client&.close + end + + # Wakes the worker thread if it is blocked. + # + # @return [Thread, nil] The awakened worker thread, if present. + def wake_worker + @thread&.wakeup + end + + # Returns the JetStream consumer coordinator class for this worker. + # + # @return [Class] The JetStream consumer implementation class. + def jetstream_consumer_class + NatsJetstreamConsumer + end + + # Returns the thread factory used for JetStream consumer loops. + # + # @return [Class] The thread factory class. + def thread_factory + Thread + end + # Builds the service options for the NATS service. # # @param service_opts [Hash] Options for the NATS service. @@ -251,6 +406,7 @@ def add_groups(gps) # @param name [String] The name of the group to build. # # @return [NATS::Group] The created group object. + # @raise [ArgumentError] If the requested group was never defined. def build_group(defs, cache, name) return cache[name] if cache.key?(name) @@ -267,6 +423,7 @@ def build_group(defs, cache, name) # @param group_map [Hash] A map of group names to their created group objects. # # @return [void] + # @raise [ArgumentError] If an endpoint references an undefined group. def add_endpoints(endpoints, group_map) endpoints.each do |ep| grp = ep.group @@ -276,6 +433,16 @@ def add_endpoints(endpoints, group_map) build_endpoint(parent, ep) end end + end + + # Message execution helpers shared by request/reply and JetStream transports. + module MessageHandling + # Returns the logger configured for the NATS API server. + # + # @return [Object] The configured logger. + def logger = self.class.logger + + private # Builds an endpoint in the NATS service. # @@ -286,58 +453,48 @@ def add_endpoints(endpoints, group_map) # @return [void] def build_endpoint(parent, ept) parent.endpoints.add(ept.name, subject: ept.subject, queue: ept.queue) do |raw_msg| - wrapper = MessageWrapper.new(raw_msg) - dispatch_with_middleware(wrapper, ept.handler) + process_transport_message(raw_msg, ept.handler, request_reply_callbacks.callbacks) end end - # Dispatches a message through the middleware stack and handles it with the provided handler. + # Processes a raw transport message through Leopard's middleware and callback pipeline. # - # @param wrapper [MessageWrapper] The message wrapper containing the raw message. - # @param handler [Proc] The handler to process the message. + # @param raw_msg [Object] The raw NATS transport message. + # @param handler [Proc] The endpoint handler block. + # @param callbacks [Hash{Symbol => #call}] Transport callbacks keyed by outcome. # - # @return [void] - def dispatch_with_middleware(wrapper, handler) - app = ->(w) { handle_message(w.raw, handler) } - self.class.middleware.reverse_each do |(klass, args, blk)| - app = klass.new(app, *args, &blk) - end - app.call(wrapper) + # @return [Object] The transport-specific callback result. + def process_transport_message(raw_msg, handler, callbacks) + message_processor.process(raw_msg, handler, callbacks) end - # Handles a raw NATS message using the provided handler. + # Returns the callback helper for request/reply endpoints. # - # @param raw_msg [NATS::Message] The raw NATS message to handle. - # @param handler [Proc] The handler to process the message. + # @return [NatsRequestReplyCallbacks] The request/reply callback helper. + def request_reply_callbacks + @request_reply_callbacks ||= NatsRequestReplyCallbacks.new(logger:) + end + + # Returns the memoized message processor for this worker instance. # - # @return [void] - def handle_message(raw_msg, handler) - wrapper = MessageWrapper.new(raw_msg) - result = instance_exec(wrapper, &handler) - process_result(wrapper, result) - rescue StandardError => e - logger.error 'Error processing message: ', e - wrapper.respond_with_error(e) + # @return [MessageProcessor] The shared message processor. + def message_processor + @message_processor ||= MessageProcessor.new( + wrapper_factory: MessageWrapper.method(:new), + middleware: -> { self.class.middleware }, + execute_handler: method(:execute_handler), + logger:, + ) end - # Processes the result of the handler execution. + # Executes an endpoint handler within the worker instance context. # - # @param wrapper [MessageWrapper] The message wrapper containing the raw message. - # @param result [Dry::Monads::Result] The result of the handler execution. + # @param wrapper [MessageWrapper] The wrapped transport message. + # @param handler [Proc] The endpoint handler block. # - # @return [void] - # @raise [ResultError] If the result is not a Success or Failure monad. - def process_result(wrapper, result) - case result - in Dry::Monads::Success - wrapper.respond(result.value!) - in Dry::Monads::Failure - logger.error 'Error processing message: ', result.failure - wrapper.respond_with_error(result.failure) - else - logger.error('Unexpected result: ', result:) - raise ResultError, "Unexpected Response from Handler, must respond with a Success or Failure monad: #{result}" - end + # @return [Dry::Monads::Result] The handler result. + def execute_handler(wrapper, handler) + instance_exec(wrapper, &handler) end end end diff --git a/lib/leopard/nats_jetstream_callbacks.rb b/lib/leopard/nats_jetstream_callbacks.rb new file mode 100644 index 0000000..1e8a2bc --- /dev/null +++ b/lib/leopard/nats_jetstream_callbacks.rb @@ -0,0 +1,76 @@ +# frozen_string_literal: true + +module Rubyists + module Leopard + # Maps Leopard handler outcomes to JetStream ack, nak, and term operations. + class NatsJetstreamCallbacks + # Builds a callback set for JetStream message outcomes. + # + # @param logger [#error] Logger used for failures and unhandled exceptions. + # + # @return [void] + def initialize(logger:) + @logger = logger + end + + # Returns transport callbacks for a JetStream endpoint. + # + # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration being consumed. + # + # @return [Hash{Symbol => #call}] Outcome callbacks keyed by `:on_success`, `:on_failure`, and `:on_error`. + def callbacks_for(endpoint) + { + on_success: method(:ack_message), + on_failure: ->(wrapper, result) { nak_message(wrapper, result, endpoint) }, + on_error: method(:term_message), + } + end + + private + + # Acknowledges a successfully processed JetStream message. + # + # @param wrapper [MessageWrapper] Wrapped JetStream message. + # @param _result [Dry::Monads::Success] Successful handler result. + # + # @return [void] + def ack_message(wrapper, _result) + wrapper.raw.ack + end + + # Negatively acknowledges a failed JetStream message, optionally delaying redelivery. + # + # @param wrapper [MessageWrapper] Wrapped JetStream message. + # @param result [Dry::Monads::Failure] Failed handler result. + # @param endpoint [NatsJetstreamEndpoint] Endpoint configuration for the message. + # + # @return [void] + def nak_message(wrapper, result, endpoint) + log_failure(result.failure) + return wrapper.raw.nak unless endpoint.nak_delay + + wrapper.raw.nak(delay: endpoint.nak_delay) + end + + # Terminates a JetStream message after an unhandled exception. + # + # @param wrapper [MessageWrapper] Wrapped JetStream message. + # @param error [StandardError] The unhandled exception. + # + # @return [void] + def term_message(wrapper, error) + @logger.error 'Unhandled JetStream error: ', error + wrapper.raw.term + end + + # Logs the failure payload returned by a handler. + # + # @param failure [Object] The failure payload from the handler. + # + # @return [void] + def log_failure(failure) + @logger.error 'Error processing message: ', failure + end + end + end +end diff --git a/lib/leopard/nats_jetstream_consumer.rb b/lib/leopard/nats_jetstream_consumer.rb new file mode 100644 index 0000000..9695e38 --- /dev/null +++ b/lib/leopard/nats_jetstream_consumer.rb @@ -0,0 +1,186 @@ +# frozen_string_literal: true + +require_relative 'nats_jetstream_callbacks' +require_relative 'nats_jetstream_endpoint' + +module Rubyists + module Leopard + # Coordinates JetStream pull subscriptions and dispatches fetched messages through Leopard. + class NatsJetstreamConsumer + # Consumer configuration keys Leopard owns and will not allow endpoint overrides to replace. + PROTECTED_CONSUMER_KEYS = %i[durable_name filter_subject ack_policy].freeze + + # @!attribute [r] subscriptions + # + # @return [Array] Active JetStream pull subscriptions. + # @!attribute [r] threads + # + # @return [Array] Consumer loop threads for each endpoint. + attr_reader :subscriptions, :threads + + # Builds a pull-consumer coordinator for one Leopard worker. + # + # @param jetstream [Object] JetStream client used to manage consumers and subscriptions. + # @param endpoints [Array] JetStream endpoint definitions for this worker. + # @param logger [#error] Logger used for loop failures. + # @param process_message [#call] Callable that processes a raw JetStream message through Leopard. + # @param dependencies [Hash{Symbol => Object}] Optional collaborators for callback and thread creation. + # @option dependencies [Class] :callback_builder (NatsJetstreamCallbacks) Builder for transport callbacks. + # @option dependencies [Class] :thread_factory (Thread) Thread-like factory used to spawn consumer loops. + # + # @return [void] + def initialize(jetstream:, endpoints:, logger:, process_message:, **dependencies) + @jetstream = jetstream + @endpoints = endpoints + @logger = logger + @process_message = process_message + @callbacks = dependencies.fetch(:callback_builder, NatsJetstreamCallbacks).new(logger:) + @thread_factory = dependencies.fetch(:thread_factory, Thread) + @subscriptions = [] + @threads = [] + @running = false + end + + # Starts one pull-consumer loop per configured endpoint. + # + # @return [void] + def start + @running = true + @endpoints.each { |endpoint| start_endpoint(endpoint) } + end + + # Stops all pull-consumer loops and waits for them to exit. + # + # @return [void] + def stop + @running = false + subscriptions.each(&:unsubscribe) + threads.each(&:join) + end + + private + + # Starts a consumer loop for one endpoint. + # + # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration to consume. + # + # @return [void] + def start_endpoint(endpoint) + subscription = build_subscription(endpoint) + subscriptions << subscription + threads << @thread_factory.new { consume_endpoint(subscription, endpoint) } + end + + # Ensures the durable consumer exists and creates a pull subscription for it. + # + # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration to subscribe to. + # + # @return [Object] The JetStream pull subscription. + def build_subscription(endpoint) + ensure_consumer(endpoint) + @jetstream.pull_subscribe( + endpoint.subject, + endpoint.durable, + stream: endpoint.stream, + ) + end + + # Verifies that the durable consumer exists, creating it when missing. + # + # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration to ensure. + # + # @return [Object] Consumer metadata from `consumer_info` or `add_consumer`. + def ensure_consumer(endpoint) + @jetstream.consumer_info(endpoint.stream, endpoint.durable) + rescue NATS::JetStream::Error::NotFound + @jetstream.add_consumer(endpoint.stream, consumer_config(endpoint)) + end + + # Builds the JetStream consumer configuration for an endpoint. + # + # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration to translate. + # + # @return [Hash] Consumer configuration accepted by `add_consumer`. + def consumer_config(endpoint) + base = { + durable_name: endpoint.durable, + filter_subject: endpoint.subject, + ack_policy: 'explicit', + } + base.merge(safe_consumer_options(endpoint)) + end + + # Normalizes optional consumer overrides into a hash. + # + # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration to inspect. + # + # @return [Hash] Consumer overrides, or an empty hash when none were provided. + def normalized_consumer_options(endpoint) + return {} unless endpoint.consumer + return endpoint.consumer.to_h if endpoint.consumer.respond_to?(:to_h) + + endpoint.consumer + end + + # Removes Leopard-managed consumer keys from user overrides. + # + # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration to inspect. + # + # @return [Hash] Consumer overrides excluding protected keys required by Leopard. + def safe_consumer_options(endpoint) + normalized_consumer_options(endpoint).reject { |key, _value| PROTECTED_CONSUMER_KEYS.include?(key.to_sym) } + end + + # Repeatedly fetches and processes batches for one endpoint while the consumer is running. + # + # @param subscription [Object] Pull subscription for the endpoint. + # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration being consumed. + # + # @return [void] + def consume_endpoint(subscription, endpoint) + while @running + begin + consume_batch(subscription, endpoint) + rescue NATS::Timeout + next if @running + rescue StandardError => e + log_loop_error(endpoint, e) + break unless @running + end + end + end + + # Fetches one batch from JetStream and processes each message through Leopard. + # + # @param subscription [Object] Pull subscription for the endpoint. + # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration being consumed. + # + # @return [void] + def consume_batch(subscription, endpoint) + fetch_messages(subscription, endpoint).each do |raw_msg| + @process_message.call(raw_msg, endpoint.handler, @callbacks.callbacks_for(endpoint)) + end + end + + # Fetches a batch of messages for one endpoint. + # + # @param subscription [Object] Pull subscription for the endpoint. + # @param endpoint [NatsJetstreamEndpoint] The endpoint configuration being consumed. + # + # @return [Array] Raw JetStream messages returned by the subscription. + def fetch_messages(subscription, endpoint) + subscription.fetch(endpoint.batch, timeout: endpoint.fetch_timeout) + end + + # Logs an endpoint-level loop failure. + # + # @param endpoint [NatsJetstreamEndpoint] The endpoint whose loop failed. + # @param error [StandardError] The raised exception. + # + # @return [void] + def log_loop_error(endpoint, error) + @logger.error "JetStream endpoint #{endpoint.name} loop error: ", error + end + end + end +end diff --git a/lib/leopard/nats_jetstream_endpoint.rb b/lib/leopard/nats_jetstream_endpoint.rb new file mode 100644 index 0000000..a26f58d --- /dev/null +++ b/lib/leopard/nats_jetstream_endpoint.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +module Rubyists + module Leopard + # Configuration for a Leopard JetStream pull-consumer endpoint. + NatsJetstreamEndpoint = Struct.new( + :name, + :stream, + :subject, + :durable, + :consumer, + :batch, + :fetch_timeout, + :nak_delay, + :handler, + keyword_init: true, + ) + end +end diff --git a/lib/leopard/nats_request_reply_callbacks.rb b/lib/leopard/nats_request_reply_callbacks.rb new file mode 100644 index 0000000..8c9ffd2 --- /dev/null +++ b/lib/leopard/nats_request_reply_callbacks.rb @@ -0,0 +1,70 @@ +# frozen_string_literal: true + +module Rubyists + module Leopard + # Maps Leopard handler outcomes to request/reply response behavior. + class NatsRequestReplyCallbacks + # Builds a callback set for request/reply endpoint outcomes. + # + # @param logger [#error] Logger used for failure payloads. + # + # @return [void] + def initialize(logger:) + @logger = logger + end + + # Returns transport callbacks for request/reply endpoints. + # + # @return [Hash{Symbol => #call}] Outcome callbacks keyed by `:on_success`, `:on_failure`, and `:on_error`. + def callbacks + { + on_success: method(:respond_with_success), + on_failure: method(:respond_with_failure), + on_error: method(:respond_with_error), + } + end + + private + + # Responds to a successful request with the handler payload. + # + # @param wrapper [MessageWrapper] Wrapped request message. + # @param result [Dry::Monads::Success] Successful handler result. + # + # @return [void] + def respond_with_success(wrapper, result) + wrapper.respond(result.value!) + end + + # Responds to a failed request with the failure payload. + # + # @param wrapper [MessageWrapper] Wrapped request message. + # @param result [Dry::Monads::Failure] Failed handler result. + # + # @return [void] + def respond_with_failure(wrapper, result) + log_failure(result.failure) + wrapper.respond_with_error(result.failure) + end + + # Responds to a request with an exception payload after an unhandled error. + # + # @param wrapper [MessageWrapper] Wrapped request message. + # @param error [StandardError] The unhandled exception. + # + # @return [void] + def respond_with_error(wrapper, error) + wrapper.respond_with_error(error) + end + + # Logs the failure payload returned by a handler. + # + # @param failure [Object] The failure payload from the handler. + # + # @return [void] + def log_failure(failure) + @logger.error 'Error processing message: ', failure + end + end + end +end diff --git a/mise.toml b/mise.toml new file mode 100644 index 0000000..eefa30b --- /dev/null +++ b/mise.toml @@ -0,0 +1,2 @@ +[tools] +ruby = "latest" diff --git a/test/helper.rb b/test/helper.rb index d7162e7..39b99e5 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -13,6 +13,7 @@ end require 'minitest/autorun' +require 'minitest/mock' require_relative '../lib/leopard' # Suppress logs when running tests diff --git a/test/integration/nats_jetstream_integration_test.rb b/test/integration/nats_jetstream_integration_test.rb new file mode 100644 index 0000000..4e20be5 --- /dev/null +++ b/test/integration/nats_jetstream_integration_test.rb @@ -0,0 +1,259 @@ +# frozen_string_literal: true + +require 'json' +require 'securerandom' +require 'timeout' +require_relative '../helper' +require Rubyists::Leopard.libroot / 'leopard/nats_api_server' + +module NatsJetstreamBrokerHelpers + NATS_URL = ENV.fetch('LEOPARD_NATS_URL', ENV.fetch('NATS_URI', 'nats://127.0.0.1:4222')) + WAIT_TIMEOUT = 5 + NO_REDELIVERY_TIMEOUT = 1.5 + + private + + def setup_integration_context + @workers = [] + @streams = [] + @service_classes = [] + skip 'NATS JetStream broker not available' unless jetstream_available? + + @client = NATS.connect(**nats_connect_options) + @jetstream = @client.jetstream + end + + def teardown_integration_context + @workers.reverse_each(&:stop) + @streams.reverse_each { |stream| @jetstream&.delete_stream(stream) } + @service_classes.reverse_each { |klass| remove_service_class(klass) } + @client&.close + end + + def jetstream_available? + nc = NATS.connect(**nats_connect_options) + nc.jetstream.account_info + nc.close + true + rescue StandardError + false + end + + def build_names + token = SecureRandom.hex(4) + { + stream: "EVENTS_#{token}", + subject: "events.#{token}", + durable: "events_consumer_#{token}", + service: "JetstreamService#{token}", + } + end + + def publish(subject, payload) + @jetstream.publish(subject, JSON.generate(payload)) + end + + def pop_event(queue, timeout: WAIT_TIMEOUT) + Timeout.timeout(timeout) { queue.pop } + end + + def refute_event(queue, timeout: NO_REDELIVERY_TIMEOUT) + Timeout.timeout(timeout) { queue.pop } + + flunk 'expected no additional JetStream delivery' + rescue Timeout::Error + nil + end + + def wait_for(timeout: WAIT_TIMEOUT) + Timeout.timeout(timeout) do + loop do + return if yield + + sleep 0.05 + end + end + end + + def nats_connect_options + { + uri: NATS_URL, + reconnect: false, + connect_timeout: 0.5, + max_reconnect_attempts: 0, + } + end +end + +module NatsJetstreamServiceHelpers + private + + def build_worker(names, middleware: nil, &handler) + create_stream(names) + klass = build_service_class(names, middleware:, &handler) + worker = klass.new + worker.setup_worker( + nats_url: NatsJetstreamBrokerHelpers::NATS_URL, + service_opts: { name: names[:service], version: '1.0.0' }, + ) + wait_for_consumer(names) + @workers << worker + worker + end + + def create_stream(names) + @jetstream.add_stream(name: names[:stream], subjects: [names[:subject]]) + @streams << names[:stream] + end + + def build_service_class(names, middleware: nil, &handler) + klass = Class.new do + include Rubyists::Leopard::NatsApiServer + + config.logger = SemanticLogger[:JetstreamIntegration] + end + self.class.const_set(names[:service], klass) + @service_classes << klass + klass.use(middleware) if middleware + klass.jetstream_endpoint(:events, **endpoint_options(names), &handler) + klass + end + + def endpoint_options(names) + { + stream: names[:stream], + subject: names[:subject], + durable: names[:durable], + consumer: { ack_wait: 1, max_deliver: 5 }, + batch: 1, + fetch_timeout: 0.25, + nak_delay: 1, + } + end + + def build_tracking_middleware(queue) + Class.new do + define_method(:initialize) { |app| @app = app } + define_method(:call) do |wrapper| + queue << :middleware + @app.call(wrapper) + end + end + end + + def wait_for_consumer(names) + wait_for do + @jetstream.consumer_info(names[:stream], names[:durable]) + true + rescue NATS::JetStream::Error::NotFound + false + end + end + + def remove_service_class(klass) + self.class.send(:remove_const, klass.name.split('::').last) + rescue NameError + nil + end +end + +class NatsJetstreamSuccessIntegrationTest < Minitest::Test + include NatsJetstreamBrokerHelpers + include NatsJetstreamServiceHelpers + + def setup + setup_integration_context + end + + def teardown + teardown_integration_context + end + + def test_success_acks_once_and_runs_middleware + tracker, names = build_success_flow + + publish(names[:subject], { ok: true }) + + assert_success_flow(tracker) + end + + private + + def build_success_flow + tracker = Queue.new + names = build_names + middleware = build_tracking_middleware(tracker) + build_worker(names, middleware:, &success_handler(tracker)) + [tracker, names] + end + + def success_handler(tracker) + lambda do |msg| + tracker << [:handler, msg.class.name, msg.raw.metadata.num_delivered] + Dry::Monads::Success(msg.data) + end + end + + def assert_success_flow(tracker) + assert_equal :middleware, pop_event(tracker) + assert_equal [:handler, 'Rubyists::Leopard::MessageWrapper', 1], pop_event(tracker) + refute_event(tracker) + end +end + +class NatsJetstreamRetryIntegrationTest < Minitest::Test + include NatsJetstreamBrokerHelpers + include NatsJetstreamServiceHelpers + + def setup + setup_integration_context + end + + def teardown + teardown_integration_context + end + + def test_failure_naks_and_redelivers + attempts, names = build_retry_flow + + publish(names[:subject], { ok: false }) + + assert_redelivery(attempts) + end + + def test_exception_terms_without_redelivery + attempts = Queue.new + names = build_names + build_worker(names) do |msg| + attempts << msg.raw.metadata.num_delivered + raise 'boom' + end + publish(names[:subject], { ok: false }) + + assert_equal 1, pop_event(attempts) + refute_event(attempts) + end + + private + + def build_retry_flow + attempts = Queue.new + names = build_names + build_worker(names, &retry_handler(attempts)) + [attempts, names] + end + + def retry_handler(attempts) + lambda do |msg| + attempts << msg.raw.metadata.num_delivered + return Dry::Monads::Failure('retry') if msg.raw.metadata.num_delivered == 1 + + Dry::Monads::Success(msg.data) + end + end + + def assert_redelivery(attempts) + assert_equal 1, pop_event(attempts) + assert_equal 2, pop_event(attempts) + end +end diff --git a/test/lib/nats_api_server.rb b/test/lib/nats_api_server.rb index 7d4d5a1..8328dcf 100755 --- a/test/lib/nats_api_server.rb +++ b/test/lib/nats_api_server.rb @@ -2,6 +2,7 @@ require_relative '../helper' require Rubyists::Leopard.libroot / 'leopard/nats_api_server' +require Rubyists::Leopard.libroot / 'leopard/message_processor' describe 'Rubyists::Leopard::NatsApiServer' do # rubocop:disable Metrics/BlockLength before do @@ -16,6 +17,8 @@ cm = mod::ClassMethods cm.const_set(:Success, mod::Success) unless cm.const_defined?(:Success) cm.const_set(:Failure, mod::Failure) unless cm.const_defined?(:Failure) + @logger = Object.new + @logger.define_singleton_method(:error) { |*| nil } end it 'registers an endpoint' do @@ -46,6 +49,34 @@ assert_equal blk, endpoint.handler end + it 'registers a jetstream endpoint with options' do + blk = proc {} + @klass.jetstream_endpoint( + :events, + stream: 'EVENTS', + subject: 'events.created', + durable: 'events-consumer', + consumer: { max_deliver: 5 }, + batch: 10, + fetch_timeout: 2, + nak_delay: 1, + &blk + ) + + assert_equal 1, @klass.jetstream_endpoints.length + endpoint = @klass.jetstream_endpoints.first + + assert_equal :events, endpoint.name + assert_equal 'EVENTS', endpoint.stream + assert_equal 'events.created', endpoint.subject + assert_equal 'events-consumer', endpoint.durable + assert_equal({ max_deliver: 5 }, endpoint.consumer) + assert_equal 10, endpoint.batch + assert_equal 2, endpoint.fetch_timeout + assert_equal 1, endpoint.nak_delay + assert_equal blk, endpoint.handler + end + it 'registers a group' do @klass.group :math, queue: 'math' @@ -104,75 +135,86 @@ def call(wrapper) wrapper.log << :handler Dry::Monads::Success(:ok) } - @instance.stub(:process_result, ->(_wrapper, _result) {}) do - Rubyists::Leopard::MessageWrapper.stub(:new, wrapper) do - @instance.send(:dispatch_with_middleware, wrapper, handler) - end - end + processor = Rubyists::Leopard::MessageProcessor.new( + wrapper_factory: ->(*) { wrapper }, + middleware: -> { @klass.middleware }, + execute_handler: ->(message, block) { block.call(message) }, + logger: @logger, + ) + processor.process(raw, handler, on_success: ->(*_) {}, on_failure: ->(*_) {}, on_error: ->(*_) {}) assert_equal %i[mw1 mw2 handler], order end - it 'handles a message and processes result' do - raw_msg = Object.new - wrapper = Object.new + it 'executes a handler and routes Success to the success callback' do result = Dry::Monads::Success(:ok) - received = nil - handler = proc { |w| - received = w - result - } - processed = nil + wrapper = Object.new + success = nil + processor = processor_for(wrapper:, result:) - # Create an instance of the class to test instance methods after middleware is added - @instance = @klass.new + processor.process(:raw, proc { |message| message }, callback_set(on_success: ->(message, value) { success = [message, value] })) - @instance.stub(:process_result, ->(w, r) { processed = [w, r] }) do - Rubyists::Leopard::MessageWrapper.stub(:new, wrapper) do - @instance.send(:handle_message, raw_msg, handler) - end - end - - assert_equal wrapper, received - assert_equal [wrapper, result], processed + assert_equal [wrapper, result], success end - it 'responds with error when handler raises' do - raw_msg = Object.new + it 'routes raised errors to the error callback' do err = nil wrapper = Object.new - wrapper.define_singleton_method(:respond_with_error) { |raised| err = raised } - Rubyists::Leopard::MessageWrapper.stub(:new, wrapper) do - @instance.send(:handle_message, raw_msg, proc { raise 'boom' }) - end + processor = Rubyists::Leopard::MessageProcessor.new( + wrapper_factory: ->(*) { wrapper }, + middleware: -> { [] }, + execute_handler: ->(*) { raise 'boom' }, + logger: @logger, + ) + processor.process(:raw, proc {}, callback_set(on_error: ->(_message, raised) { err = raised })) assert_instance_of RuntimeError, err assert_equal 'boom', err.message end - it 'responds when processing Success result' do + it 'routes Success results unchanged to the success callback' do wrapper = Minitest::Mock.new - wrapper.expect(:respond, nil, ['ok']) + on_success = Minitest::Mock.new result = Rubyists::Leopard::NatsApiServer::Success.new('ok') - @instance.send(:process_result, wrapper, result) - wrapper.verify + on_success.expect(:call, nil, [wrapper, result]) + processor_for(wrapper:, result:).process(:raw, proc { |message| message }, callback_set(on_success:, on_failure: ->(*_) {})) + on_success.verify end - it 'responds when processing Failure result' do + it 'routes Failure results unchanged to the failure callback' do wrapper = Minitest::Mock.new - wrapper.expect(:respond_with_error, nil, ['fail']) + on_failure = Minitest::Mock.new result = Rubyists::Leopard::NatsApiServer::Failure.new('fail') - @instance.send(:process_result, wrapper, result) - wrapper.verify + on_failure.expect(:call, nil, [wrapper, result]) + processor_for(wrapper:, result:).process(:raw, proc { |message| message }, callback_set(on_success: ->(*_) {}, on_failure:)) + on_failure.verify end it 'passes hash failures through unchanged' do err = { code: 422, description: 'invalid' } - wrapper = Minitest::Mock.new - wrapper.expect(:respond_with_error, nil, [err]) + wrapper = Object.new result = Rubyists::Leopard::NatsApiServer::Failure.new(err) - @instance.send(:process_result, wrapper, result) - wrapper.verify + received = nil + processor_for(wrapper:, result:).process( + :raw, + proc { |message| message }, + callback_set(on_success: ->(*_) {}, on_failure: ->(_wrapper, failure_result) { received = failure_result.failure }), + ) + + assert_equal err, received + end + + def processor_for(wrapper:, result:) + Rubyists::Leopard::MessageProcessor.new( + wrapper_factory: ->(*) { wrapper }, + middleware: -> { [] }, + execute_handler: ->(*) { result }, + logger: @logger, + ) + end + + def callback_set(on_success: ->(*_) {}, on_failure: ->(*_) {}, on_error: ->(*_) {}) + { on_success:, on_failure:, on_error: } end describe 'prometheus metrics' do # rubocop:disable Metrics/BlockLength diff --git a/test/lib/nats_jetstream_consumer_test.rb b/test/lib/nats_jetstream_consumer_test.rb new file mode 100644 index 0000000..f31d693 --- /dev/null +++ b/test/lib/nats_jetstream_consumer_test.rb @@ -0,0 +1,83 @@ +# frozen_string_literal: true + +require_relative '../helper' +require Rubyists::Leopard.libroot / 'leopard/nats_jetstream_consumer' +require Rubyists::Leopard.libroot / 'leopard/nats_jetstream_endpoint' + +class NatsJetstreamConsumerTest < Minitest::Test + def setup + @consumer = Rubyists::Leopard::NatsJetstreamConsumer.new( + jetstream: Object.new, + endpoints: [], + logger: Object.new, + process_message: ->(*_) {}, + ) + end + + def test_consumer_config_preserves_durable_name + assert_equal 'events-consumer', symbol_key_config[:durable_name] + end + + def test_consumer_config_preserves_filter_subject + assert_equal 'events.created', symbol_key_config[:filter_subject] + end + + def test_consumer_config_preserves_explicit_ack_policy + assert_equal 'explicit', symbol_key_config[:ack_policy] + end + + def test_consumer_config_keeps_safe_symbol_key_overrides + assert_equal 30, symbol_key_config[:ack_wait] + assert_equal 5, symbol_key_config[:max_deliver] + end + + def test_consumer_config_strips_protected_string_keys + refute_includes string_key_config.keys, 'durable_name' + refute_includes string_key_config.keys, 'filter_subject' + refute_includes string_key_config.keys, 'ack_policy' + end + + def test_consumer_config_keeps_safe_string_key_overrides + assert_equal 100, string_key_config['max_ack_pending'] + end + + private + + def symbol_key_config + @symbol_key_config ||= @consumer.send(:consumer_config, endpoint_with_consumer(symbol_key_overrides)) + end + + def string_key_config + @string_key_config ||= @consumer.send(:consumer_config, endpoint_with_consumer(string_key_overrides)) + end + + def endpoint_with_consumer(consumer) + Rubyists::Leopard::NatsJetstreamEndpoint.new(**base_endpoint_attributes, consumer:) + end + + def symbol_key_overrides + { + durable_name: 'override-durable', + filter_subject: 'override.subject', + ack_policy: 'none', + ack_wait: 30, + max_deliver: 5, + } + end + + def string_key_overrides + { + 'durable_name' => 'override-durable', + 'filter_subject' => 'override.subject', + 'ack_policy' => 'none', + 'max_ack_pending' => 100, + } + end + + def base_endpoint_attributes + { + name: :events, stream: 'EVENTS', subject: 'events.created', durable: 'events-consumer', + batch: 1, fetch_timeout: 1, nak_delay: nil, handler: proc {} + } + end +end diff --git a/test/lib/nats_request_reply_callbacks_test.rb b/test/lib/nats_request_reply_callbacks_test.rb new file mode 100644 index 0000000..1ceb72f --- /dev/null +++ b/test/lib/nats_request_reply_callbacks_test.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +require_relative '../helper' +require Rubyists::Leopard.libroot / 'leopard/nats_request_reply_callbacks' +require 'dry/monads' + +class NatsRequestReplyCallbacksTest < Minitest::Test + def setup + @logger = Minitest::Mock.new + @callbacks = Rubyists::Leopard::NatsRequestReplyCallbacks.new(logger: @logger).callbacks + end + + def test_success_responds_with_value + wrapper = Minitest::Mock.new + wrapper.expect(:respond, nil, ['ok']) + + @callbacks[:on_success].call(wrapper, Dry::Monads::Result::Success.new('ok')) + + wrapper.verify + end + + def test_failure_logs_and_responds_with_error + wrapper = Minitest::Mock.new + wrapper.expect(:respond_with_error, nil, ['fail']) + @logger.expect(:error, nil, ['Error processing message: ', 'fail']) + + @callbacks[:on_failure].call(wrapper, Dry::Monads::Result::Failure.new('fail')) + + wrapper.verify + @logger.verify + end + + def test_error_responds_with_error + error = RuntimeError.new('boom') + wrapper = Minitest::Mock.new + wrapper.expect(:respond_with_error, nil, [error]) + + @callbacks[:on_error].call(wrapper, error) + + wrapper.verify + end +end