مراية لـ
https://github.com/postalserver/postal.git
تم المزامنة 2025-11-30 21:32:30 +00:00
multi-threaded workers
هذا الالتزام موجود في:
@@ -57,6 +57,7 @@ rabbitmq:
|
||||
|
||||
workers:
|
||||
quantity: 1
|
||||
threads: 4
|
||||
|
||||
smtp_server:
|
||||
port: 25
|
||||
|
||||
@@ -18,7 +18,7 @@ module Postal
|
||||
|
||||
def self.create_channel
|
||||
conn = self.create_connection
|
||||
conn.create_channel
|
||||
conn.create_channel(nil, Postal.config.workers.threads)
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
@@ -5,20 +5,22 @@ module Postal
|
||||
@initial_queues = queues
|
||||
@active_queues = {}
|
||||
@process_name = $0
|
||||
@running_jobs = []
|
||||
end
|
||||
|
||||
def work
|
||||
@running_job = false
|
||||
Signal.trap("INT") { @exit = true }
|
||||
Signal.trap("TERM") { @exit = true }
|
||||
logger.info "Worker running with #{Postal.config.workers.threads} threads"
|
||||
|
||||
self.class.job_channel.prefetch(1)
|
||||
Signal.trap("INT") { @exit = true; set_process_name }
|
||||
Signal.trap("TERM") { @exit = true; set_process_name }
|
||||
|
||||
self.class.job_channel.prefetch(Postal.config.workers.threads)
|
||||
@initial_queues.each { |queue | join_queue(queue) }
|
||||
|
||||
exit_checks = 0
|
||||
loop do
|
||||
if @exit && @running_job == false
|
||||
logger.info "Exiting immediately because no job running"
|
||||
if @exit && @running_jobs.empty?
|
||||
logger.info "Exiting immediately because no jobs running"
|
||||
exit 0
|
||||
elsif @exit
|
||||
if exit_checks >= 60
|
||||
@@ -40,12 +42,12 @@ module Postal
|
||||
private
|
||||
|
||||
def receive_job(delivery_info, properties, body)
|
||||
@running_job = true
|
||||
begin
|
||||
message = JSON.parse(body) rescue nil
|
||||
if message && message['class_name']
|
||||
@running_jobs << message['id']
|
||||
set_process_name
|
||||
start_time = Time.now
|
||||
$0 = "#{@process_name} (running #{message['class_name']})"
|
||||
Thread.current[:job_id] = message['id']
|
||||
logger.info "[#{message['id']}] Started processing \e[34m#{message['class_name']}\e[0m job"
|
||||
begin
|
||||
@@ -65,12 +67,12 @@ module Postal
|
||||
end
|
||||
ensure
|
||||
Thread.current[:job_id] = nil
|
||||
$0 = @process_name
|
||||
self.class.job_channel.ack(delivery_info.delivery_tag)
|
||||
@running_job = false
|
||||
@running_jobs.delete(message['id']) if message['id']
|
||||
set_process_name
|
||||
|
||||
if @exit
|
||||
logger.info "Exiting because a job has ended."
|
||||
if @exit && @running_jobs.empty?
|
||||
logger.info "Exiting because all jobs have finished."
|
||||
exit 0
|
||||
end
|
||||
end
|
||||
@@ -169,6 +171,16 @@ module Postal
|
||||
end
|
||||
end
|
||||
|
||||
def set_process_name
|
||||
prefix = @process_name.to_s
|
||||
prefix += " [exiting]" if @exit
|
||||
if @running_jobs.empty?
|
||||
$0 = "#{prefix} (idle)"
|
||||
else
|
||||
$0 = "#{prefix} (running #{@running_jobs.join(', ')})"
|
||||
end
|
||||
end
|
||||
|
||||
def logger
|
||||
self.class.logger
|
||||
end
|
||||
|
||||
المرجع في مشكلة جديدة
حظر مستخدم