1
0
مراية لـ https://github.com/postalserver/postal.git تم المزامنة 2025-11-30 21:32:30 +00:00

feat: add health server and prometheus metrics to worker

هذا الالتزام موجود في:
Adam Cooke
2024-02-23 18:11:05 +00:00
ملتزم من قبل Adam Cooke
الأصل 1ae8ef6401
التزام a2eb70edf1
7 ملفات معدلة مع 167 إضافات و3 حذوفات

عرض الملف

@@ -19,6 +19,8 @@ module Worker
# after it has completed any outstanding jobs which are already inflight.
class Process
include HasPrometheusMetrics
# An array of job classes that should be processed each time the worker ticks.
#
# @return [Array<Class>]
@@ -48,6 +50,8 @@ module Worker
@work_sleep_time = work_sleep_time
@task_sleep_time = task_sleep_time
@threads = []
setup_prometheus
end
def run
@@ -114,7 +118,7 @@ module Worker
logger.tagged(component: "worker", thread: "work#{index}") do
logger.info "started work thread #{index}"
loop do
work_completed = work
work_completed = work(index)
if shutdown_after_wait?(work_completed ? 0 : @work_sleep_time)
break
@@ -129,7 +133,7 @@ module Worker
# Actually perform the work for this tick. This will call each job which has been registered.
#
# @return [Boolean] Whether any work was completed in this job or not
def work
def work(thread)
completed_work = 0
ActiveRecord::Base.connection_pool.with_connection do
JOBS.each do |job_class|
@@ -137,7 +141,14 @@ module Worker
job = job_class.new(logger: logger)
job.call
completed_work += 1 if job.work_completed?
if job.work_completed?
completed_work += 1
increment_prometheus_counter :postal_worker_job_executions,
labels: {
thread: thread,
job: job_class.to_s.split("::").last
}
end
end
end
end
@@ -236,6 +247,19 @@ module Worker
logger.error "#{e.class} (#{e.message})"
e.backtrace.each { |line| logger.error line }
Sentry.capture_exception(e) if defined?(Sentry)
increment_prometheus_counter :postal_worker_errors,
labels: { error: e.class.to_s }
end
# Registers the Prometheus counters that this worker increments elsewhere:
# one for completed job executions (labelled by thread and job) and one for
# errors (labelled by the error class name).
#
# @return the result of the final register_prometheus_counter call
def setup_prometheus
  register_prometheus_counter(
    :postal_worker_job_executions,
    docstring: "The number of jobs worked by a worker",
    labels: %i[thread job]
  )

  register_prometheus_counter(
    :postal_worker_errors,
    docstring: "The number of errors encountered while processing jobs",
    labels: %i[error]
  )
end
end

عرض الملف

@@ -0,0 +1,23 @@
# frozen_string_literal: true
# Mixin with small helpers for creating and incrementing Prometheus counters
# that live in the registry returned by Prometheus::Client.registry.
module HasPrometheusMetrics

  # Builds a new counter and adds it to the registry.
  #
  # @param name [Symbol] the metric name
  # @param kwargs [Hash] options passed through unchanged to
  #   Prometheus::Client::Counter.new (callers supply docstring: and labels:)
  # @return whatever the registry's #register call returns
  def register_prometheus_counter(name, **kwargs)
    registry.register(Prometheus::Client::Counter.new(name, **kwargs))
  end

  # Increments the counter registered under the given name. When the registry
  # has no metric with that name this does nothing and returns nil.
  #
  # @param name [Symbol] the metric name to look up
  # @param labels [Hash] label values forwarded to the counter's #increment
  # @return the result of #increment, or nil when no such counter exists
  def increment_prometheus_counter(name, labels: {})
    registry.get(name)&.increment(labels: labels)
  end

  # The registry that the helpers above read from and write to.
  def registry
    Prometheus::Client.registry
  end
  private :registry

end

107
app/util/health_server.rb Normal file
عرض الملف

