1
0
مراية لـ https://github.com/postalserver/postal.git تم المزامنة 2025-11-30 21:32:30 +00:00

refactor: move worker from lib/worker to app/lib/worker

هذا الالتزام موجود في:
Adam Cooke
2024-02-22 22:27:52 +00:00
ملتزم من قبل Adam Cooke
الأصل a44e1f9081
التزام 93fc120f44
4 ملفات معدلة مع 0 إضافات و0 حذوفات

عرض الملف

@@ -0,0 +1,29 @@
# frozen_string_literal: true
module Worker
module Jobs
class BaseJob
def initialize(logger:)
@logger = logger
end
def call
# Override me.
end
def work_completed?
@work_completed == true
end
private
def work_completed!
@work_completed = true
end
attr_reader :logger
end
end
end

عرض الملف

@@ -0,0 +1,73 @@
# frozen_string_literal: true
module Worker
module Jobs
class ProcessQueuedMessagesJob < BaseJob
def call
@lock_time = Time.current
@locker = Postal.locker_name_with_suffix(SecureRandom.hex(8))
find_ip_addresses
lock_message_for_processing
obtain_locked_messages
process_messages
@messages_to_process
end
private
# Returns an array of IP address IDs that are present on the host that is
# running this job.
#
# @return [Array<Integer>]
def find_ip_addresses
ip_addresses = { 4 => [], 6 => [] }
Socket.ip_address_list.each do |address|
next if local_ip?(address.ip_address)
ip_addresses[address.ipv4? ? 4 : 6] << address.ip_address
end
@ip_addresses = IPAddress.where(ipv4: ip_addresses[4]).or(IPAddress.where(ipv6: ip_addresses[6])).pluck(:id)
end
# Is the given IP address a local address?
#
# @param [String] ip
# @return [Boolean]
def local_ip?(ip)
!!(ip =~ /\A(127\.|fe80:|::)/)
end
# Obtain a queued message from the database for processing
#
# @return [void]
def lock_message_for_processing
QueuedMessage.where(ip_address_id: [nil, @ip_addresses])
.where(locked_by: nil, locked_at: nil)
.ready_with_delayed_retry
.limit(1)
.update_all(locked_by: @locker, locked_at: @lock_time)
end
# Get a full list of all messages which we can process (i.e. those which have just
# been locked by us for processing)
#
# @return [void]
def obtain_locked_messages
@messages_to_process = QueuedMessage.where(locked_by: @locker, locked_at: @lock_time)
end
# Process the messages we obtained from the database
#
# @return [void]
def process_messages
@messages_to_process.each do |message|
work_completed!
MessageDequeuer.process(message, logger: logger)
end
end
end
end
end

عرض الملف

@@ -0,0 +1,49 @@
# frozen_string_literal: true
module Worker
module Jobs
class ProcessWebhookRequestsJob < BaseJob
def call
@lock_time = Time.current
@locker = Postal.locker_name_with_suffix(SecureRandom.hex(8))
lock_request_for_processing
obtain_locked_requests
process_requests
end
private
# Obtain a webhook request from the database for processing
#
# @return [void]
def lock_request_for_processing
WebhookRequest.unlocked
.ready
.limit(1)
.update_all(locked_by: @locker, locked_at: @lock_time)
end
# Get a full list of all webhooks which we can process (i.e. those which have just
# been locked by us for processing)
#
# @return [void]
def obtain_locked_requests
@requests_to_process = WebhookRequest.where(locked_by: @locker, locked_at: @lock_time)
end
# Process the webhook requests we obtained from the database
#
# @return [void]
def process_requests
@requests_to_process.each do |request|
work_completed!
WebhookDeliveryService.new(webhook_request: request).call
end
end
end
end
end

242
app/lib/worker/process.rb Normal file
عرض الملف

