مراية لـ
https://github.com/postalserver/postal.git
تم المزامنة 2025-12-01 05:43:04 +00:00
feat: new background work process
This removes all previous dependencies on RabbitMQ and the need to run separate cron and requeueing processes.
هذا الالتزام موجود في:
@@ -85,6 +85,9 @@ module Postal
|
||||
end
|
||||
end
|
||||
|
||||
# Return a generic logger for use generally throughout Postal.
|
||||
#
|
||||
# @return [Klogger::Logger] A logger instance
|
||||
def self.logger
|
||||
@logger ||= begin
|
||||
k = Klogger.new(nil, destination: Rails.env.test? ? "/dev/null" : $stdout, highlight: Rails.env.development?)
|
||||
@@ -106,9 +109,14 @@ module Postal
|
||||
def self.locker_name
|
||||
string = process_name.dup
|
||||
string += " job:#{Thread.current[:job_id]}" if Thread.current[:job_id]
|
||||
string += " thread:#{Thread.current.native_thread_id}"
|
||||
string
|
||||
end
|
||||
|
||||
def self.locker_name_with_suffix(suffix)
|
||||
"#{locker_name} #{suffix}"
|
||||
end
|
||||
|
||||
def self.smtp_from_name
|
||||
config.smtp&.from_name || "Postal"
|
||||
end
|
||||
@@ -175,7 +183,7 @@ module Postal
|
||||
end
|
||||
|
||||
def self.graylog_logging_destination
|
||||
@graylog_destination ||= begin
|
||||
@graylog_logging_destination ||= begin
|
||||
notifier = GELF::Notifier.new(config.logging.graylog.host, config.logging.graylog.port, "WAN")
|
||||
proc do |_logger, payload, group_ids|
|
||||
short_message = payload.delete(:message) || "[message missing]"
|
||||
|
||||
@@ -1,44 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
require "nifty/utils/random_string"
|
||||
|
||||
module Postal
|
||||
class Job
|
||||
|
||||
def initialize(id, params = {})
|
||||
@id = id
|
||||
@params = params
|
||||
on_initialize
|
||||
end
|
||||
|
||||
attr_reader :id
|
||||
|
||||
def params
|
||||
@params || {}
|
||||
end
|
||||
|
||||
def on_initialize
|
||||
# Called whenever the class is initialized. Can be overriden.
|
||||
end
|
||||
|
||||
def on_error(exception)
|
||||
# Called if there's an exception while processing the perform block.
|
||||
# Receives the exception.
|
||||
end
|
||||
|
||||
def perform
|
||||
end
|
||||
|
||||
def log(text)
|
||||
Worker.logger.info(text)
|
||||
end
|
||||
|
||||
def self.queue(queue, params = {})
|
||||
job_id = Nifty::Utils::RandomString.generate(length: 10).upcase
|
||||
job_payload = { "params" => params, "class_name" => name, "id" => job_id, "queue" => queue }
|
||||
Postal::Worker.job_queue(queue).publish(job_payload.to_json, persistent: false)
|
||||
job_id
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
@@ -445,7 +445,11 @@ module Postal
|
||||
#
|
||||
def bounce!(bounce_message)
|
||||
create_delivery("Bounced", details: "We've received a bounce message for this e-mail. See <msg:#{bounce_message.id}> for details.")
|
||||
SendWebhookJob.queue(:main, server_id: database.server_id, event: "MessageBounced", payload: { _original_message: id, _bounce: bounce_message.id })
|
||||
|
||||
WebhookRequest.trigger(server, "MessageBounced", {
|
||||
original_message: webhook_hash,
|
||||
bounce: bounce_message.webhook_hash
|
||||
})
|
||||
end
|
||||
|
||||
#
|
||||
@@ -461,7 +465,12 @@ module Postal
|
||||
def create_load(request)
|
||||
update("loaded" => Time.now.to_f) if loaded.nil?
|
||||
database.insert(:loads, { message_id: id, ip_address: request.ip, user_agent: request.user_agent, timestamp: Time.now.to_f })
|
||||
SendWebhookJob.queue(:main, server_id: database.server_id, event: "MessageLoaded", payload: { _message: id, ip_address: request.ip, user_agent: request.user_agent })
|
||||
|
||||
WebhookRequest.trigger(server, "MessageLoaded", {
|
||||
message: webhook_hash,
|
||||
ip_address: request.ip,
|
||||
user_agent: request.user_agent
|
||||
})
|
||||
end
|
||||
|
||||
#
|
||||
|
||||
@@ -1,34 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module Postal
|
||||
class MessageRequeuer
|
||||
|
||||
def run
|
||||
Signal.trap("INT") { @running ? @exit = true : Process.exit(0) }
|
||||
Signal.trap("TERM") { @running ? @exit = true : Process.exit(0) }
|
||||
|
||||
log "Running message requeuer..."
|
||||
loop do
|
||||
@running = true
|
||||
QueuedMessage.requeue_all
|
||||
@running = false
|
||||
check_exit
|
||||
sleep 5
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def log(text)
|
||||
Postal.logger.info text, component: "message-requeuer"
|
||||
end
|
||||
|
||||
def check_exit
|
||||
return unless @exit
|
||||
|
||||
log "Exiting"
|
||||
Process.exit(0)
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
@@ -1,38 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
require "postal/config"
|
||||
require "bunny"
|
||||
|
||||
module Postal
|
||||
module RabbitMQ
|
||||
|
||||
def self.create_connection
|
||||
bunny_host = ["localhost"]
|
||||
|
||||
if Postal.config.rabbitmq&.host.is_a?(Array)
|
||||
bunny_host = Postal.config.rabbitmq&.host
|
||||
elsif Postal.config.rabbitmq&.host.is_a?(String)
|
||||
bunny_host = [Postal.config.rabbitmq&.host]
|
||||
end
|
||||
|
||||
conn = Bunny.new(
|
||||
hosts: bunny_host,
|
||||
port: Postal.config.rabbitmq&.port || 5672,
|
||||
tls: Postal.config.rabbitmq&.tls || false,
|
||||
verify_peer: Postal.config.rabbitmq&.verify_peer || true,
|
||||
tls_ca_certificates: Postal.config.rabbitmq&.tls_ca_certificates || ["/etc/ssl/certs/ca-certificates.crt"],
|
||||
username: Postal.config.rabbitmq&.username || "guest",
|
||||
password: Postal.config.rabbitmq&.password || "guest",
|
||||
vhost: Postal.config.rabbitmq&.vhost || nil
|
||||
)
|
||||
conn.start
|
||||
conn
|
||||
end
|
||||
|
||||
def self.create_channel
|
||||
conn = create_connection
|
||||
conn.create_channel(nil, Postal.config.workers.threads)
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
@@ -94,16 +94,20 @@ module Postal
|
||||
user_agent: request.user_agent,
|
||||
timestamp: time
|
||||
})
|
||||
SendWebhookJob.queue(:main,
|
||||
server_id: message_db.server_id,
|
||||
event: "MessageLinkClicked",
|
||||
payload: {
|
||||
_message: link["message_id"],
|
||||
url: link["url"],
|
||||
token: link["token"],
|
||||
ip_address: request.ip,
|
||||
user_agent: request.user_agent
|
||||
})
|
||||
|
||||
begin
|
||||
message_webhook_hash = message_db.message(link["message_id"]).webhook_hash
|
||||
WebhookRequest.trigger(message_db.server, "MessageLinkClicked", {
|
||||
message: message_webhook_hash,
|
||||
url: link["url"],
|
||||
token: link["token"],
|
||||
ip_address: request.ip,
|
||||
user_agent: request.user_agent
|
||||
})
|
||||
rescue Postal::MessageDB::Message::NotFound
|
||||
# If we can't find the message that this link is associated with, we'll just ignore it
|
||||
# and not trigger any webhooks.
|
||||
end
|
||||
end
|
||||
|
||||
[307, { "Location" => link["url"] }, ["Redirected to: #{link['url']}"]]
|
||||
|
||||
@@ -1,220 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module Postal
|
||||
class Worker
|
||||
|
||||
def initialize(queues)
|
||||
@initial_queues = queues
|
||||
@active_queues = {}
|
||||
@process_name = $0
|
||||
@running_jobs = []
|
||||
end
|
||||
|
||||
def work
|
||||
logger.info "Worker running with #{Postal.config.workers.threads} threads"
|
||||
|
||||
Signal.trap("INT") do
|
||||
@exit = true
|
||||
set_process_name
|
||||
end
|
||||
Signal.trap("TERM") do
|
||||
@exit = true
|
||||
set_process_name
|
||||
end
|
||||
|
||||
self.class.job_channel.prefetch(Postal.config.workers.threads)
|
||||
@initial_queues.each { |queue| join_queue(queue) }
|
||||
|
||||
exit_checks = 0
|
||||
loop do
|
||||
if @exit && @running_jobs.empty?
|
||||
logger.info "Exiting immediately because no jobs running"
|
||||
exit 0
|
||||
elsif @exit
|
||||
if exit_checks >= 60
|
||||
logger.info "Job did not finish in a timely manner. Exiting"
|
||||
exit 0
|
||||
end
|
||||
if exit_checks.zero?
|
||||
logger.info "Exit requested but job is running. Waiting for job to finish."
|
||||
end
|
||||
sleep 60
|
||||
exit_checks += 1
|
||||
else
|
||||
manage_ip_queues
|
||||
sleep 1
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def receive_job(delivery_info, properties, message)
|
||||
if message && message["class_name"]
|
||||
@running_jobs << message["id"]
|
||||
set_process_name
|
||||
start_time = Time.now
|
||||
Thread.current[:job_id] = message["id"]
|
||||
logger.info "Processing job"
|
||||
begin
|
||||
klass = message["class_name"].constantize.new(message["id"], message["params"])
|
||||
klass.perform
|
||||
GC.start
|
||||
rescue StandardError => e
|
||||
klass.on_error(e) if defined?(klass)
|
||||
logger.exception(e)
|
||||
if defined?(Sentry)
|
||||
Sentry.capture_exception(e, extra: { job_id: message["id"] })
|
||||
end
|
||||
ensure
|
||||
logger.info "Finished job", time: (Time.now - start_time).to_i
|
||||
end
|
||||
end
|
||||
ensure
|
||||
Thread.current[:job_id] = nil
|
||||
self.class.job_channel.ack(delivery_info.delivery_tag)
|
||||
@running_jobs.delete(message["id"]) if message["id"]
|
||||
set_process_name
|
||||
|
||||
if @exit && @running_jobs.empty?
|
||||
logger.info "Exiting because all jobs have finished."
|
||||
exit 0
|
||||
end
|
||||
end
|
||||
|
||||
def join_queue(queue)
|
||||
if @active_queues[queue]
|
||||
logger.error "attempted to join queue but already joined", queue: queue
|
||||
else
|
||||
consumer = self.class.job_queue(queue).subscribe(manual_ack: true) do |delivery_info, properties, body|
|
||||
message = begin
|
||||
JSON.parse(body)
|
||||
rescue StandardError
|
||||
nil
|
||||
end
|
||||
|
||||
logger.tagged(job_id: message["id"], queue: queue, job_class: message["class_name"]) do
|
||||
receive_job(delivery_info, properties, message)
|
||||
end
|
||||
end
|
||||
@active_queues[queue] = consumer
|
||||
logger.info "joined queue", queue: queue
|
||||
end
|
||||
end
|
||||
|
||||
def leave_queue(queue)
|
||||
if consumer = @active_queues[queue]
|
||||
consumer.cancel
|
||||
@active_queues.delete(queue)
|
||||
logger.info "left queue", queue: queue
|
||||
else
|
||||
logger.error "requested to leave queue, but not joined", queue: queue
|
||||
end
|
||||
end
|
||||
|
||||
def manage_ip_queues
|
||||
@ip_queues ||= []
|
||||
@ip_to_id_mapping ||= {}
|
||||
@unassigned_ips ||= []
|
||||
@pairs ||= {}
|
||||
@counter ||= 0
|
||||
|
||||
if @counter >= 15
|
||||
@ip_to_id_mapping = {}
|
||||
@unassigned_ips = []
|
||||
@counter = 0
|
||||
else
|
||||
@counter += 1
|
||||
end
|
||||
|
||||
# Get all IP addresses on the system
|
||||
current_ip_addresses = Socket.ip_address_list.map(&:ip_address)
|
||||
|
||||
# Map them to an actual ID in the database if we can and cache that
|
||||
needed_ip_ids = []
|
||||
current_ip_addresses.each do |ip|
|
||||
need = nil
|
||||
if id = @ip_to_id_mapping[ip]
|
||||
# We know this IPs ID, we'll just use that.
|
||||
need = id
|
||||
elsif @unassigned_ips.include?(ip)
|
||||
# We know this IP isn't valid. We don't need to do anything
|
||||
elsif !self.class.local_ip?(ip) && ip_address = IPAddress.where("ipv4 = ? OR ipv6 = ?", ip, ip).first
|
||||
# We need to look this up
|
||||
@pairs[ip_address.ipv4] = ip_address.ipv6
|
||||
@ip_to_id_mapping[ip] = ip_address.id
|
||||
need = ip_address.id
|
||||
else
|
||||
@unassigned_ips << ip
|
||||
end
|
||||
|
||||
next unless need
|
||||
|
||||
pair = @pairs[ip] || @pairs.key(ip)
|
||||
if pair.nil? || current_ip_addresses.include?(pair)
|
||||
needed_ip_ids << @ip_to_id_mapping[ip]
|
||||
else
|
||||
logger.info "Host has '#{ip}' but its pair (#{pair}) isn't here. Cannot add now."
|
||||
end
|
||||
end
|
||||
|
||||
# Make an array of needed queue names
|
||||
# Work out what we need to actually do here
|
||||
missing_queues = needed_ip_ids - @ip_queues
|
||||
unwanted_queues = @ip_queues - needed_ip_ids
|
||||
# Leave the queues we don't want any more
|
||||
unwanted_queues.each do |id|
|
||||
leave_queue("outgoing-#{id}")
|
||||
@ip_queues.delete(id)
|
||||
ip_addresses_to_clear = []
|
||||
@ip_to_id_mapping.each do |iip, iid|
|
||||
if id == iid
|
||||
ip_addresses_to_clear << iip
|
||||
end
|
||||
end
|
||||
ip_addresses_to_clear.each { |ip| @ip_to_id_mapping.delete(ip) }
|
||||
end
|
||||
# Join any missing queues
|
||||
missing_queues.uniq.each do |id|
|
||||
join_queue("outgoing-#{id}")
|
||||
@ip_queues << id
|
||||
end
|
||||
end
|
||||
|
||||
def set_process_name
|
||||
prefix = @process_name.to_s
|
||||
prefix += " [exiting]" if @exit
|
||||
if @running_jobs.empty?
|
||||
$0 = "#{prefix} (idle)"
|
||||
else
|
||||
$0 = "#{prefix} (running #{@running_jobs.join(', ')})"
|
||||
end
|
||||
end
|
||||
|
||||
def logger
|
||||
self.class.logger
|
||||
end
|
||||
|
||||
class << self
|
||||
|
||||
def logger
|
||||
Postal.logger
|
||||
end
|
||||
|
||||
def job_channel
|
||||
@job_channel ||= Postal::RabbitMQ.create_channel
|
||||
end
|
||||
|
||||
def job_queue(name)
|
||||
@job_queues ||= {}
|
||||
@job_queues[name] ||= job_channel.queue("deliver-jobs-#{name}", durable: true, arguments: { "x-message-ttl" => 60_000 })
|
||||
end
|
||||
|
||||
def local_ip?(ip)
|
||||
!!(ip =~ /\A(127\.|fe80:|::)/)
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
@@ -1,27 +1,6 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
namespace :postal do
|
||||
desc "Start the cron worker"
|
||||
task cron: :environment do
|
||||
require "clockwork"
|
||||
require Rails.root.join("config", "cron")
|
||||
trap("TERM") do
|
||||
puts "Exiting..."
|
||||
Process.exit(0)
|
||||
end
|
||||
Clockwork.run
|
||||
end
|
||||
|
||||
desc "Start SMTP Server"
|
||||
task smtp_server: :environment do
|
||||
Postal::SMTPServer::Server.new(debug: true).run
|
||||
end
|
||||
|
||||
desc "Start the message requeuer"
|
||||
task requeuer: :environment do
|
||||
Postal::MessageRequeuer.new.run
|
||||
end
|
||||
|
||||
desc "Run all migrations on message databases"
|
||||
task migrate_message_databases: :environment do
|
||||
Server.all.each do |server|
|
||||
|
||||
29
lib/worker/jobs/base_job.rb
Normal file
29
lib/worker/jobs/base_job.rb
Normal file
@@ -0,0 +1,29 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module Worker
|
||||
module Jobs
|
||||
class BaseJob
|
||||
|
||||
def initialize(logger:)
|
||||
@logger = logger
|
||||
end
|
||||
|
||||
def call
|
||||
# Override me.
|
||||
end
|
||||
|
||||
def work_completed?
|
||||
@work_completed == true
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def work_completed!
|
||||
@work_completed = true
|
||||
end
|
||||
|
||||
attr_reader :logger
|
||||
|
||||
end
|
||||
end
|
||||
end
|
||||
73
lib/worker/jobs/process_queued_messages_job.rb
Normal file
73
lib/worker/jobs/process_queued_messages_job.rb
Normal file
@@ -0,0 +1,73 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module Worker
|
||||
module Jobs
|
||||
class ProcessQueuedMessagesJob < BaseJob
|
||||
|
||||
def call
|
||||
@lock_time = Time.current
|
||||
@locker = Postal.locker_name_with_suffix(SecureRandom.hex(8))
|
||||
|
||||
find_ip_addresses
|
||||
lock_message_for_processing
|
||||
obtain_locked_messages
|
||||
process_messages
|
||||
@messages_to_process
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
# Returns an array of IP address IDs that are present on the host that is
|
||||
# running this job.
|
||||
#
|
||||
# @return [Array<Integer>]
|
||||
def find_ip_addresses
|
||||
ip_addresses = { 4 => [], 6 => [] }
|
||||
Socket.ip_address_list.each do |address|
|
||||
next if local_ip?(address.ip_address)
|
||||
|
||||
ip_addresses[address.ipv4? ? 4 : 6] << address.ip_address
|
||||
end
|
||||
@ip_addresses = IPAddress.where(ipv4: ip_addresses[4]).or(IPAddress.where(ipv6: ip_addresses[6])).pluck(:id)
|
||||
end
|
||||
|
||||
# Is the given IP address a local address?
|
||||
#
|
||||
# @param [String] ip
|
||||
# @return [Boolean]
|
||||
def local_ip?(ip)
|
||||
!!(ip =~ /\A(127\.|fe80:|::)/)
|
||||
end
|
||||
|
||||
# Obtain a queued message from the database for processing
|
||||
#
|
||||
# @return [void]
|
||||
def lock_message_for_processing
|
||||
QueuedMessage.where(ip_address_id: [nil, @ip_addresses])
|
||||
.where(locked_by: nil, locked_at: nil)
|
||||
.ready_with_delayed_retry
|
||||
.limit(1)
|
||||
.update_all(locked_by: @locker, locked_at: @lock_time)
|
||||
end
|
||||
|
||||
# Get a full list of all messages which we can process (i.e. those which have just
|
||||
# been locked by us for processing)
|
||||
#
|
||||
# @return [void]
|
||||
def obtain_locked_messages
|
||||
@messages_to_process = QueuedMessage.where(locked_by: @locker, locked_at: @lock_time)
|
||||
end
|
||||
|
||||
# Process the messages we obtained from the database
|
||||
#
|
||||
# @return [void]
|
||||
def process_messages
|
||||
@messages_to_process.each do |message|
|
||||
work_completed!
|
||||
UnqueueMessageService.new(queued_message: message, logger: logger).call
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
end
|
||||
48
lib/worker/jobs/process_webhook_requests_job.rb
Normal file
48
lib/worker/jobs/process_webhook_requests_job.rb
Normal file
@@ -0,0 +1,48 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module Worker
|
||||
module Jobs
|
||||
class ProcessWebhookRequestsJob < BaseJob
|
||||
|
||||
def call
|
||||
@lock_time = Time.current
|
||||
@locker = Postal.locker_name_with_suffix(SecureRandom.hex(8))
|
||||
|
||||
lock_request_for_processing
|
||||
obtain_locked_requests
|
||||
process_requests
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
# Obtain a webhook request from the database for processing
|
||||
#
|
||||
# @return [void]
|
||||
def lock_request_for_processing
|
||||
WebhookRequest.unlocked
|
||||
.ready
|
||||
.limit(1)
|
||||
.update_all(locked_by: @locker, locked_at: @lock_time)
|
||||
end
|
||||
|
||||
# Get a full list of all webhooks which we can process (i.e. those which have just
|
||||
# been locked by us for processing)
|
||||
#
|
||||
# @return [void]
|
||||
def obtain_locked_requests
|
||||
@requests_to_process = WebhookRequest.where(locked_by: @locker, locked_at: @lock_time)
|
||||
end
|
||||
|
||||
# Process the webhook requests we obtained from the database
|
||||
#
|
||||
# @return [void]
|
||||
def process_requests
|
||||
@requests_to_process.each do |request|
|
||||
work_completed!
|
||||
request.deliver
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
end
|
||||
242
lib/worker/process.rb
Normal file
242
lib/worker/process.rb
Normal file
@@ -0,0 +1,242 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module Worker
|
||||
# The Postal Worker process is responsible for handling all background tasks. This includes processing of all
|
||||
# messages, webhooks and other administrative tasks. There are two main types of background work which is completed,
|
||||
# jobs and scheduled tasks.
|
||||
#
|
||||
# The 'Jobs' here allow for the continuous monitoring of a database table (or queue) and processing of any new items
|
||||
# which may appear in that. The polling takes place every 5 seconds by default and the work is able to run multiple
|
||||
# threads to look for and process this work.
|
||||
#
|
||||
# Scheduled Tasks allow for code to be executed on a ROUGH schedule. This is used for administrative tasks. A single
|
||||
# thread will run within each worker process and attempt to acquire the 'tasks' role. If successful it will run all
|
||||
# tasks which are due to be run. The tasks are then scheduled to run again at a future time. Workers which are not
|
||||
# successful in acquiring the role will not run any tasks but will still attempt to acquire a lock in case the current
|
||||
# acquiree disappears.
|
||||
#
|
||||
# The worker process will run until it receives a TERM or INT signal. It will then attempt to gracefully shut down
|
||||
# after it has completed any outstanding jobs which are already inflight.
|
||||
class Process
|
||||
|
||||
# An array of job classes that should be processed each time the worker ticks.
|
||||
#
|
||||
# @return [Array<Class>]
|
||||
JOBS = [
|
||||
Jobs::ProcessQueuedMessagesJob,
|
||||
Jobs::ProcessWebhookRequestsJob
|
||||
].freeze
|
||||
|
||||
# An array of tasks that should be processed
|
||||
#
|
||||
# @return [Array<Class>]
|
||||
TASKS = [
|
||||
ActionDeletionsScheduledTask,
|
||||
CheckAllDNSScheduledTask,
|
||||
CleanupAuthieSessionsScheduledTask,
|
||||
ExpireHeldMessagesScheduledTask,
|
||||
ProcessMessageRetentionScheduledTask,
|
||||
PruneSuppressionListsScheduledTask,
|
||||
PruneWebhookRequestsScheduledTask,
|
||||
SendNotificationsScheduledTask
|
||||
].freeze
|
||||
|
||||
# @param [Integer] thread_count The number of worker threads to run in this process
|
||||
def initialize(thread_count: 2, work_sleep_time: 5, task_sleep_time: 60)
|
||||
@thread_count = thread_count
|
||||
@exit_pipe_read, @exit_pipe_write = IO.pipe
|
||||
@work_sleep_time = work_sleep_time
|
||||
@task_sleep_time = task_sleep_time
|
||||
@threads = []
|
||||
end
|
||||
|
||||
def run
|
||||
logger.tagged(component: "worker") do
|
||||
setup_traps
|
||||
start_work_threads
|
||||
start_tasks_thread
|
||||
wait_for_threads
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
# Install signal traps to allow for graceful shutdown
|
||||
#
|
||||
# @return [void]
|
||||
def setup_traps
|
||||
trap("INT") { receive_signal("INT") }
|
||||
trap("TERM") { receive_signal("TERM") }
|
||||
end
|
||||
|
||||
# Receive a signal and set the shutdown flag
|
||||
#
|
||||
# @param [String] signal The signal that was received z
|
||||
# @return [void]
|
||||
def receive_signal(signal)
|
||||
puts "Received #{signal} signal. Stopping when able."
|
||||
@shutdown = true
|
||||
@exit_pipe_write.close
|
||||
end
|
||||
|
||||
# Wait for the period of time and return true or false if shutdown has been requested. If the shutdown is
|
||||
# requested during the wait, it will return immediately otherwise it will return false when it has finished
|
||||
# waiting for the period of time.
|
||||
#
|
||||
# @param [Integer] wait_time The time to wait for
|
||||
# @return [Boolean]
|
||||
def shutdown_after_wait?(wait_time)
|
||||
@exit_pipe_read.wait_readable(wait_time) ? true : false
|
||||
end
|
||||
|
||||
# Wait for all threads to complete
|
||||
#
|
||||
# @return [void]
|
||||
def wait_for_threads
|
||||
@threads.each(&:join)
|
||||
end
|
||||
|
||||
# Start the worker threads
|
||||
#
|
||||
# @return [void]
|
||||
def start_work_threads
|
||||
logger.info "starting #{@thread_count} work threads"
|
||||
@thread_count.times do |index|
|
||||
start_work_thread(index)
|
||||
end
|
||||
end
|
||||
|
||||
# Start a worker thread
|
||||
#
|
||||
# @return [void]
|
||||
def start_work_thread(index)
|
||||
@threads << Thread.new do
|
||||
logger.tagged(component: "worker", thread: "work#{index}") do
|
||||
logger.info "started work thread #{index}"
|
||||
loop do
|
||||
work_completed = work
|
||||
|
||||
if shutdown_after_wait?(work_completed ? 0 : @work_sleep_time)
|
||||
break
|
||||
end
|
||||
end
|
||||
|
||||
logger.info "stopping work thread #{index}"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Actually perform the work for this tick. This will call each job which has been registered.
|
||||
#
|
||||
# @return [Boolean] Whether any work was completed in this job or not
|
||||
def work
|
||||
completed_work = 0
|
||||
ActiveRecord::Base.connection_pool.with_connection do
|
||||
JOBS.each do |job_class|
|
||||
capture_errors do
|
||||
job = job_class.new(logger: logger)
|
||||
job.call
|
||||
|
||||
completed_work += 1 if job.work_completed?
|
||||
end
|
||||
end
|
||||
end
|
||||
completed_work.positive?
|
||||
end
|
||||
|
||||
# Start the tasks thread
|
||||
#
|
||||
# @return [void]
|
||||
def start_tasks_thread
|
||||
logger.info "starting tasks thread"
|
||||
@threads << Thread.new do
|
||||
logger.tagged(component: "worker", thread: "tasks") do
|
||||
loop do
|
||||
run_tasks
|
||||
|
||||
if shutdown_after_wait?(@task_sleep_time)
|
||||
break
|
||||
end
|
||||
end
|
||||
|
||||
logger.info "stopping tasks thread"
|
||||
ActiveRecord::Base.connection_pool.with_connection do
|
||||
if WorkerRole.release(:tasks)
|
||||
logger.info "releasesd tasks role"
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Run the tasks. This will attempt to acquire the tasks role and if successful it will all the registered
|
||||
# tasks if they are due to be run.
|
||||
#
|
||||
# @return [void]
|
||||
def run_tasks
|
||||
role_acquisition_status = ActiveRecord::Base.connection_pool.with_connection do
|
||||
WorkerRole.acquire(:tasks)
|
||||
end
|
||||
|
||||
case role_acquisition_status
|
||||
when :stolen
|
||||
logger.info "acquired task role by stealing it from a lazy worker"
|
||||
when :created
|
||||
logger.info "acquired task role by creating it"
|
||||
when :renewed
|
||||
logger.debug "acquired task role by renewing it"
|
||||
else
|
||||
logger.debug "could not acquire task role, not doing anything"
|
||||
return false
|
||||
end
|
||||
|
||||
ActiveRecord::Base.connection_pool.with_connection do
|
||||
TASKS.each { |task| run_task(task) }
|
||||
end
|
||||
end
|
||||
|
||||
# Run a single task
|
||||
#
|
||||
# @param [Class] task The task to run
|
||||
# @return [void]
|
||||
def run_task(task)
|
||||
logger.tagged task: task do
|
||||
scheduled_task = ScheduledTask.find_by(name: task.to_s)
|
||||
if scheduled_task.nil?
|
||||
logger.info "no existing task object, creating it now"
|
||||
scheduled_task = ScheduledTask.create!(name: task.to_s, next_run_after: task.next_run_after)
|
||||
end
|
||||
|
||||
next unless scheduled_task.next_run_after < Time.current
|
||||
|
||||
logger.info "running task"
|
||||
|
||||
capture_errors { task.new(logger: logger).call }
|
||||
|
||||
next_run_after = task.next_run_after
|
||||
logger.info "scheduling task to next run at #{next_run_after}"
|
||||
scheduled_task.update!(next_run_after: next_run_after)
|
||||
end
|
||||
end
|
||||
|
||||
# Return the logger
|
||||
#
|
||||
# @return [Klogger::Logger]
|
||||
def logger
|
||||
Postal.logger
|
||||
end
|
||||
|
||||
# Capture exceptions and handle this as appropriate.
|
||||
#
|
||||
# @yield The block of code to run
|
||||
# @return [void]
|
||||
def capture_errors
|
||||
yield
|
||||
rescue StandardError => e
|
||||
logger.error "#{e.class} (#{e.message})"
|
||||
e.backtrace.each { |line| logger.error line }
|
||||
Sentry.capture_exception(e) if defined?(Sentry)
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
المرجع في مشكلة جديدة
حظر مستخدم