1
0
مراية لـ https://github.com/postalserver/postal.git تم المزامنة 2025-12-01 05:43:04 +00:00

feat: automatically remove queued messages with stale locks (#2872)

هذا الالتزام موجود في:
Adam Cooke
2024-03-12 11:37:29 +00:00
ملتزم من قبل GitHub
الأصل 5d8213a987
التزام d84152eb5d
14 ملفات معدلة مع 323 إضافات و29 حذوفات

عرض الملف

@@ -73,7 +73,7 @@ class ServersController < ApplicationController
end end
def queue def queue
@messages = @server.queued_messages.order(id: :desc).page(params[:page]) @messages = @server.queued_messages.order(id: :desc).page(params[:page]).includes(:ip_address)
@messages_with_message = @messages.include_message @messages_with_message = @messages.include_message
end end

عرض الملف

@@ -29,8 +29,8 @@ module MessageDequeuer
private private
def check_message_exists def check_message_exists
@queued_message.message return if @queued_message.message
rescue Postal::MessageDB::Message::NotFound
log "unqueue because backend message has been removed." log "unqueue because backend message has been removed."
remove_from_queue remove_from_queue
stop_processing stop_processing

عرض الملف

@@ -31,8 +31,8 @@ module MessageDequeuer
private private
def check_message_exists def check_message_exists
queued_message.message return if queued_message.message
rescue Postal::MessageDB::Message::NotFound
log "unqueueing because backend message has been removed" log "unqueueing because backend message has been removed"
remove_from_queue remove_from_queue
stop_processing stop_processing

عرض الملف

@@ -40,7 +40,8 @@ module Worker
ProcessMessageRetentionScheduledTask, ProcessMessageRetentionScheduledTask,
PruneSuppressionListsScheduledTask, PruneSuppressionListsScheduledTask,
PruneWebhookRequestsScheduledTask, PruneWebhookRequestsScheduledTask,
SendNotificationsScheduledTask SendNotificationsScheduledTask,
TidyQueuedMessagesTask
].freeze ].freeze
# @param [Integer] thread_count The number of worker threads to run in this process # @param [Integer] thread_count The number of worker threads to run in this process

عرض الملف

@@ -7,7 +7,11 @@ module HasMessage
end end
def message def message
@message ||= server.message_db.message(message_id) return @message if instance_variable_defined?("@message")
@message = server.message_db.message(message_id)
rescue Postal::MessageDB::Message::NotFound
@message = nil
end end
def message=(message) def message=(message)

عرض الملف

@@ -33,14 +33,14 @@ class QueuedMessage < ApplicationRecord
belongs_to :server belongs_to :server
belongs_to :ip_address, optional: true belongs_to :ip_address, optional: true
belongs_to :user, optional: true
before_create :allocate_ip_address before_create :allocate_ip_address
scope :ready_with_delayed_retry, -> { where("retry_after IS NULL OR retry_after < ?", 30.seconds.ago) } scope :ready_with_delayed_retry, -> { where("retry_after IS NULL OR retry_after < ?", 30.seconds.ago) }
scope :with_stale_lock, -> { where("locked_at IS NOT NULL AND locked_at < ?", Postal::Config.postal.queued_message_lock_stale_days.days.ago) }
def retry_now def retry_now
update(retry_after: nil) update!(retry_after: nil)
end end
def send_bounce def send_bounce
@@ -50,7 +50,11 @@ class QueuedMessage < ApplicationRecord
end end
def allocate_ip_address def allocate_ip_address
return unless Postal.ip_pools? && message && pool = server.ip_pool_for_message(message) return unless Postal.ip_pools?
return if message.nil?
pool = server.ip_pool_for_message(message)
return if pool.nil?
self.ip_address = pool.ip_addresses.select_by_priority self.ip_address = pool.ip_addresses.select_by_priority
end end

عرض الملف

@@ -0,0 +1,18 @@
# frozen_string_literal: true
class TidyQueuedMessagesTask < ApplicationScheduledTask
def call
QueuedMessage.with_stale_lock.in_batches do |messages|
messages.each do |message|
logger.info "removing queued message #{message.id} (locked at #{message.locked_at} by #{message.locked_by})"
message.destroy
end
end
end
def self.next_run_after
quarter_to_each_hour
end
end

عرض الملف