@@ -0,0 +1,107 @@
# frozen_string_literal: true
require "socket"
require "rack/handler/webrick"
require "prometheus/client/formats/text"
# A small Rack application exposing health and metrics endpoints for a
# process. Responses are chosen purely from the request path:
#
#   /health   => 200 "OK"
#   /metrics  => 200 with the Prometheus registry rendered in text format
#   /         => 200 with the process name, pid and hostname
#   otherwise => 404 "Not Found"
class HealthServer

  # @param name [String] label shown in the body of the "/" response
  def initialize(name: "unnamed-process")
    @name = name
  end

  # Rack entry point. Only env["PATH_INFO"] is inspected; the request method
  # and everything else in the environment are ignored.
  #
  # @param env [Hash] the Rack environment
  # @return [Array] a Rack response triplet of status, headers and body
  def call(env)
    case env["PATH_INFO"]
    when "/health"
      ok
    when "/metrics"
      metrics
    when "/"
      root
    else
      not_found
    end
  end

  private

  # Response for "/": identifies the process by name, pid and hostname.
  def root
    [200, { "Content-Type" => "text/plain" }, ["#{@name} (pid: #{Process.pid}, host: #{Socket.gethostname})"]]
  end

  # Response for "/health".
  def ok
    [200, { "Content-Type" => "text/plain" }, ["OK"]]
  end

  # Response for any path that is not recognised.
  def not_found
    [404, { "Content-Type" => "text/plain" }, ["Not Found"]]
  end

  # Response for "/metrics": the Prometheus client registry marshalled to the
  # text format.
  def metrics
    registry = Prometheus::Client.registry
    body = Prometheus::Client::Formats::Text.marshal(registry)
    [200, { "Content-Type" => "text/plain" }, [body]]
  end

  class << self

    # Runs the health server under WEBrick in the calling thread. Use .start
    # to run it in a background thread instead.
    #
    # The port is read from HEALTH_SERVER_PORT, falling back to default_port.
    # If the address is already in use, this is logged and the method returns
    # without starting a server.
    #
    # @param default_port [Integer] port used when HEALTH_SERVER_PORT is unset
    # @param options [Hash] keyword arguments forwarded to HealthServer.new
    def run(default_port: 9090, **options)
      port = ENV.fetch("HEALTH_SERVER_PORT", default_port)
      Rack::Handler::WEBrick.run(new(**options),
                                 Port: port,
                                 BindAddress: bind_address,
                                 AccessLog: [],
                                 Logger: LoggerProxy.new)
    rescue Errno::EADDRINUSE
      Postal.logger.info "health server port (#{bind_address}:#{port}) is already " \
                         "in use, not starting health server"
    end

    # @return [String] the address to bind to, taken from
    #   HEALTH_SERVER_BIND_ADDRESS and defaulting to 127.0.0.1
    def bind_address
      ENV.fetch("HEALTH_SERVER_BIND_ADDRESS", "127.0.0.1")
    end

    # Starts the health server in a new thread.
    #
    # @param options [Hash] keyword arguments forwarded to .run
    # @return [Thread] the thread running the server
    def start(**options)
      thread = Thread.new { run(**options) }
      # A failure inside the health server thread must not abort the process.
      thread.abort_on_exception = false
      thread
    end

  end

  # Logger handed to WEBrick. It translates WEBrick's start and stop messages
  # into Postal log lines, drops routine start-up and shutdown chatter, and
  # relays everything else at debug level.
  class LoggerProxy

    # Component tag attached to every line relayed to Postal.logger, so the
    # lines can be attributed to the health server.
    COMPONENT = "health-server"

    [:info, :debug, :warn, :error, :fatal].each do |severity|
      # Severity method (info, warn, ...) which hands the message to #add.
      define_method(severity) do |message|
        add(severity, message)
      end

      # Level predicate: every level except debug is reported as enabled.
      define_method("#{severity}?") do
        severity != :debug
      end
    end

    # Relays a single log message to Postal.logger.
    #
    # @param severity [Symbol] the level the message was logged at; debug
    #   messages are discarded and other levels do not affect how the message
    #   is relayed
    # @param message [Object] the message; strings are matched against the
    #   known WEBrick messages below
    def add(severity, message)
      return if severity == :debug

      case message
      when /\AWEBrick::HTTPServer#start:.*port=(\d+)/
        Postal.logger.info "started health server on port #{::Regexp.last_match(1)}", component: COMPONENT
      when /\AWEBrick::HTTPServer#start done/
        Postal.logger.info "stopped health server", component: COMPONENT
      when /\AWEBrick [\d.]+/,
           /\Aruby ([\d.]+)/,
           /\ARack::Handler::WEBrick is mounted/,
           /\Aclose TCPSocket/,
           /\Agoing to shutdown/
        # Routine messages are deliberately not printed, to avoid cluttering
        # the log whenever a process starts or stops the server.
      else
        Postal.logger.debug message, component: COMPONENT
      end
    end

  end

end