diff --git a/config/postal.defaults.yml b/config/postal.defaults.yml index deef0a3..d863725 100644 --- a/config/postal.defaults.yml +++ b/config/postal.defaults.yml @@ -57,6 +57,7 @@ rabbitmq: workers: quantity: 1 + threads: 4 smtp_server: port: 25 diff --git a/lib/postal/rabbit_mq.rb b/lib/postal/rabbit_mq.rb index fe06747..43df34c 100644 --- a/lib/postal/rabbit_mq.rb +++ b/lib/postal/rabbit_mq.rb @@ -18,7 +18,7 @@ module Postal def self.create_channel conn = self.create_connection - conn.create_channel + conn.create_channel(nil, Postal.config.workers.threads) end end diff --git a/lib/postal/worker.rb b/lib/postal/worker.rb index bb48590..f1248d7 100644 --- a/lib/postal/worker.rb +++ b/lib/postal/worker.rb @@ -5,20 +5,22 @@ module Postal @initial_queues = queues @active_queues = {} @process_name = $0 + @running_jobs = [] end def work - @running_job = false - Signal.trap("INT") { @exit = true } - Signal.trap("TERM") { @exit = true } + logger.info "Worker running with #{Postal.config.workers.threads} threads" - self.class.job_channel.prefetch(1) + Signal.trap("INT") { @exit = true; set_process_name } + Signal.trap("TERM") { @exit = true; set_process_name } + + self.class.job_channel.prefetch(Postal.config.workers.threads) @initial_queues.each { |queue | join_queue(queue) } exit_checks = 0 loop do - if @exit && @running_job == false - logger.info "Exiting immediately because no job running" + if @exit && @running_jobs.empty? + logger.info "Exiting immediately because no jobs running" exit 0 elsif @exit if exit_checks >= 60 @@ -40,12 +42,12 @@ module Postal private def receive_job(delivery_info, properties, body) - @running_job = true begin message = JSON.parse(body) rescue nil if message && message['class_name'] + @running_jobs << message['id'] + set_process_name start_time = Time.now - $0 = "#{@process_name} (running #{message['class_name']})" Thread.current[:job_id] = message['id'] logger.info "[#{message['id']}] Started processing \e[34m#{message['class_name']}\e[0m job" begin @@ -65,12 +67,12 @@ module Postal end ensure Thread.current[:job_id] = nil - $0 = @process_name self.class.job_channel.ack(delivery_info.delivery_tag) - @running_job = false + @running_jobs.delete(message['id']) if message['id'] + set_process_name - if @exit - logger.info "Exiting because a job has ended." + if @exit && @running_jobs.empty? + logger.info "Exiting because all jobs have finished." exit 0 end end @@ -169,6 +171,16 @@ module Postal end end + def set_process_name + prefix = @process_name.to_s + prefix += " [exiting]" if @exit + if @running_jobs.empty? + $0 = "#{prefix} (idle)" + else + $0 = "#{prefix} (running #{@running_jobs.join(', ')})" + end + end + def logger self.class.logger end