@@ -1,24 +1,50 @@
%ul.messageList %ul.messageList
- for message in messages - for message in messages
- if message.is_a?(QueuedMessage) - if message.is_a?(QueuedMessage)
- queued_message = message
- message = message.message - message = message.message
%li.messageList__message
= link_to organization_server_message_path(organization, @server, message.id), :class => 'messageList__link' do
.messageList__details{:class => 'messageList__details--' + message.scope} - if message.nil? && queued_message
%p.messageList__subject= message.subject || "No subject" %li.messageList__message
%dl.messageList__addresses .messageList__link
%dt To .messageList__details
%dd %p.messageList__subject Deleted message ##{queued_message.message_id}
- if message.rcpt_to_return_path? %dl.messageList__addresses
%span.returnPathTag Return Path %dt Domain
- else %dd= queued_message.domain
= message.rcpt_to || "none" %dt Locked
%dt From %dd= queued_message.locked? ? "Yes" : "No"
%dd= message.mail_from || "none" .messageList__meta
%p.messageList__timestamp= queued_message.created_at.in_time_zone.to_fs(:long)
%p.messageList__status
%span.label{:class => "label--messageStatus-deleted"} Deleted
.messageList__meta
%p.messageList__timestamp= message.timestamp.in_time_zone.to_fs(:long) - else
%p.messageList__status %li.messageList__message
- if message.read? = link_to organization_server_message_path(organization, @server, message.id), :class => 'messageList__link' do
%span.label.label--purple Opened .messageList__details{:class => 'messageList__details--' + message.scope}
%span.label{:class => "label--messageStatus-#{message.status.underscore}"}= message.status.underscore.humanize %p.messageList__subject= message.subject || "No subject"
%dl.messageList__addresses
%dt To
%dd
- if message.rcpt_to_return_path?
%span.returnPathTag Return Path
- else
= message.rcpt_to || "none"
%dt From
%dd= message.mail_from || "none"
- if queued_message
%dt Attempts
%dd= queued_message.attempts
%dt Retry after
%dd= queued_message.retry_after&.to_fs(:short) || "ASAP"
.messageList__meta
%p.messageList__timestamp= message.timestamp.in_time_zone.to_fs(:long)
%p.messageList__status
- if message.read?
%span.label.label--purple Opened
%span.label{:class => "label--messageStatus-#{message.status.underscore}"}= message.status.underscore.humanize

عرض الملف

@@ -18,6 +18,7 @@ This document contains all the environment variables which are available for thi
| `POSTAL_SIGNING_KEY_PATH` | String | Path to the private key used for signing | $config-file-root/signing.key | | `POSTAL_SIGNING_KEY_PATH` | String | Path to the private key used for signing | $config-file-root/signing.key |
| `POSTAL_SMTP_RELAYS` | Array of strings | An array of SMTP relays in the format of smtp://host:port | [] | | `POSTAL_SMTP_RELAYS` | Array of strings | An array of SMTP relays in the format of smtp://host:port | [] |
| `POSTAL_TRUSTED_PROXIES` | Array of strings | An array of IP addresses to trust for proxying requests to Postal (in addition to localhost addresses) | [] | | `POSTAL_TRUSTED_PROXIES` | Array of strings | An array of IP addresses to trust for proxying requests to Postal (in addition to localhost addresses) | [] |
| `POSTAL_QUEUED_MESSAGE_LOCK_STALE_DAYS` | Integer | The number of days after which to consider a lock as stale. Messages with stale locks will be removed and not retried. | 1 |
| `WEB_SERVER_DEFAULT_PORT` | Integer | The default port the web server should listen on unless overriden by the PORT environment variable | 5000 | | `WEB_SERVER_DEFAULT_PORT` | Integer | The default port the web server should listen on unless overriden by the PORT environment variable | 5000 |
| `WEB_SERVER_DEFAULT_BIND_ADDRESS` | String | The default bind address the web server should listen on unless overriden by the BIND_ADDRESS environment variable | 127.0.0.1 | | `WEB_SERVER_DEFAULT_BIND_ADDRESS` | String | The default bind address the web server should listen on unless overriden by the BIND_ADDRESS environment variable | 127.0.0.1 |
| `WEB_SERVER_MAX_THREADS` | Integer | The maximum number of threads which can be used by the web server | 5 | | `WEB_SERVER_MAX_THREADS` | Integer | The maximum number of threads which can be used by the web server | 5 |

عرض الملف