@@ -0,0 +1,242 @@
# frozen_string_literal: true
module Worker
# The Postal Worker process is responsible for handling all background tasks. This includes processing of all
# messages, webhooks and other administrative tasks. There are two main types of background work which is completed,
# jobs and scheduled tasks.
#
# The 'Jobs' here allow for the continuous monitoring of a database table (or queue) and processing of any new items
# which may appear in that. The polling takes place every 5 seconds by default and the work is able to run multiple
# threads to look for and process this work.
#
# Scheduled Tasks allow for code to be executed on a ROUGH schedule. This is used for administrative tasks. A single
# thread will run within each worker process and attempt to acquire the 'tasks' role. If successful it will run all
# tasks which are due to be run. The tasks are then scheduled to run again at a future time. Workers which are not
# successful in acquiring the role will not run any tasks but will still attempt to acquire a lock in case the current
# acquiree disappears.
#
# The worker process will run until it receives a TERM or INT signal. It will then attempt to gracefully shut down
# after it has completed any outstanding jobs which are already inflight.
class Process
# An array of job classes that should be processed each time the worker ticks.
#
# @return [Array<Class>]
JOBS = [
Jobs::ProcessQueuedMessagesJob,
Jobs::ProcessWebhookRequestsJob
].freeze
# An array of tasks that should be processed
#
# @return [Array<Class>]
TASKS = [
ActionDeletionsScheduledTask,
CheckAllDNSScheduledTask,
CleanupAuthieSessionsScheduledTask,
ExpireHeldMessagesScheduledTask,
ProcessMessageRetentionScheduledTask,
PruneSuppressionListsScheduledTask,
PruneWebhookRequestsScheduledTask,
SendNotificationsScheduledTask
].freeze
# @param [Integer] thread_count The number of worker threads to run in this process
def initialize(thread_count: 2, work_sleep_time: 5, task_sleep_time: 60)
@thread_count = thread_count
@exit_pipe_read, @exit_pipe_write = IO.pipe
@work_sleep_time = work_sleep_time
@task_sleep_time = task_sleep_time
@threads = []
end
def run
logger.tagged(component: "worker") do
setup_traps
start_work_threads
start_tasks_thread
wait_for_threads
end
end
private
# Install signal traps to allow for graceful shutdown
#
# @return [void]
def setup_traps
trap("INT") { receive_signal("INT") }
trap("TERM") { receive_signal("TERM") }
end
# Receive a signal and set the shutdown flag
#
# @param [String] signal The signal that was received z
# @return [void]
def receive_signal(signal)
puts "Received #{signal} signal. Stopping when able."
@shutdown = true
@exit_pipe_write.close
end
# Wait for the period of time and return true or false if shutdown has been requested. If the shutdown is
# requested during the wait, it will return immediately otherwise it will return false when it has finished
# waiting for the period of time.
#
# @param [Integer] wait_time The time to wait for
# @return [Boolean]
def shutdown_after_wait?(wait_time)
@exit_pipe_read.wait_readable(wait_time) ? true : false
end
# Wait for all threads to complete
#
# @return [void]
def wait_for_threads
@threads.each(&:join)
end
# Start the worker threads
#
# @return [void]
def start_work_threads
logger.info "starting #{@thread_count} work threads"
@thread_count.times do |index|
start_work_thread(index)
end
end
# Start a worker thread
#
# @return [void]
def start_work_thread(index)
@threads << Thread.new do
logger.tagged(component: "worker", thread: "work#{index}") do
logger.info "started work thread #{index}"
loop do
work_completed = work
if shutdown_after_wait?(work_completed ? 0 : @work_sleep_time)
break
end
end
logger.info "stopping work thread #{index}"
end
end
end
# Actually perform the work for this tick. This will call each job which has been registered.
#
# @return [Boolean] Whether any work was completed in this job or not
def work
completed_work = 0
ActiveRecord::Base.connection_pool.with_connection do
JOBS.each do |job_class|
capture_errors do
job = job_class.new(logger: logger)
job.call
completed_work += 1 if job.work_completed?
end
end
end
completed_work.positive?
end
# Start the tasks thread
#
# @return [void]
def start_tasks_thread
logger.info "starting tasks thread"
@threads << Thread.new do
logger.tagged(component: "worker", thread: "tasks") do
loop do
run_tasks
if shutdown_after_wait?(@task_sleep_time)
break
end
end
logger.info "stopping tasks thread"
ActiveRecord::Base.connection_pool.with_connection do
if WorkerRole.release(:tasks)
logger.info "releasesd tasks role"
end
end
end
end
end
# Run the tasks. This will attempt to acquire the tasks role and if successful it will all the registered
# tasks if they are due to be run.
#
# @return [void]
def run_tasks
role_acquisition_status = ActiveRecord::Base.connection_pool.with_connection do
WorkerRole.acquire(:tasks)
end
case role_acquisition_status
when :stolen
logger.info "acquired task role by stealing it from a lazy worker"
when :created
logger.info "acquired task role by creating it"
when :renewed
logger.debug "acquired task role by renewing it"
else
logger.debug "could not acquire task role, not doing anything"
return false
end
ActiveRecord::Base.connection_pool.with_connection do
TASKS.each { |task| run_task(task) }
end
end
# Run a single task
#
# @param [Class] task The task to run
# @return [void]
def run_task(task)
logger.tagged task: task do
scheduled_task = ScheduledTask.find_by(name: task.to_s)
if scheduled_task.nil?
logger.info "no existing task object, creating it now"
scheduled_task = ScheduledTask.create!(name: task.to_s, next_run_after: task.next_run_after)
end
next unless scheduled_task.next_run_after < Time.current
logger.info "running task"
capture_errors { task.new(logger: logger).call }
next_run_after = task.next_run_after
logger.info "scheduling task to next run at #{next_run_after}"
scheduled_task.update!(next_run_after: next_run_after)
end
end
# Return the logger
#
# @return [Klogger::Logger]
def logger
Postal.logger
end
# Capture exceptions and handle this as appropriate.
#
# @yield The block of code to run
# @return [void]
def capture_errors
yield
rescue StandardError => e
logger.error "#{e.class} (#{e.message})"
e.backtrace.each { |line| logger.error line }
Sentry.capture_exception(e) if defined?(Sentry)
end
end
end