@@ -29,6 +29,8 @@ postal:
smtp_relays: [] smtp_relays: []
# An array of IP addresses to trust for proxying requests to Postal (in addition to localhost addresses) # An array of IP addresses to trust for proxying requests to Postal (in addition to localhost addresses)
trusted_proxies: [] trusted_proxies: []
# The number of days after which to consider a lock as stale. Messages with stale locks will be removed and not retried.
queued_message_lock_stale_days: 1
web_server: web_server:
# The default port the web server should listen on unless overriden by the PORT environment variable # The default port the web server should listen on unless overriden by the PORT environment variable

عرض الملف

@@ -91,6 +91,11 @@ module Postal
description "An array of IP addresses to trust for proxying requests to Postal (in addition to localhost addresses)" description "An array of IP addresses to trust for proxying requests to Postal (in addition to localhost addresses)"
transform { |ip| IPAddr.new(ip) } transform { |ip| IPAddr.new(ip) }
end end
integer :queued_message_lock_stale_days do
description "The number of days after which to consider a lock as stale. Messages with stale locks will be removed and not retried."
default 1
end
end end
group :web_server do group :web_server do

عرض الملف

@@ -19,5 +19,11 @@ FactoryBot.define do
factory :ip_pool do factory :ip_pool do
name { "Default Pool" } name { "Default Pool" }
default { true } default { true }
trait :with_ip_address do
after(:create) do |ip_pool|
ip_pool.ip_addresses << create(:ip_address, ip_pool: ip_pool)
end
end
end end
end end

عرض الملف

@@ -0,0 +1,197 @@
# frozen_string_literal: true
require "rails_helper"
RSpec.describe QueuedMessage do
subject(:queued_message) { build(:queued_message) }
describe "relationships" do
it { is_expected.to belong_to(:server) }
it { is_expected.to belong_to(:ip_address).optional }
end
describe ".ready_with_delayed_retry" do
it "returns messages where retry after is null" do
message = create(:queued_message, retry_after: nil)
expect(described_class.ready_with_delayed_retry).to eq [message]
end
it "returns messages where retry after is less than 30 seconds from now" do
Timecop.freeze do
message1 = create(:queued_message, retry_after: 45.seconds.ago)
message2 = create(:queued_message, retry_after: 5.minutes.ago)
create(:queued_message, retry_after: Time.now)
create(:queued_message, retry_after: 1.minute.from_now)
expect(described_class.ready_with_delayed_retry.order(:id)).to eq [message1, message2]
end
end
end
describe ".with_stale_lock" do
it "returns messages where lock time is less than the configured number of stale days" do
allow(Postal::Config.postal).to receive(:queued_message_lock_stale_days).and_return(2)
message1 = create(:queued_message, locked_at: 3.days.ago, locked_by: "test")
message2 = create(:queued_message, locked_at: 2.days.ago, locked_by: "test")
create(:queued_message, locked_at: 1.days.ago, locked_by: "test")
create(:queued_message)
expect(described_class.with_stale_lock.order(:id)).to eq [message1, message2]
end
end
describe "#retry_now" do
it "removes the retry time" do
message = create(:queued_message, retry_after: 2.minutes.from_now)
expect { message.retry_now }.to change { message.reload.retry_after }.from(kind_of(Time)).to(nil)
end
it "raises an error if invalid" do
message = create(:queued_message, retry_after: 2.minutes.from_now)
message.update_columns(server_id: nil) # unlikely to actually happen
expect { message.retry_now }.to raise_error(ActiveRecord::RecordInvalid)
end
end
describe "#send_bounce" do
let(:server) { create(:server) }
let(:message) { MessageFactory.incoming(server) }
subject(:queued_message) { create(:queued_message, message: message) }
context "when the message is eligiable for bounces" do
it "queues a bounce message for sending" do
expect(BounceMessage).to receive(:new).with(server, kind_of(Postal::MessageDB::Message)).and_wrap_original do |original, *args|
bounce = original.call(*args)
expect(bounce).to receive(:queue)
bounce
end
queued_message.send_bounce
end
end
context "when the message is not eligible for bounces" do
it "returns nil" do
message.update(bounce: true)
expect(queued_message.send_bounce).to be nil
end
it "does not queue a bounce message for sending" do
message.update(bounce: true)
expect(BounceMessage).not_to receive(:new)
queued_message.send_bounce
end
end
end
describe "#allocate_ip_address" do
subject(:queued_message) { create(:queued_message) }
context "when ip pools is disabled" do
it "returns nil" do
expect(queued_message.allocate_ip_address).to be nil
end
it "does not allocate an IP address" do
expect { queued_message.allocate_ip_address }.not_to change(queued_message, :ip_address)
end
end
context "when IP pools is enabled" do
before do
allow(Postal::Config.postal).to receive(:use_ip_pools?).and_return(true)
end
context "when there is no backend message" do
it "returns nil" do
expect(queued_message.allocate_ip_address).to be nil
end
it "does not allocate an IP address" do
expect { queued_message.allocate_ip_address }.not_to change(queued_message, :ip_address)
end
end
context "when no IP pool can be determined for the message" do
let(:server) { create(:server) }
let(:message) { MessageFactory.outgoing(server) }
subject(:queued_message) { create(:queued_message, message: message) }
it "returns nil" do
expect(queued_message.allocate_ip_address).to be nil
end
it "does not allocate an IP address" do
expect { queued_message.allocate_ip_address }.not_to change(queued_message, :ip_address)
end
end
context "when an IP pool can be determined for the message" do
let(:ip_pool) { create(:ip_pool, :with_ip_address) }
let(:server) { create(:server, ip_pool: ip_pool) }
let(:message) { MessageFactory.outgoing(server) }
subject(:queued_message) { create(:queued_message, message: message) }
it "returns an IP address" do
expect(queued_message.allocate_ip_address).to be_a IPAddress
end
it "allocates an IP address to the queued message" do
queued_message.update(ip_address: nil)
expect { queued_message.allocate_ip_address }.to change(queued_message, :ip_address).from(nil).to(ip_pool.ip_addresses.first)
end
end
end
end
describe "#batchable_messages" do
context "when the message is not locked" do
subject(:queued_message) { build(:queued_message) }
it "raises an error" do
expect { queued_message.batchable_messages }.to raise_error(Postal::Error, /must lock current message before locking any friends/i)
end
end
context "when the message is locked" do
let(:batch_key) { nil }
subject(:queued_message) { build(:queued_message, :locked, batch_key: batch_key) }
context "when there is no batch key on the queued message" do
it "returns an empty array" do
expect(queued_message.batch_key).to be nil
expect(queued_message.batchable_messages).to eq []
end
end
context "when there is a batch key" do
let(:batch_key) { "1234" }
it "finds and locks messages with the same batch key and IP address up to the limit specified" do
other_message1 = create(:queued_message, batch_key: batch_key, ip_address: nil)
other_message2 = create(:queued_message, batch_key: batch_key, ip_address: nil)
create(:queued_message, batch_key: batch_key, ip_address: nil)
messages = queued_message.batchable_messages(2)
expect(messages).to eq [other_message1, other_message2]
expect(messages).to all be_locked
end
it "does not find messages with a different batch key" do
create(:queued_message, batch_key: "5678", ip_address: nil)
expect(queued_message.batchable_messages).to eq []
end
it "does not find messages that are not queued for sending yet" do
create(:queued_message, batch_key: batch_key, ip_address: nil, retry_after: 1.minute.from_now)
expect(queued_message.batchable_messages).to eq []
end
it "does not find messages that are for a different IP address" do
create(:queued_message, batch_key: batch_key, ip_address: create(:ip_address))
expect(queued_message.batchable_messages).to eq []
end
end
end
end
end

عرض الملف

@@ -0,0 +1,30 @@
# frozen_string_literal: true
require "rails_helper"
RSpec.describe TidyQueuedMessagesTask do
let(:logger) { TestLogger.new }
subject(:task) { described_class.new(logger: logger) }
describe "#call" do
it "destroys queued messages with stale locks" do
stale_message = create(:queued_message, locked_at: 2.days.ago, locked_by: "test")
task.call
expect { stale_message.reload }.to raise_error(ActiveRecord::RecordNotFound)
expect(logger).to have_logged(/removing queued message \d+/)
end
it "does not destroy messages which are not locked" do
message = create(:queued_message)
task.call
expect { message.reload }.not_to raise_error
end
it "does not destroy messages which where were locked less then the number of stale days" do
message = create(:queued_message, locked_at: 10.minutes.ago, locked_by: "test")
task.call
expect { message.reload }.not_to raise_error
end
end
end