diff --git a/.rubocop.yml b/.rubocop.yml index 25bb950..17ab0f2 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -6,6 +6,7 @@ AllCops: - "db/schema.rb" # Fixes missing gem exception when running Rubocop on GitHub Actions. - "vendor/bundle/**/*" + - lib/tasks/auto_annotate_models.rake # Always use double quotes Style/StringLiterals: diff --git a/Gemfile b/Gemfile index c072b2e..9772788 100644 --- a/Gemfile +++ b/Gemfile @@ -5,10 +5,8 @@ gem "authie" gem "autoprefixer-rails" gem "basic_ssl" gem "bcrypt" -gem "bunny" gem "changey" gem "chronic" -gem "clockwork" gem "dotenv-rails" gem "dynamic_form" gem "encrypto_signo" @@ -55,4 +53,5 @@ group :development do gem "rubocop" gem "rubocop-rails" gem "timecop" + gem "webmock" end diff --git a/Gemfile.lock b/Gemfile.lock index 2344799..2471753 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -60,7 +60,8 @@ GEM minitest (>= 5.1) tzinfo (~> 2.0) zeitwerk (~> 2.3) - amq-protocol (2.3.2) + addressable (2.8.6) + public_suffix (>= 2.0.2, < 6.0) annotate (3.2.0) activerecord (>= 3.2, < 8.0) rake (>= 10.4, < 14.0) @@ -71,17 +72,12 @@ GEM execjs (~> 2) basic_ssl (1.0.3) bcrypt (3.1.18) + bigdecimal (3.1.6) builder (3.2.4) - bunny (2.20.3) - amq-protocol (~> 2.3, >= 2.3.1) - sorted_set (~> 1, >= 1.0.2) byebug (11.1.3) changey (1.1.0) activerecord (>= 4.2, < 7) chronic (0.10.2) - clockwork (3.0.2) - activesupport - tzinfo coffee-rails (5.0.0) coffee-script (>= 2.2.0) railties (>= 5.2.0) @@ -90,6 +86,9 @@ GEM execjs coffee-script-source (1.12.2) concurrent-ruby (1.2.3) + crack (1.0.0) + bigdecimal + rexml crass (1.0.6) database_cleaner (2.0.2) database_cleaner-active_record (>= 2, < 3) @@ -125,6 +124,7 @@ GEM temple (>= 0.8.2) thor tilt + hashdiff (1.1.0) hashie (5.0.0) highline (2.1.0) i18n (1.14.1) @@ -193,6 +193,7 @@ GEM parallel (1.22.1) parser (3.2.1.1) ast (~> 2.4.1) + public_suffix (5.0.4) puma (6.4.2) nio4r (~> 2.0) racc (1.7.3) @@ -229,7 +230,6 @@ GEM thor (~> 1.0) rainbow (3.1.1) rake (13.1.0) - rbtree (0.4.6) regexp_parser (2.7.0) resolv (0.2.2) rexml (3.2.5) @@ -289,10 +289,6 @@ GEM sentry-ruby (~> 5.8.0) sentry-ruby (5.8.0) concurrent-ruby (~> 1.0, >= 1.0.2) - set (1.0.3) - sorted_set (1.0.3) - rbtree - set (~> 1.0) sprockets (4.2.0) concurrent-ruby (~> 1.0) rack (>= 2.2.4, < 4) @@ -313,6 +309,10 @@ GEM uglifier (4.2.0) execjs (>= 0.3.0, < 3) unicode-display_width (2.4.2) + webmock (3.20.0) + addressable (>= 2.8.0) + crack (>= 0.3.2) + hashdiff (>= 0.4.0, < 2.0.0) websocket-driver (0.7.6) websocket-extensions (>= 0.1.0) websocket-extensions (0.1.5) @@ -330,11 +330,9 @@ DEPENDENCIES autoprefixer-rails basic_ssl bcrypt - bunny byebug changey chronic - clockwork coffee-rails (~> 5.0) database_cleaner dotenv-rails @@ -371,6 +369,7 @@ DEPENDENCIES timecop turbolinks (~> 5) uglifier (>= 1.3.0) + webmock BUNDLED WITH 2.4.9 diff --git a/Procfile.dev b/Procfile.dev index 9f0e14a..6fd1dd7 100644 --- a/Procfile.dev +++ b/Procfile.dev @@ -1,5 +1,3 @@ web: bundle exec puma -C config/puma.rb worker: bundle exec ruby script/worker.rb -cron: bundle exec rake postal:cron -smtp: bundle exec rake postal:smtp_server -requeuer: bundle exec rake postal:requeuer +smtp: bundle exec ruby script/smtp_server.rb diff --git a/app/controllers/messages_controller.rb b/app/controllers/messages_controller.rb index 6a6c2f1..c028cdb 100644 --- a/app/controllers/messages_controller.rb +++ b/app/controllers/messages_controller.rb @@ -116,7 +116,7 @@ class MessagesController < ApplicationController def retry if @message.raw_message? 
if @message.queued_message - @message.queued_message.queue! + @message.queued_message.retry_now flash[:notice] = "This message will be retried shortly." elsif @message.held? @message.add_to_message_queue(manual: true) diff --git a/app/jobs/action_deletion_job.rb b/app/jobs/action_deletion_job.rb deleted file mode 100644 index e598a85..0000000 --- a/app/jobs/action_deletion_job.rb +++ /dev/null @@ -1,16 +0,0 @@ -# frozen_string_literal: true - -class ActionDeletionJob < Postal::Job - - def perform - object = params["type"].constantize.deleted.find_by_id(params["id"]) - if object - log "Deleting #{params['type']}##{params['id']}" - object.destroy - log "Deleted #{params['type']}##{params['id']}" - else - log "Couldn't find deleted object #{params['type']}##{params['id']}" - end - end - -end diff --git a/app/jobs/action_deletions_job.rb b/app/jobs/action_deletions_job.rb deleted file mode 100644 index f68b1d9..0000000 --- a/app/jobs/action_deletions_job.rb +++ /dev/null @@ -1,17 +0,0 @@ -# frozen_string_literal: true - -class ActionDeletionsJob < Postal::Job - - def perform - Organization.deleted.each do |org| - log "Permanently removing organization #{org.id} (#{org.permalink})" - org.destroy - end - - Server.deleted.each do |server| - log "Permanently removing server #{server.id} (#{server.full_permalink})" - server.destroy - end - end - -end diff --git a/app/jobs/prune_suppression_lists_job.rb b/app/jobs/prune_suppression_lists_job.rb deleted file mode 100644 index ece9db7..0000000 --- a/app/jobs/prune_suppression_lists_job.rb +++ /dev/null @@ -1,12 +0,0 @@ -# frozen_string_literal: true - -class PruneSuppressionListsJob < Postal::Job - - def perform - Server.all.each do |s| - log "Pruning suppression lists for server #{s.id}" - s.message_db.suppression_list.prune - end - end - -end diff --git a/app/jobs/prune_webhook_requests_job.rb b/app/jobs/prune_webhook_requests_job.rb deleted file mode 100644 index 632bc4a..0000000 --- a/app/jobs/prune_webhook_requests_job.rb +++ /dev/null @@ -1,12 +0,0 @@ -# frozen_string_literal: true - -class PruneWebhookRequestsJob < Postal::Job - - def perform - Server.all.each do |s| - log "Pruning webhook requests for server #{s.id}" - s.message_db.webhooks.prune - end - end - -end diff --git a/app/jobs/requeue_webhooks_job.rb b/app/jobs/requeue_webhooks_job.rb deleted file mode 100644 index 736f0b6..0000000 --- a/app/jobs/requeue_webhooks_job.rb +++ /dev/null @@ -1,9 +0,0 @@ -# frozen_string_literal: true - -class RequeueWebhooksJob < Postal::Job - - def perform - WebhookRequest.requeue_all - end - -end diff --git a/app/jobs/send_notifications_job.rb b/app/jobs/send_notifications_job.rb deleted file mode 100644 index 9d519c6..0000000 --- a/app/jobs/send_notifications_job.rb +++ /dev/null @@ -1,9 +0,0 @@ -# frozen_string_literal: true - -class SendNotificationsJob < Postal::Job - - def perform - Server.send_send_limit_notifications - end - -end diff --git a/app/jobs/send_webhook_job.rb b/app/jobs/send_webhook_job.rb deleted file mode 100644 index 8228563..0000000 --- a/app/jobs/send_webhook_job.rb +++ /dev/null @@ -1,29 +0,0 @@ -# frozen_string_literal: true - -class SendWebhookJob < Postal::Job - - def perform - if server = Server.find(params["server_id"]) - new_items = {} - params["payload"]&.each do |key, value| - next unless key.to_s =~ /\A_(\w+)/ - - begin - new_items[::Regexp.last_match(1)] = server.message_db.message(value.to_i).webhook_hash - rescue Postal::MessageDB::Message::NotFound - # No message found, don't do any replacement - end - end - - 
new_items.each do |key, value| - params["payload"].delete("_#{key}") - params["payload"][key] = value - end - - WebhookRequest.trigger(server, params["event"], params["payload"]) - else - log "Couldn't find server with ID #{params['server_id']}" - end - end - -end diff --git a/app/jobs/sleep_job.rb b/app/jobs/sleep_job.rb deleted file mode 100644 index 9604b76..0000000 --- a/app/jobs/sleep_job.rb +++ /dev/null @@ -1,9 +0,0 @@ -# frozen_string_literal: true - -class SleepJob < Postal::Job - - def perform - sleep 5 - end - -end diff --git a/app/jobs/tidy_raw_messages_job.rb b/app/jobs/tidy_raw_messages_job.rb deleted file mode 100644 index 151188f..0000000 --- a/app/jobs/tidy_raw_messages_job.rb +++ /dev/null @@ -1,8 +0,0 @@ -# frozen_string_literal: true - -class TidyRawMessagesJob < Postal::Job - - def perform - end - -end diff --git a/app/jobs/unqueue_message_job.rb b/app/jobs/unqueue_message_job.rb deleted file mode 100644 index 809ecfb..0000000 --- a/app/jobs/unqueue_message_job.rb +++ /dev/null @@ -1,468 +0,0 @@ -# frozen_string_literal: true - -class UnqueueMessageJob < Postal::Job - - # rubocop:disable Layout/LineLength - def perform - if original_message = QueuedMessage.find_by_id(params["id"]) - if original_message.acquire_lock - - log "Lock acquired for queued message #{original_message.id}" - - begin - original_message.message - rescue Postal::MessageDB::Message::NotFound - log "Unqueue #{original_message.id} because backend message has been removed." - original_message.destroy - return - end - - unless original_message.retriable? - log "Skipping because retry after isn't reached" - original_message.unlock - return - end - - begin - other_messages = original_message.batchable_messages(100) - log "Found #{other_messages.size} associated messages to process at the same time (batch key: #{original_message.batch_key})" - rescue StandardError - original_message.unlock - raise - end - - ([original_message] + other_messages).each do |queued_message| - log_prefix = "[#{queued_message.server_id}::#{queued_message.message_id} #{queued_message.id}]" - begin - log "#{log_prefix} Got queued message with exclusive lock" - - begin - queued_message.message - rescue Postal::MessageDB::Message::NotFound - log "#{log_prefix} Unqueueing #{queued_message.id} because backend message has been removed" - queued_message.destroy - next - end - - # - # If the server is suspended, hold all messages - # - if queued_message.server.suspended? - log "#{log_prefix} Server is suspended. Holding message." - queued_message.message.create_delivery("Held", details: "Mail server has been suspended. No e-mails can be processed at present. Contact support for assistance.") - queued_message.destroy - next - end - - # We might not be able to send this any more, check the attempts - if queued_message.attempts >= Postal.config.general.maximum_delivery_attempts - details = "Maximum number of delivery attempts (#{queued_message.attempts}) has been reached." 
- if queued_message.message.scope == "incoming" - # Send bounces to incoming e-mails when they are hard failed - if bounce_id = queued_message.send_bounce - details += " Bounce sent to sender (see message )" - end - elsif queued_message.message.scope == "outgoing" - # Add the recipient to the suppression list - if queued_message.server.message_db.suppression_list.add(:recipient, queued_message.message.rcpt_to, reason: "too many soft fails") - log "Added #{queued_message.message.rcpt_to} to suppression list because maximum attempts has been reached" - details += " Added #{queued_message.message.rcpt_to} to suppression list because delivery has failed #{queued_message.attempts} times." - end - end - queued_message.message.create_delivery("HardFail", details: details) - queued_message.destroy - log "#{log_prefix} Message has reached maximum number of attempts. Hard failing." - next - end - - # If the raw message has been removed (removed by retention) - unless queued_message.message.raw_message? - log "#{log_prefix} Raw message has been removed. Not sending." - queued_message.message.create_delivery("HardFail", details: "Raw message has been removed. Cannot send message.") - queued_message.destroy - next - end - - # - #  Handle Incoming Messages - # - if queued_message.message.scope == "incoming" - # - # If this is a bounce, we need to handle it as such - # - if queued_message.message.bounce - log "#{log_prefix} Message is a bounce" - original_messages = queued_message.message.original_messages - unless original_messages.empty? - queued_message.message.original_messages.each do |orig_msg| - queued_message.message.update(bounce_for_id: orig_msg.id, domain_id: orig_msg.domain_id) - queued_message.message.create_delivery("Processed", details: "This has been detected as a bounce message for .") - orig_msg.bounce!(queued_message.message) - log "#{log_prefix} Bounce linked with message #{orig_msg.id}" - end - queued_message.destroy - next - end - - # This message was sent to the return path but hasn't been matched - #  to an original message. If we have a route for this, route it - #  otherwise we'll drop at this point. - if queued_message.message.route_id.nil? - log "#{log_prefix} No source messages found. Hard failing." - queued_message.message.create_delivery("HardFail", details: "This message was a bounce but we couldn't link it with any outgoing message and there was no route for it.") - queued_message.destroy - next - end - end - - # - # Update live stats - # - queued_message.message.database.live_stats.increment(queued_message.message.scope) - - # - # Inspect incoming messages - # - unless queued_message.message.inspected - log "#{log_prefix} Inspecting message" - queued_message.message.inspect_message - if queued_message.message.inspected - is_spam = queued_message.message.spam_score > queued_message.server.spam_threshold - queued_message.message.update(spam: true) if is_spam - queued_message.message.append_headers( - "X-Postal-Spam: #{queued_message.message.spam ? 'yes' : 'no'}", - "X-Postal-Spam-Threshold: #{queued_message.server.spam_threshold}", - "X-Postal-Spam-Score: #{queued_message.message.spam_score}", - "X-Postal-Threat: #{queued_message.message.threat ? 'yes' : 'no'}" - ) - log "#{log_prefix} Message inspected successfully. Headers added." 
- end - end - - # - # If this message has a SPAM score higher than is permitted - # - if queued_message.message.spam_score >= queued_message.server.spam_failure_threshold - log "#{log_prefix} Message has a spam score higher than the server's maxmimum. Hard failing." - queued_message.message.create_delivery("HardFail", details: "Message's spam score is higher than the failure threshold for this server. Threshold is currently #{queued_message.server.spam_failure_threshold}.") - queued_message.destroy - next - end - - # If the server is in development mode, hold it - if queued_message.server.mode == "Development" && !queued_message.manual? - log "Server is in development mode so holding." - queued_message.message.create_delivery("Held", details: "Server is in development mode.") - queued_message.destroy - log "#{log_prefix} Server is in development mode. Holding." - next - end - - # - # Find out what sort of message we're supposed to be sending and dispatch this request over to - # the sender. - # - if route = queued_message.message.route - - # If the route says we're holding quananteed mail and this is spam, we'll hold this - if route.spam_mode == "Quarantine" && queued_message.message.spam && !queued_message.manual? - queued_message.message.create_delivery("Held", details: "Message placed into quarantine.") - queued_message.destroy - log "#{log_prefix} Route says to quarantine spam message. Holding." - next - end - - # If the route says we're holding quananteed mail and this is spam, we'll hold this - if route.spam_mode == "Fail" && queued_message.message.spam && !queued_message.manual? - queued_message.message.create_delivery("HardFail", details: "Message is spam and the route specifies it should be failed.") - queued_message.destroy - log "#{log_prefix} Route says to fail spam message. Hard failing." - next - end - - # - # Messages that should be blindly accepted are blindly accepted - # - if route.mode == "Accept" - queued_message.message.create_delivery("Processed", details: "Message has been accepted but not sent to any endpoints.") - queued_message.destroy - log "#{log_prefix} Route says to accept without endpoint. Marking as processed." - next - end - - # - # Messages that should be accepted and held should be held - # - if route.mode == "Hold" - log "#{log_prefix} Route says to hold message." - if queued_message.manual? - log "#{log_prefix} Message was queued manually. Marking as processed." - queued_message.message.create_delivery("Processed", details: "Message has been processed.") - else - log "#{log_prefix} Message was not queued manually. Holding." - queued_message.message.create_delivery("Held", details: "Message has been accepted but not sent to any endpoints.") - end - queued_message.destroy - next - end - - # - # Messages that should be bounced should be bounced (or rejected if they got this far) - # - if route.mode == "Bounce" || route.mode == "Reject" - if id = queued_message.send_bounce - queued_message.message.create_delivery("HardFail", details: "Message has been bounced because the route asks for this. See message ") - log "#{log_prefix} Route says to bounce. Hard failing and sent bounce (#{id})." 
- end - queued_message.destroy - next - end - - if @fixed_result - result = @fixed_result - else - case queued_message.message.endpoint - when SMTPEndpoint - sender = cached_sender(Postal::SMTPSender, queued_message.message.recipient_domain, nil, servers: [queued_message.message.endpoint]) - when HTTPEndpoint - sender = cached_sender(Postal::HTTPSender, queued_message.message.endpoint) - when AddressEndpoint - sender = cached_sender(Postal::SMTPSender, queued_message.message.endpoint.domain, nil, force_rcpt_to: queued_message.message.endpoint.address) - else - log "#{log_prefix} Invalid endpoint for route (#{queued_message.message.endpoint_type})" - queued_message.message.create_delivery("HardFail", details: "Invalid endpoint for route.") - queued_message.destroy - next - end - result = sender.send_message(queued_message.message) - if result.connect_error - @fixed_result = result - end - end - - # Log the result - log_details = result.details - if result.type == "HardFail" && result.suppress_bounce - # The delivery hard failed, but requested that no bounce be sent - log "#{log_prefix} Suppressing bounce message after hard fail" - elsif result.type == "HardFail" && queued_message.message.send_bounces? - # If the message is a hard fail, send a bounce message for this message. - log "#{log_prefix} Sending a bounce because message hard failed" - if bounce_id = queued_message.send_bounce - log_details += ". " unless log_details =~ /\.\z/ - log_details += " Sent bounce message to sender (see message )" - end - end - - queued_message.message.create_delivery(result.type, details: log_details, output: result.output&.strip, sent_with_ssl: result.secure, log_id: result.log_id, time: result.time) - - if result.retry - log "#{log_prefix} Message requeued for trying later." - queued_message.retry_later(result.retry.is_a?(Integer) ? result.retry : nil) - queued_message.allocate_ip_address - queued_message.update_column(:ip_address_id, queued_message.ip_address&.id) - else - log "#{log_prefix} Message processing completed." - queued_message.message.endpoint.mark_as_used - queued_message.destroy - end - else - log "#{log_prefix} No route and/or endpoint available for processing. Hard failing." - queued_message.message.create_delivery("HardFail", details: "Message does not have a route and/or endpoint available for delivery.") - queued_message.destroy - next - end - end - - # - # Handle Outgoing Messages - # - if queued_message.message.scope == "outgoing" - if queued_message.message.domain.nil? - log "#{log_prefix} Message has no domain. Hard failing." - queued_message.message.create_delivery("HardFail", details: "Message's domain no longer exist") - queued_message.destroy - next - end - - # - # If there's no to address, we can't do much. Fail it. - # - if queued_message.message.rcpt_to.blank? - log "#{log_prefix} Message has no to address. Hard failing." - queued_message.message.create_delivery("HardFail", details: "Message doesn't have an RCPT to") - queued_message.destroy - next - end - - # Extract a tag and add it to the message if one doesn't exist - if queued_message.message.tag.nil? && tag = queued_message.message.headers["x-postal-tag"] - log "#{log_prefix} Added tag #{tag.last}" - queued_message.message.update(tag: tag.last) - end - - # - # If the credentials for this message is marked as holding and this isn't manual, hold it - # - if !queued_message.manual? && queued_message.message.credential && queued_message.message.credential.hold? - log "#{log_prefix} Credential wants us to hold messages. 
Holding." - queued_message.message.create_delivery("Held", details: "Credential is configured to hold all messages authenticated by it.") - queued_message.destroy - next - end - - # - # If the recipient is on the suppression list and this isn't a manual queueing block sending - # - if !queued_message.manual? && sl = queued_message.server.message_db.suppression_list.get(:recipient, queued_message.message.rcpt_to) - log "#{log_prefix} Recipient is on the suppression list. Holding." - queued_message.message.create_delivery("Held", details: "Recipient (#{queued_message.message.rcpt_to}) is on the suppression list (reason: #{sl['reason']})") - queued_message.destroy - next - end - - # Parse the content of the message as appropriate - if queued_message.message.should_parse? - log "#{log_prefix} Parsing message content as it hasn't been parsed before" - queued_message.message.parse_content - end - - # Inspect outgoing messages when there's a threshold set for the server - if !queued_message.message.inspected && queued_message.server.outbound_spam_threshold - log "#{log_prefix} Inspecting message" - queued_message.message.inspect_message - if queued_message.message.inspected - if queued_message.message.spam_score >= queued_message.server.outbound_spam_threshold - queued_message.message.update(spam: true) - end - log "#{log_prefix} Message inspected successfully" - end - end - - if queued_message.message.spam - queued_message.message.create_delivery("HardFail", details: "Message is likely spam. Threshold is #{queued_message.server.outbound_spam_threshold} and the message scored #{queued_message.message.spam_score}.") - queued_message.destroy - log "#{log_prefix} Message is spam (#{queued_message.message.spam_score}). Hard failing." - next - end - - # Add outgoing headers - unless queued_message.message.has_outgoing_headers? - queued_message.message.add_outgoing_headers - end - - # Check send limits - if queued_message.server.send_limit_exceeded? - # If we're over the limit, we're going to be holding this message - queued_message.server.update_columns(send_limit_exceeded_at: Time.now, send_limit_approaching_at: nil) - queued_message.message.create_delivery("Held", details: "Message held because send limit (#{queued_message.server.send_limit}) has been reached.") - queued_message.destroy - log "#{log_prefix} Server send limit has been exceeded. Holding." - next - elsif queued_message.server.send_limit_approaching? - # If we're approaching the limit, just say we are but continue to process the message - queued_message.server.update_columns(send_limit_approaching_at: Time.now, send_limit_exceeded_at: nil) - else - queued_message.server.update_columns(send_limit_approaching_at: nil, send_limit_exceeded_at: nil) - end - - # Update the live stats for this message. - queued_message.message.database.live_stats.increment(queued_message.message.scope) - - # If the server is in development mode, hold it - if queued_message.server.mode == "Development" && !queued_message.manual? - log "Server is in development mode so holding." - queued_message.message.create_delivery("Held", details: "Server is in development mode.") - queued_message.destroy - log "#{log_prefix} Server is in development mode. Holding." 
- next - end - - # Send the outgoing message to the SMTP sender - - if @fixed_result - result = @fixed_result - else - sender = cached_sender(Postal::SMTPSender, queued_message.message.recipient_domain, queued_message.ip_address) - result = sender.send_message(queued_message.message) - if result.connect_error - @fixed_result = result - end - end - - # - # If the message has been hard failed, check to see how many other recent hard fails we've had for the address - # and if there are more than 2, suppress the address for 30 days. - # - if result.type == "HardFail" - recent_hard_fails = queued_message.server.message_db.select(:messages, where: { rcpt_to: queued_message.message.rcpt_to, status: "HardFail", timestamp: { greater_than: 24.hours.ago.to_f } }, count: true) - if recent_hard_fails >= 1 && queued_message.server.message_db.suppression_list.add(:recipient, queued_message.message.rcpt_to, reason: "too many hard fails") - log "#{log_prefix} Added #{queued_message.message.rcpt_to} to suppression list because #{recent_hard_fails} hard fails in 24 hours" - result.details += "." if result.details =~ /\.\z/ - result.details += " Recipient added to suppression list (too many hard fails)." - end - end - - # - # If a message is sent successfully, remove the users from the suppression list - # - if result.type == "Sent" && queued_message.server.message_db.suppression_list.remove(:recipient, queued_message.message.rcpt_to) - log "#{log_prefix} Removed #{queued_message.message.rcpt_to} from suppression list because success" - result.details += "." if result.details =~ /\.\z/ - result.details += " Recipient removed from suppression list." - end - - # Log the result - queued_message.message.create_delivery(result.type, details: result.details, output: result.output, sent_with_ssl: result.secure, log_id: result.log_id, time: result.time) - if result.retry - log "#{log_prefix} Message requeued for trying later." - queued_message.retry_later(result.retry.is_a?(Integer) ? result.retry : nil) - else - log "#{log_prefix} Processing complete" - queued_message.destroy - end - end - rescue StandardError => e - log "#{log_prefix} Internal error: #{e.class}: #{e.message}" - e.backtrace.each { |line| log("#{log_prefix} #{line}") } - queued_message.retry_later - log "#{log_prefix} Queued message was unlocked" - if defined?(Sentry) - Sentry.capture_exception(e, extra: { job_id: self.id, server_id: queued_message.server_id, message_id: queued_message.message_id }) - end - queued_message.message&.create_delivery("Error", - details: "An internal error occurred while sending " \ - "this message. This message will be retried " \ - "automatically.", - output: "#{e.class}: #{e.message}", log_id: "J-#{self.id}") - end - end - - else - log "Couldn't get lock for message #{params['id']}. I won't do this." - end - else - log "No queued message with ID #{params['id']} was available for processing." 
- end - ensure - begin - @sender&.finish - rescue StandardError - nil - end - end - # rubocop:enable Layout/LineLength - - private - - # rubocop:disable Naming/MemoizedInstanceVariableName - def cached_sender(klass, *args) - @sender ||= begin - sender = klass.new(*args) - sender.start - sender - end - end - # rubocop:enable Naming/MemoizedInstanceVariableName - -end diff --git a/app/jobs/webhook_delivery_job.rb b/app/jobs/webhook_delivery_job.rb deleted file mode 100644 index a7369f5..0000000 --- a/app/jobs/webhook_delivery_job.rb +++ /dev/null @@ -1,17 +0,0 @@ -# frozen_string_literal: true - -class WebhookDeliveryJob < Postal::Job - - def perform - if webhook_request = WebhookRequest.find_by_id(params["id"]) - if webhook_request.deliver - log "Succesfully delivered" - else - log "Delivery failed" - end - else - log "No webhook request found with ID '#{params['id']}'" - end - end - -end diff --git a/app/models/concerns/has_locking.rb b/app/models/concerns/has_locking.rb new file mode 100644 index 0000000..6223416 --- /dev/null +++ b/app/models/concerns/has_locking.rb @@ -0,0 +1,47 @@ +# frozen_string_literal: true + +# This concern provides functionality for locking items along with additional functionality to handle +# the concept of retrying items after a certain period of time. The following database columns are +# required on the model +# +# * locked_by - A string column to store the name of the process that has locked the item +# * locked_at - A datetime column to store the time the item was locked +# * retry_after - A datetime column to store the time after which the item should be retried +# * attempts - An integer column to store the number of attempts that have been made to process the item +# +# 'ready' means that it's ready to be processed. +module HasLocking + + extend ActiveSupport::Concern + + included do + scope :unlocked, -> { where(locked_at: nil) } + scope :ready, -> { where("retry_after IS NULL OR retry_after < ?", Time.now) } + end + + def ready? + retry_after.nil? || retry_after < Time.now + end + + def unlock + self.locked_by = nil + self.locked_at = nil + update_columns(locked_by: nil, locked_at: nil) + end + + def locked? + locked_at.present? + end + + def retry_later(time = nil) + retry_time = time || calculate_retry_time(attempts, 5.minutes) + self.locked_by = nil + self.locked_at = nil + update_columns(locked_by: nil, locked_at: nil, retry_after: Time.now + retry_time, attempts: attempts + 1) + end + + def calculate_retry_time(attempts, initial_period) + (1.3**attempts) * initial_period + end + +end diff --git a/app/models/concerns/has_soft_destroy.rb b/app/models/concerns/has_soft_destroy.rb index 55cfcaf..cb07790 100644 --- a/app/models/concerns/has_soft_destroy.rb +++ b/app/models/concerns/has_soft_destroy.rb @@ -14,7 +14,6 @@ module HasSoftDestroy run_callbacks :soft_destroy do self.deleted_at = Time.now save! 
- ActionDeletionJob.queue(:main, type: self.class.name, id: id) end end diff --git a/app/models/queued_message.rb b/app/models/queued_message.rb index 5fc413b..bbe7468 100644 --- a/app/models/queued_message.rb +++ b/app/models/queued_message.rb @@ -29,33 +29,18 @@ class QueuedMessage < ApplicationRecord include HasMessage + include HasLocking belongs_to :server belongs_to :ip_address, optional: true belongs_to :user, optional: true before_create :allocate_ip_address - after_commit :queue, on: :create - scope :unlocked, -> { where(locked_at: nil) } - scope :retriable, -> { where("retry_after IS NULL OR retry_after < ?", Time.now) } - scope :requeueable, -> { where("retry_after IS NULL OR retry_after < ?", 30.seconds.ago) } + scope :ready_with_delayed_retry, -> { where("retry_after IS NULL OR retry_after < ?", 30.seconds.ago) } - def retriable? - retry_after.nil? || retry_after < Time.now - end - - def queue - UnqueueMessageJob.queue(queue_name, id: id) - end - - def queue! - update_column(:retry_after, nil) - queue - end - - def queue_name - ip_address ? :"outgoing-#{ip_address.id}" : :main + def retry_now + update(retry_after: nil) end def send_bounce @@ -70,40 +55,6 @@ class QueuedMessage < ApplicationRecord self.ip_address = pool.ip_addresses.select_by_priority end - def acquire_lock - time = Time.now - locker = Postal.locker_name - rows = self.class.where(id: id, locked_by: nil, locked_at: nil).update_all(locked_by: locker, locked_at: time) - if rows == 1 - self.locked_by = locker - self.locked_at = time - true - else - false - end - end - - def retry_later(time = nil) - retry_time = time || self.class.calculate_retry_time(attempts, 5.minutes) - self.locked_by = nil - self.locked_at = nil - update_columns(locked_by: nil, locked_at: nil, retry_after: Time.now + retry_time, attempts: attempts + 1) - end - - def unlock - self.locked_by = nil - self.locked_at = nil - update_columns(locked_by: nil, locked_at: nil) - end - - def self.calculate_retry_time(attempts, initial_period) - (1.3**attempts) * initial_period - end - - def locked? - locked_at.present? - end - def batchable_messages(limit = 10) unless locked? 
raise Postal::Error, "Must lock current message before locking any friends" @@ -114,13 +65,9 @@ class QueuedMessage < ApplicationRecord else time = Time.now locker = Postal.locker_name - self.class.retriable.where(batch_key: batch_key, ip_address_id: ip_address_id, locked_by: nil, locked_at: nil).limit(limit).update_all(locked_by: locker, locked_at: time) + self.class.ready.where(batch_key: batch_key, ip_address_id: ip_address_id, locked_by: nil, locked_at: nil).limit(limit).update_all(locked_by: locker, locked_at: time) QueuedMessage.where(batch_key: batch_key, ip_address_id: ip_address_id, locked_by: locker, locked_at: time).where.not(id: id) end end - def self.requeue_all - unlocked.requeueable.each(&:queue) - end - end diff --git a/app/models/scheduled_task.rb b/app/models/scheduled_task.rb new file mode 100644 index 0000000..0989815 --- /dev/null +++ b/app/models/scheduled_task.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +# == Schema Information +# +# Table name: scheduled_tasks +# +# id :bigint not null, primary key +# name :string(255) +# next_run_after :datetime +# +# Indexes +# +# index_scheduled_tasks_on_name (name) UNIQUE +# +class ScheduledTask < ApplicationRecord +end diff --git a/app/models/server.rb b/app/models/server.rb index fd23c4e..942f40c 100644 --- a/app/models/server.rb +++ b/app/models/server.rb @@ -206,7 +206,7 @@ class Server < ApplicationRecord end def queue_size - @queue_size ||= queued_messages.retriable.count + @queue_size ||= queued_messages.ready.count end def stats @@ -222,7 +222,7 @@ class Server < ApplicationRecord # Return the domain which can be used to authenticate emails sent from the given e-mail address. # - #  @param address [String] an e-mail address + # @param address [String] an e-mail address # @return [Domain, nil] the domain to use for authentication def authenticated_domain_for_address(address) return nil if address.blank? diff --git a/app/models/webhook_request.rb b/app/models/webhook_request.rb index ada4e34..c475155 100644 --- a/app/models/webhook_request.rb +++ b/app/models/webhook_request.rb @@ -5,21 +5,28 @@ # Table name: webhook_requests # # id :integer not null, primary key +# attempts :integer default(0) +# error :text(65535) +# event :string(255) +# locked_at :datetime +# locked_by :string(255) +# payload :text(65535) +# retry_after :datetime +# url :string(255) +# uuid :string(255) +# created_at :datetime # server_id :integer # webhook_id :integer -# url :string(255) -# event :string(255) -# uuid :string(255) -# payload :text(65535) -# attempts :integer default(0) -# retry_after :datetime -# error :text(65535) -# created_at :datetime +# +# Indexes +# +# index_webhook_requests_on_locked_by (locked_by) # class WebhookRequest < ApplicationRecord include HasUUID + include HasLocking RETRIES = { 1 => 2.minutes, 2 => 3.minutes, 3 => 6.minutes, 4 => 10.minutes, 5 => 15.minutes }.freeze @@ -31,30 +38,9 @@ class WebhookRequest < ApplicationRecord serialize :payload, Hash - after_commit :queue, on: :create - - def self.trigger(server, event, payload = {}) - unless server.is_a?(Server) - server = Server.find(server.to_i) - end - - webhooks = server.webhooks.enabled.includes(:webhook_events).references(:webhook_events).where("webhooks.all_events = ? 
OR webhook_events.event = ?", true, event) - webhooks.each do |webhook| - server.webhook_requests.create!(event: event, payload: payload, webhook: webhook, url: webhook.url) - end - end - - def self.requeue_all - where("retry_after < ?", Time.now).find_each(&:queue) - end - - def queue - WebhookDeliveryJob.queue(:main, id: id) - end - def deliver payload = { event: event, timestamp: created_at.to_f, payload: self.payload, uuid: uuid }.to_json - Postal.logger.tagged(event: event, url: url, component: "webhooks") do + Postal.logger.tagged(event: event, url: url) do Postal.logger.info "Sending webhook request" result = Postal::HTTP.post(url, sign: true, json: payload, timeout: 5) self.attempts += 1 @@ -74,7 +60,7 @@ class WebhookRequest < ApplicationRecord if result[:code] >= 200 && result[:code] < 300 Postal.logger.info "Received #{result[:code]} status code. That's OK." - destroy + destroy! webhook&.update_column(:last_used_at, Time.now) true else @@ -82,14 +68,31 @@ class WebhookRequest < ApplicationRecord self.error = "Couldn't send to URL. Code received was #{result[:code]}" if retry_after Postal.logger.info "Will retry #{retry_after} (this was attempt #{self.attempts})" - save + self.locked_by = nil + self.locked_at = nil + save! else Postal.logger.info "Have tried #{self.attempts} times. Giving up." - destroy + destroy! end false end end end + class << self + + def trigger(server, event, payload = {}) + unless server.is_a?(Server) + server = Server.find(server.to_i) + end + + webhooks = server.webhooks.enabled.includes(:webhook_events).references(:webhook_events).where("webhooks.all_events = ? OR webhook_events.event = ?", true, event) + webhooks.each do |webhook| + server.webhook_requests.create!(event: event, payload: payload, webhook: webhook, url: webhook.url) + end + end + + end + end diff --git a/app/models/worker_role.rb b/app/models/worker_role.rb new file mode 100644 index 0000000..22e83ea --- /dev/null +++ b/app/models/worker_role.rb @@ -0,0 +1,54 @@ +# frozen_string_literal: true + +# == Schema Information +# +# Table name: worker_roles +# +# id :bigint not null, primary key +# acquired_at :datetime +# role :string(255) +# worker :string(255) +# +# Indexes +# +# index_worker_roles_on_role (role) UNIQUE +# +class WorkerRole < ApplicationRecord + + class << self + + # Acquire or renew a lock for the given role. + # + # @param role [String] The name of the role to acquire + # @return [Symbol, false] True if the lock was acquired or renewed, false otherwise + def acquire(role) + # update our existing lock if we already have one + updates = where(role: role, worker: Postal.locker_name).update_all(acquired_at: Time.current) + return :renewed if updates.positive? + + # attempt to steal a role from another worker + updates = where(role: role).where("acquired_at is null OR acquired_at < ?", 5.minutes.ago) + .update_all(acquired_at: Time.current, worker: Postal.locker_name) + return :stolen if updates.positive? + + # attempt to create a new role for this worker + begin + create!(role: role, worker: Postal.locker_name, acquired_at: Time.current) + :created + rescue ActiveRecord::RecordNotUnique, ActiveRecord::RecordInvalid + false + end + end + + # Release a lock for the given role for the current process. + # + # @param role [String] The name of the role to release + # @return [Boolean] True if the lock was released, false otherwise + def release(role) + updates = where(role: role, worker: Postal.locker_name).delete_all + updates.positive? 
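+      # delete_all returns the number of rows it removed, so this reports whether this worker actually held the role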
+ end + + end + +end diff --git a/app/scheduled_tasks/action_deletions_scheduled_task.rb b/app/scheduled_tasks/action_deletions_scheduled_task.rb new file mode 100644 index 0000000..e5c7d5a --- /dev/null +++ b/app/scheduled_tasks/action_deletions_scheduled_task.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +class ActionDeletionsScheduledTask < ApplicationScheduledTask + + def call + Organization.deleted.each do |org| + logger.info "permanently removing organization #{org.id} (#{org.permalink})" + org.destroy + end + + Server.deleted.each do |server| + logger.info "permanently removing server #{server.id} (#{server.full_permalink})" + server.destroy + end + end + +end diff --git a/app/scheduled_tasks/application_scheduled_task.rb b/app/scheduled_tasks/application_scheduled_task.rb new file mode 100644 index 0000000..0aebe09 --- /dev/null +++ b/app/scheduled_tasks/application_scheduled_task.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true + +class ApplicationScheduledTask + + def initialize(logger:) + @logger = logger + end + + def call + # override me + end + + attr_reader :logger + + class << self + + def next_run_after + quarter_past_each_hour + end + + private + + def quarter_past_each_hour + time = Time.current + time = time.change(min: 15, sec: 0) + time += 1.hour if time < Time.current + time + end + + def quarter_to_each_hour + time = Time.current + time = time.change(min: 45, sec: 0) + time += 1.hour if time < Time.current + time + end + + def three_am + time = Time.current + time = time.change(hour: 3, min: 0, sec: 0) + time += 1.day if time < Time.current + time + end + + end + +end diff --git a/app/jobs/check_all_dns_job.rb b/app/scheduled_tasks/check_all_dns_scheduled_task.rb similarity index 62% rename from app/jobs/check_all_dns_job.rb rename to app/scheduled_tasks/check_all_dns_scheduled_task.rb index 0393afa..140d4a8 100644 --- a/app/jobs/check_all_dns_job.rb +++ b/app/scheduled_tasks/check_all_dns_scheduled_task.rb @@ -1,15 +1,15 @@ # frozen_string_literal: true -class CheckAllDNSJob < Postal::Job +class CheckAllDNSScheduledTask < ApplicationScheduledTask - def perform + def call Domain.where.not(dns_checked_at: nil).where("dns_checked_at <= ?", 1.hour.ago).each do |domain| - log "Checking DNS for domain: #{domain.name}" + logger.info "checking DNS for domain: #{domain.name}" domain.check_dns(:auto) end TrackDomain.where("dns_checked_at IS NULL OR dns_checked_at <= ?", 1.hour.ago).includes(:domain).each do |domain| - log "Checking DNS for track domain: #{domain.full_name}" + logger.info "checking DNS for track domain: #{domain.full_name}" domain.check_dns end end diff --git a/app/jobs/cleanup_authie_sessions_job.rb b/app/scheduled_tasks/cleanup_authie_sessions_scheduled_task.rb similarity index 55% rename from app/jobs/cleanup_authie_sessions_job.rb rename to app/scheduled_tasks/cleanup_authie_sessions_scheduled_task.rb index 36a93d0..333fec5 100644 --- a/app/jobs/cleanup_authie_sessions_job.rb +++ b/app/scheduled_tasks/cleanup_authie_sessions_scheduled_task.rb @@ -2,9 +2,9 @@ require "authie/session" -class CleanupAuthieSessionsJob < Postal::Job +class CleanupAuthieSessionsScheduledTask < ApplicationScheduledTask - def perform + def call Authie::Session.cleanup end diff --git a/app/jobs/expire_held_messages_job.rb b/app/scheduled_tasks/expire_held_messages_scheduled_task.rb similarity index 77% rename from app/jobs/expire_held_messages_job.rb rename to app/scheduled_tasks/expire_held_messages_scheduled_task.rb index e54d052..7abfa5b 100644 --- 
a/app/jobs/expire_held_messages_job.rb +++ b/app/scheduled_tasks/expire_held_messages_scheduled_task.rb @@ -1,8 +1,8 @@ # frozen_string_literal: true -class ExpireHeldMessagesJob < Postal::Job +class ExpireHeldMessagesScheduledTask < ApplicationScheduledTask - def perform + def call Server.all.each do |server| messages = server.message_db.messages(where: { status: "Held", diff --git a/app/jobs/process_message_retention_job.rb b/app/scheduled_tasks/process_message_retention_scheduled_task.rb similarity index 55% rename from app/jobs/process_message_retention_job.rb rename to app/scheduled_tasks/process_message_retention_scheduled_task.rb index 35c3bf3..671ec40 100644 --- a/app/jobs/process_message_retention_job.rb +++ b/app/scheduled_tasks/process_message_retention_scheduled_task.rb @@ -1,25 +1,29 @@ # frozen_string_literal: true -class ProcessMessageRetentionJob < Postal::Job +class ProcessMessageRetentionScheduledTask < ApplicationScheduledTask - def perform + def call Server.all.each do |server| if server.raw_message_retention_days # If the server has a maximum number of retained raw messages, remove any that are older than this - log "Tidying raw messages (by days) for #{server.permalink} (ID: #{server.id}). Keeping #{server.raw_message_retention_days} days." + logger.info "Tidying raw messages (by days) for #{server.permalink} (ID: #{server.id}). Keeping #{server.raw_message_retention_days} days." server.message_db.provisioner.remove_raw_tables_older_than(server.raw_message_retention_days) end if server.raw_message_retention_size - log "Tidying raw messages (by size) for #{server.permalink} (ID: #{server.id}). Keeping #{server.raw_message_retention_size} MB of data." + logger.info "Tidying raw messages (by size) for #{server.permalink} (ID: #{server.id}). Keeping #{server.raw_message_retention_size} MB of data." server.message_db.provisioner.remove_raw_tables_until_less_than_size(server.raw_message_retention_size * 1024 * 1024) end if server.message_retention_days - log "Tidying messages for #{server.permalink} (ID: #{server.id}). Keeping #{server.message_retention_days} days." + logger.info "Tidying messages for #{server.permalink} (ID: #{server.id}). Keeping #{server.message_retention_days} days."
server.message_db.provisioner.remove_messages(server.message_retention_days) end end end + def self.next_run_after + three_am + end + end diff --git a/app/scheduled_tasks/prune_suppression_lists_scheduled_task.rb b/app/scheduled_tasks/prune_suppression_lists_scheduled_task.rb new file mode 100644 index 0000000..2f73904 --- /dev/null +++ b/app/scheduled_tasks/prune_suppression_lists_scheduled_task.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +class PruneSuppressionListsScheduledTask < ApplicationScheduledTask + + def call + Server.all.each do |s| + logger.info "Pruning suppression lists for server #{s.id}" + s.message_db.suppression_list.prune + end + end + + def self.next_run_after + three_am + end + +end diff --git a/app/scheduled_tasks/prune_webhook_requests_scheduled_task.rb b/app/scheduled_tasks/prune_webhook_requests_scheduled_task.rb new file mode 100644 index 0000000..95147d7 --- /dev/null +++ b/app/scheduled_tasks/prune_webhook_requests_scheduled_task.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +class PruneWebhookRequestsScheduledTask < ApplicationScheduledTask + + def call + Server.all.each do |s| + logger.info "Pruning webhook requests for server #{s.id}" + s.message_db.webhooks.prune + end + end + + def self.next_run_after + quarter_to_each_hour + end + +end diff --git a/app/scheduled_tasks/send_notifications_scheduled_task.rb b/app/scheduled_tasks/send_notifications_scheduled_task.rb new file mode 100644 index 0000000..53f3f6d --- /dev/null +++ b/app/scheduled_tasks/send_notifications_scheduled_task.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +class SendNotificationsScheduledTask < ApplicationScheduledTask + + def call + Server.send_send_limit_notifications + end + + def self.next_run_after + 1.minute.from_now + end + +end diff --git a/app/services/unqueue_message_service.rb b/app/services/unqueue_message_service.rb new file mode 100644 index 0000000..5433771 --- /dev/null +++ b/app/services/unqueue_message_service.rb @@ -0,0 +1,487 @@ +# frozen_string_literal: true + +class UnqueueMessageService + + def initialize(queued_message:, logger:) + @queued_message = queued_message + @logger = logger + end + + def call + @logger.tagged(original_queued_message: @queued_message.id) do + log "starting message unqueue" + process_original_message + log "finished message unqueue" + end + end + + private + + def process_original_message + begin + @queued_message.message + rescue Postal::MessageDB::Message::NotFound + log "unqueue because backend message has been removed." + @queued_message.destroy + return + end + + unless @queued_message.ready? 
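+      # ready? comes from the HasLocking concern: the message is ready when retry_after is nil or in the past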
+ log "skipping because message isn't ready for processing" + return + end + + begin + other_messages = @queued_message.batchable_messages(100) + log "found #{other_messages.size} associated messages to process at the same time", batch_key: @queued_message.batch_key + rescue StandardError + @queued_message.unlock + raise + end + + ([@queued_message] + other_messages).each do |queued_message| + @logger.tagged(queued_message: queued_message.id) do + process_message(queued_message) + end + end + ensure + begin + @sender&.finish + rescue StandardError + nil + end + end + + # rubocop:disable Naming/MemoizedInstanceVariableName + def cached_sender(klass, *args) + @sender ||= begin + sender = klass.new(*args) + sender.start + sender + end + end + # rubocop:enable Naming/MemoizedInstanceVariableName + + def log(message, **tags) + @logger.info(message, **tags) + end + + def process_message(queued_message) + begin + queued_message.message + rescue Postal::MessageDB::Message::NotFound + log "unqueueing because backend message has been removed" + queued_message.destroy + return + end + + log "processing message" + + # + # If the server is suspended, hold all messages + # + if queued_message.server.suspended? + log "server is suspended, holding message" + queued_message.message.create_delivery("Held", details: "Mail server has been suspended. No e-mails can be processed at present. Contact support for assistance.") + queued_message.destroy + return + end + + # We might not be able to send this any more, check the attempts + if queued_message.attempts >= Postal.config.general.maximum_delivery_attempts + details = "Maximum number of delivery attempts (#{queued_message.attempts}) has been reached." + if queued_message.message.scope == "incoming" + # Send bounces to incoming e-mails when they are hard failed + if bounce_id = queued_message.send_bounce + details += " Bounce sent to sender (see message )" + end + elsif queued_message.message.scope == "outgoing" + # Add the recipient to the suppression list + if queued_message.server.message_db.suppression_list.add(:recipient, queued_message.message.rcpt_to, reason: "too many soft fails") + log "added #{queued_message.message.rcpt_to} to suppression list because maximum attempts has been reached" + details += " Added #{queued_message.message.rcpt_to} to suppression list because delivery has failed #{queued_message.attempts} times." + end + end + queued_message.message.create_delivery("HardFail", details: details) + queued_message.destroy + log "message has reached maximum number of attempts, hard failing" + return + end + + # If the raw message has been removed (removed by retention) + unless queued_message.message.raw_message? + log "raw message has been removed, not sending" + queued_message.message.create_delivery("HardFail", details: "Raw message has been removed. Cannot send message.") + queued_message.destroy + return + end + + # + # Handle Incoming Messages + # + if queued_message.message.scope == "incoming" + log "message is incoming" + + # + # If this is a bounce, we need to handle it as such + # + if queued_message.message.bounce + log "message is a bounce" + original_messages = queued_message.message.original_messages + unless original_messages.empty? 
+ queued_message.message.original_messages.each do |orig_msg| + queued_message.message.update(bounce_for_id: orig_msg.id, domain_id: orig_msg.domain_id) + queued_message.message.create_delivery("Processed", details: "This has been detected as a bounce message for .") + orig_msg.bounce!(queued_message.message) + log "bounce linked with message #{orig_msg.id}" + end + queued_message.destroy + return + end + + # This message was sent to the return path but hasn't been matched + # to an original message. If we have a route for this, route it + # otherwise we'll drop at this point. + if queued_message.message.route_id.nil? + log "no source messages found, hard failing" + queued_message.message.create_delivery("HardFail", details: "This message was a bounce but we couldn't link it with any outgoing message and there was no route for it.") + queued_message.destroy + return + end + end + + # + # Update live stats + # + queued_message.message.database.live_stats.increment(queued_message.message.scope) + + # + # Inspect incoming messages + # + unless queued_message.message.inspected + log "inspecting message" + queued_message.message.inspect_message + if queued_message.message.inspected + is_spam = queued_message.message.spam_score > queued_message.server.spam_threshold + queued_message.message.update(spam: true) if is_spam + queued_message.message.append_headers( + "X-Postal-Spam: #{queued_message.message.spam ? 'yes' : 'no'}", + "X-Postal-Spam-Threshold: #{queued_message.server.spam_threshold}", + "X-Postal-Spam-Score: #{queued_message.message.spam_score}", + "X-Postal-Threat: #{queued_message.message.threat ? 'yes' : 'no'}" + ) + log "message inspected, headers added", spam: queued_message.message.spam?, spam_score: queued_message.message.spam_score, threat: queued_message.message.threat? + end + end + + # + # If this message has a SPAM score higher than is permitted + # + if queued_message.message.spam_score >= queued_message.server.spam_failure_threshold + log "message has a spam score higher than the server's maximum, hard failing", server_threshold: queued_message.server.spam_failure_threshold + queued_message.message.create_delivery("HardFail", + details: "Message's spam score is higher than the failure threshold for this server. " \ + "Threshold is currently #{queued_message.server.spam_failure_threshold}.") + queued_message.destroy + return + end + + # If the server is in development mode, hold it + if queued_message.server.mode == "Development" && !queued_message.manual? + log "server is in development mode, holding" + queued_message.message.create_delivery("Held", details: "Server is in development mode.") + queued_message.destroy + return + end + + # + # Find out what sort of message we're supposed to be sending and dispatch this request over to + # the sender. + # + if route = queued_message.message.route + + # If the route says we're holding quarantined mail and this is spam, we'll hold this + if route.spam_mode == "Quarantine" && queued_message.message.spam && !queued_message.manual? + log "message is spam and route says to quarantine spam message, holding" + queued_message.message.create_delivery("Held", details: "Message placed into quarantine.") + queued_message.destroy + return + end + + # If the route says we're failing spam mail and this is spam, we'll hard fail this + if route.spam_mode == "Fail" && queued_message.message.spam && !queued_message.manual?
+ log "message is spam and route says to fail spam message, hard failing" + queued_message.message.create_delivery("HardFail", details: "Message is spam and the route specifies it should be failed.") + queued_message.destroy + return + end + + # + # Messages that should be blindly accepted are blindly accepted + # + if route.mode == "Accept" + log "route says to accept without endpoint, marking as processed" + queued_message.message.create_delivery("Processed", details: "Message has been accepted but not sent to any endpoints.") + queued_message.destroy + return + end + + # + # Messages that should be accepted and held should be held + # + if route.mode == "Hold" + if queued_message.manual? + log "route says to hold and message was queued manually, marking as processed" + queued_message.message.create_delivery("Processed", details: "Message has been processed.") + else + log "route says to hold, marking as held" + queued_message.message.create_delivery("Held", details: "Message has been accepted but not sent to any endpoints.") + end + queued_message.destroy + return + end + + # + # Messages that should be bounced should be bounced (or rejected if they got this far) + # + if route.mode == "Bounce" || route.mode == "Reject" + log "route says to bounce, hard failing and sending bounce" + if id = queued_message.send_bounce + log "bounce sent with id #{id}" + queued_message.message.create_delivery("HardFail", details: "Message has been bounced because the route asks for this. See message ") + end + queued_message.destroy + return + end + + if @fixed_result + result = @fixed_result + else + case queued_message.message.endpoint + when SMTPEndpoint + sender = cached_sender(Postal::SMTPSender, queued_message.message.recipient_domain, nil, servers: [queued_message.message.endpoint]) + when HTTPEndpoint + sender = cached_sender(Postal::HTTPSender, queued_message.message.endpoint) + when AddressEndpoint + sender = cached_sender(Postal::SMTPSender, queued_message.message.endpoint.domain, nil, force_rcpt_to: queued_message.message.endpoint.address) + else + log "invalid endpoint for route (#{queued_message.message.endpoint_type})" + queued_message.message.create_delivery("HardFail", details: "Invalid endpoint for route.") + queued_message.destroy + return + end + result = sender.send_message(queued_message.message) + if result.connect_error + @fixed_result = result + end + end + + # Log the result + log_details = result.details + if result.type == "HardFail" && result.suppress_bounce + # The delivery hard failed, but requested that no bounce be sent + log "suppressing bounce message after hard fail" + elsif result.type == "HardFail" && queued_message.message.send_bounces? + # If the message is a hard fail, send a bounce message for this message. + log "sending a bounce because message hard failed" + if bounce_id = queued_message.send_bounce + log_details += ". " unless log_details =~ /\.\z/ + log_details += " Sent bounce message to sender (see message )" + end + end + + queued_message.message.create_delivery(result.type, details: log_details, output: result.output&.strip, sent_with_ssl: result.secure, log_id: result.log_id, time: result.time) + + if result.retry + queued_message.retry_later(result.retry.is_a?(Integer) ? 
result.retry : nil) + log "message requeued for trying later, at #{queued_message.retry_after}" + queued_message.allocate_ip_address + queued_message.update_column(:ip_address_id, queued_message.ip_address&.id) + else + log "message processing completed" + queued_message.message.endpoint.mark_as_used + queued_message.destroy + end + else + log "no route and/or endpoint available for processing, hard failing" + queued_message.message.create_delivery("HardFail", details: "Message does not have a route and/or endpoint available for delivery.") + queued_message.destroy + return + end + end + + # + # Handle Outgoing Messages + # + return unless queued_message.message.scope == "outgoing" + + log "message is outgoing" + + if queued_message.message.domain.nil? + log "message has no domain, hard failing" + queued_message.message.create_delivery("HardFail", details: "Message's domain no longer exist") + queued_message.destroy + return + end + + # + # If there's no to address, we can't do much. Fail it. + # + if queued_message.message.rcpt_to.blank? + log "message has no 'to' address, hard failing" + queued_message.message.create_delivery("HardFail", details: "Message doesn't have an RCPT to") + queued_message.destroy + return + end + + # Extract a tag and add it to the message if one doesn't exist + if queued_message.message.tag.nil? && tag = queued_message.message.headers["x-postal-tag"] + log "added tag: #{tag.last}" + queued_message.message.update(tag: tag.last) + end + + # + # If the credentials for this message is marked as holding and this isn't manual, hold it + # + if !queued_message.manual? && queued_message.message.credential && queued_message.message.credential.hold? + log "credential wants us to hold messages, holding" + queued_message.message.create_delivery("Held", details: "Credential is configured to hold all messages authenticated by it.") + queued_message.destroy + return + end + + # + # If the recipient is on the suppression list and this isn't a manual queueing block sending + # + if !queued_message.manual? && sl = queued_message.server.message_db.suppression_list.get(:recipient, queued_message.message.rcpt_to) + log "recipient is on the suppression list, holding" + queued_message.message.create_delivery("Held", details: "Recipient (#{queued_message.message.rcpt_to}) is on the suppression list (reason: #{sl['reason']})") + queued_message.destroy + return + end + + # Parse the content of the message as appropriate + if queued_message.message.should_parse? + log "parsing message content as it hasn't been parsed before" + queued_message.message.parse_content + end + + # Inspect outgoing messages when there's a threshold set for the server + if !queued_message.message.inspected && queued_message.server.outbound_spam_threshold + log "inspecting message" + queued_message.message.inspect_message + if queued_message.message.inspected + if queued_message.message.spam_score >= queued_message.server.outbound_spam_threshold + queued_message.message.update(spam: true) + end + log "message inspected successfully", spam: queued_message.message.spam?, spam_score: queued_message.message.spam_score + end + end + + if queued_message.message.spam + log "message is spam (#{queued_message.message.spam_score}), hard failing", server_threshold: queued_message.server.outbound_spam_threshold + queued_message.message.create_delivery("HardFail", + details: "Message is likely spam. 
Threshold is #{queued_message.server.outbound_spam_threshold} and " \ + "the message scored #{queued_message.message.spam_score}.") + queued_message.destroy + return + end + + # Add outgoing headers + unless queued_message.message.has_outgoing_headers? + queued_message.message.add_outgoing_headers + end + + # Check send limits + if queued_message.server.send_limit_exceeded? + # If we're over the limit, we're going to be holding this message + log "server send limit has been exceeded, holding", send_limit: queued_message.server.send_limit + queued_message.server.update_columns(send_limit_exceeded_at: Time.now, send_limit_approaching_at: nil) + queued_message.message.create_delivery("Held", details: "Message held because send limit (#{queued_message.server.send_limit}) has been reached.") + queued_message.destroy + return + elsif queued_message.server.send_limit_approaching? + # If we're approaching the limit, just say we are but continue to process the message + queued_message.server.update_columns(send_limit_approaching_at: Time.now, send_limit_exceeded_at: nil) + else + queued_message.server.update_columns(send_limit_approaching_at: nil, send_limit_exceeded_at: nil) + end + + # Update the live stats for this message. + queued_message.message.database.live_stats.increment(queued_message.message.scope) + + # If the server is in development mode, hold it + if queued_message.server.mode == "Development" && !queued_message.manual? + log "server is in development mode, holding" + queued_message.message.create_delivery("Held", details: "Server is in development mode.") + queued_message.destroy + return + end + + # Send the outgoing message to the SMTP sender + + if @fixed_result + result = @fixed_result + else + sender = cached_sender(Postal::SMTPSender, queued_message.message.recipient_domain, queued_message.ip_address) + result = sender.send_message(queued_message.message) + if result.connect_error + @fixed_result = result + end + end + + # + # If the message has been hard failed, check to see how many other recent hard fails we've had for the address + # and if there are more than 2, suppress the address for 30 days. + # + if result.type == "HardFail" + recent_hard_fails = queued_message.server.message_db.select(:messages, + where: { + rcpt_to: queued_message.message.rcpt_to, + status: "HardFail", + timestamp: { greater_than: 24.hours.ago.to_f } + }, + count: true) + if recent_hard_fails >= 1 && queued_message.server.message_db.suppression_list.add(:recipient, queued_message.message.rcpt_to, reason: "too many hard fails") + log "Added #{queued_message.message.rcpt_to} to suppression list because #{recent_hard_fails} hard fails in 24 hours" + result.details += "." if result.details =~ /\.\z/ + result.details += " Recipient added to suppression list (too many hard fails)." + end + end + + # + # If a message is sent successfully, remove the users from the suppression list + # + if result.type == "Sent" && queued_message.server.message_db.suppression_list.remove(:recipient, queued_message.message.rcpt_to) + log "removed #{queued_message.message.rcpt_to} from suppression list" + result.details += "." if result.details =~ /\.\z/ + result.details += " Recipient removed from suppression list." + end + + # Log the result + queued_message.message.create_delivery(result.type, details: result.details, output: result.output, sent_with_ssl: result.secure, log_id: result.log_id, time: result.time) + if result.retry + queued_message.retry_later(result.retry.is_a?(Integer) ? 
result.retry : nil) + log "message requeued for trying later", retry_after: queued_message.retry_after + else + log "message processing complete" + queued_message.destroy + end + rescue StandardError => e + log "internal error: #{e.class}: #{e.message}" + e.backtrace.each { |line| log(line) } + + queued_message.retry_later + log "message requeued for trying later, at #{queued_message.retry_after}" + + if defined?(Sentry) + Sentry.capture_exception(e, extra: { server_id: queued_message.server_id, queued_message_id: queued_message.message_id }) + end + queued_message.message&.create_delivery("Error", + details: "An internal error occurred while sending " \ + "this message. This message will be retried " \ + "automatically.", + output: "#{e.class}: #{e.message}", log_id: "J-#{id}") + end + +end diff --git a/app/services/webhook_delivery_service.rb b/app/services/webhook_delivery_service.rb new file mode 100644 index 0000000..617054e --- /dev/null +++ b/app/services/webhook_delivery_service.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +class WebhookDeliveryService + + def initialize(webhook_delivery:) + @webhook_delivery = webhook_delivery + end + + # TODO: move the logic from WebhookDelivery#deliver into this service. + # + def call + if @webhook_delivery.deliver + log "Successfully delivered" + else + log "Delivery failed" + end + end + +end diff --git a/bin/postal b/bin/postal index b113e2d..34a6d22 100755 --- a/bin/postal +++ b/bin/postal @@ -16,21 +16,13 @@ case "$1" in ;; smtp-server) - run "bundle exec rake postal:smtp_server" + run "bundle exec ruby script/smtp_server.rb" ;; worker) run "bundle exec ruby script/worker.rb" ;; - cron) - run "bundle exec rake postal:cron" - ;; - - requeuer) - run "bundle exec rake postal:requeuer" - ;; - initialize) echo 'Initializing database' run "bundle exec rake db:create db:schema:load db:seed" @@ -69,8 +61,6 @@ case "$1" in echo -e " * \033[35mweb-server\033[0m - run the web server" echo -e " * \033[35msmtp-server\033[0m - run the SMTP server" echo -e " * \033[35mworker\033[0m - run a worker" - echo -e " * \033[35mcron\033[0m - run the cron process" - echo -e " * \033[35mrequeuer\033[0m - run the message requeuer" echo echo "Setup/upgrade tools:" echo diff --git a/config/cron.rb b/config/cron.rb deleted file mode 100644 index fed7a60..0000000 --- a/config/cron.rb +++ /dev/null @@ -1,30 +0,0 @@ -# frozen_string_literal: true - -module Clockwork - - configure do |config| - config[:tz] = "UTC" - config[:logger] = Postal.logger - end - - every 1.minute, "every-1-minutes" do - RequeueWebhooksJob.queue(:main) - SendNotificationsJob.queue(:main) - end - - every 1.hour, "every-hour", at: ["**:15"] do - CheckAllDNSJob.queue(:main) - ExpireHeldMessagesJob.queue(:main) - CleanupAuthieSessionsJob.queue(:main) - end - - every 1.hour, "every-hour", at: ["**:45"] do - PruneWebhookRequestsJob.queue(:main) - end - - every 1.day, "every-day", at: ["03:00"] do - ProcessMessageRetentionJob.queue(:main) - PruneSuppressionListsJob.queue(:main) - end - -end diff --git a/config/initializers/logging.rb b/config/initializers/logging.rb index 5108da4..22e16ab 100644 --- a/config/initializers/logging.rb +++ b/config/initializers/logging.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + begin def add_exception_to_payload(payload, event) return unless exception = event.payload[:exception_object] diff --git a/config/postal.defaults.yml b/config/postal.defaults.yml index 2c9fa33..6574a0d 100644 --- a/config/postal.defaults.yml +++ b/config/postal.defaults.yml @@ -40,16 
+40,6 @@ message_db: password: <%= ENV.fetch('MESSAGE_DB_PASSWORD', '') %> prefix: <%= ENV.fetch('MESSAGE_DB_PREFIX', 'postal') %> -rabbitmq: - host: <%= ENV.fetch('RABBITMQ_HOST', '127.0.0.1') %> - port: <%= ENV.fetch('RABBITMQ_PORT', '5672') %> - username: <%= ENV.fetch('RABBITMQ_USERNAME', 'postal') %> - password: <%= ENV.fetch('RABBITMQ_PASSWORD', '') %> - vhost: <%= ENV.fetch('RABBITMQ_VHOST', '/postal') %> - tls: <%= ENV.fetch('RABBITMQ_TLS', 'false') %> - verify_peer: <%= ENV.fetch('RABBITMQ_VERIFY_PEER', 'true') %> - tls_ca_certificates: <%= ENV.fetch('RABBITMQ_TLS_CA_CERTIFICATES', '/etc/ssl/certs/ca-certificates.crt'.split(',').inspect) %> - logging: rails_log: <%= ENV.fetch('LOGGING_RAILS_LOG', 'false') %> graylog: @@ -57,9 +47,6 @@ logging: port: <%= ENV.fetch('GRAYLOG_PORT', '12201') %> facility: <%= ENV.fetch('GRAYLOG_FACILITY', 'postal') %> -workers: - threads: <%= ENV.fetch('WORKER_THREADS', '4') %> - smtp_server: port: <%= ENV.fetch('SMTP_SERVER_PORT', '25') %> bind_address: "<%= ENV.fetch('SMTP_SERVER_BIND_ADDRESS', '::') %>" diff --git a/db/migrate/20240213165450_create_worker_roles.rb b/db/migrate/20240213165450_create_worker_roles.rb new file mode 100644 index 0000000..4b62843 --- /dev/null +++ b/db/migrate/20240213165450_create_worker_roles.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +class CreateWorkerRoles < ActiveRecord::Migration[6.1] + + def change + create_table :worker_roles do |t| + t.string :role + t.string :worker + t.datetime :acquired_at + t.index :role, unique: true + end + end + +end diff --git a/db/migrate/20240213171830_create_scheduled_tasks.rb b/db/migrate/20240213171830_create_scheduled_tasks.rb new file mode 100644 index 0000000..fb18b5e --- /dev/null +++ b/db/migrate/20240213171830_create_scheduled_tasks.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +class CreateScheduledTasks < ActiveRecord::Migration[6.1] + + def change + create_table :scheduled_tasks do |t| + t.string :name + t.datetime :next_run_after + t.index :name, unique: true + end + end + +end diff --git a/db/migrate/20240214132253_add_lock_fields_to_webhook_requests.rb b/db/migrate/20240214132253_add_lock_fields_to_webhook_requests.rb new file mode 100644 index 0000000..edf5662 --- /dev/null +++ b/db/migrate/20240214132253_add_lock_fields_to_webhook_requests.rb @@ -0,0 +1,12 @@ +# frozen_string_literal: true + +class AddLockFieldsToWebhookRequests < ActiveRecord::Migration[6.1] + + def change + add_column :webhook_requests, :locked_by, :string + add_column :webhook_requests, :locked_at, :datetime + + add_index :webhook_requests, :locked_by + end + +end diff --git a/db/schema.rb b/db/schema.rb index a1b9e76..063f8fa 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. 
-ActiveRecord::Schema.define(version: 2024_02_06_173036) do +ActiveRecord::Schema.define(version: 2024_02_14_132253) do create_table "additional_route_endpoints", id: :integer, charset: "utf8mb4", collation: "utf8mb4_general_ci", force: :cascade do |t| t.integer "route_id" @@ -213,6 +213,12 @@ ActiveRecord::Schema.define(version: 2024_02_06_173036) do t.index ["token"], name: "index_routes_on_token", length: 6 end + create_table "scheduled_tasks", charset: "utf8mb4", collation: "utf8mb4_general_ci", force: :cascade do |t| + t.string "name" + t.datetime "next_run_after" + t.index ["name"], name: "index_scheduled_tasks_on_name", unique: true + end + create_table "servers", id: :integer, charset: "utf8mb4", collation: "utf8mb4_general_ci", force: :cascade do |t| t.integer "organization_id" t.string "uuid" @@ -343,6 +349,9 @@ ActiveRecord::Schema.define(version: 2024_02_06_173036) do t.datetime "retry_after", precision: 6 t.text "error" t.datetime "created_at", precision: 6 + t.string "locked_by" + t.datetime "locked_at" + t.index ["locked_by"], name: "index_webhook_requests_on_locked_by" end create_table "webhooks", id: :integer, charset: "utf8mb4", collation: "utf8mb4_general_ci", force: :cascade do |t| @@ -359,4 +368,11 @@ ActiveRecord::Schema.define(version: 2024_02_06_173036) do t.index ["server_id"], name: "index_webhooks_on_server_id" end + create_table "worker_roles", charset: "utf8mb4", collation: "utf8mb4_general_ci", force: :cascade do |t| + t.string "role" + t.string "worker" + t.datetime "acquired_at" + t.index ["role"], name: "index_worker_roles_on_role", unique: true + end + end diff --git a/docker-compose.yml b/docker-compose.yml index f3d8670..e3cb991 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -4,7 +4,6 @@ services: image: ${POSTAL_IMAGE} depends_on: - mariadb - - rabbitmq entrypoint: ["/docker-entrypoint.sh"] volumes: - "./docker/ci-config:/config" @@ -14,7 +13,6 @@ services: WAIT_FOR_TIMEOUT: 90 WAIT_FOR_TARGETS: |- mariadb:3306 - rabbitmq:5672 mariadb: image: mariadb @@ -23,7 +21,3 @@ services: MARIADB_DATABASE: postal MARIADB_ALLOW_EMPTY_PASSWORD: 'yes' MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: 'yes' - - rabbitmq: - image: rabbitmq:3 - restart: always diff --git a/docker/ci-config/postal.test.yml b/docker/ci-config/postal.test.yml index 69922c9..65dfef5 100644 --- a/docker/ci-config/postal.test.yml +++ b/docker/ci-config/postal.test.yml @@ -20,12 +20,6 @@ message_db: password: prefix: postal -rabbitmq: - host: rabbitmq - username: guest - password: guest - vhost: null - dns: mx_records: - mx.postal.example.com diff --git a/lib/postal/config.rb b/lib/postal/config.rb index 1f1f42d..6907d9e 100644 --- a/lib/postal/config.rb +++ b/lib/postal/config.rb @@ -85,6 +85,9 @@ module Postal end end + # Return a generic logger for use generally throughout Postal. + # + # @return [Klogger::Logger] A logger instance def self.logger @logger ||= begin k = Klogger.new(nil, destination: Rails.env.test? ? "/dev/null" : $stdout, highlight: Rails.env.development?) 
@@ -106,9 +109,14 @@ module Postal def self.locker_name string = process_name.dup string += " job:#{Thread.current[:job_id]}" if Thread.current[:job_id] + string += " thread:#{Thread.current.native_thread_id}" string end + def self.locker_name_with_suffix(suffix) + "#{locker_name} #{suffix}" + end + def self.smtp_from_name config.smtp&.from_name || "Postal" end @@ -175,7 +183,7 @@ module Postal end def self.graylog_logging_destination - @graylog_destination ||= begin + @graylog_logging_destination ||= begin notifier = GELF::Notifier.new(config.logging.graylog.host, config.logging.graylog.port, "WAN") proc do |_logger, payload, group_ids| short_message = payload.delete(:message) || "[message missing]" diff --git a/lib/postal/job.rb b/lib/postal/job.rb deleted file mode 100644 index 1311d1f..0000000 --- a/lib/postal/job.rb +++ /dev/null @@ -1,44 +0,0 @@ -# frozen_string_literal: true - -require "nifty/utils/random_string" - -module Postal - class Job - - def initialize(id, params = {}) - @id = id - @params = params - on_initialize - end - - attr_reader :id - - def params - @params || {} - end - - def on_initialize - # Called whenever the class is initialized. Can be overriden. - end - - def on_error(exception) - # Called if there's an exception while processing the perform block. - # Receives the exception. - end - - def perform - end - - def log(text) - Worker.logger.info(text) - end - - def self.queue(queue, params = {}) - job_id = Nifty::Utils::RandomString.generate(length: 10).upcase - job_payload = { "params" => params, "class_name" => name, "id" => job_id, "queue" => queue } - Postal::Worker.job_queue(queue).publish(job_payload.to_json, persistent: false) - job_id - end - - end -end diff --git a/lib/postal/message_db/message.rb b/lib/postal/message_db/message.rb index db70c3b..46df1c9 100644 --- a/lib/postal/message_db/message.rb +++ b/lib/postal/message_db/message.rb @@ -445,7 +445,11 @@ module Postal # def bounce!(bounce_message) create_delivery("Bounced", details: "We've received a bounce message for this e-mail. See for details.") - SendWebhookJob.queue(:main, server_id: database.server_id, event: "MessageBounced", payload: { _original_message: id, _bounce: bounce_message.id }) + + WebhookRequest.trigger(server, "MessageBounced", { + original_message: webhook_hash, + bounce: bounce_message.webhook_hash + }) end # @@ -461,7 +465,12 @@ module Postal def create_load(request) update("loaded" => Time.now.to_f) if loaded.nil? database.insert(:loads, { message_id: id, ip_address: request.ip, user_agent: request.user_agent, timestamp: Time.now.to_f }) - SendWebhookJob.queue(:main, server_id: database.server_id, event: "MessageLoaded", payload: { _message: id, ip_address: request.ip, user_agent: request.user_agent }) + + WebhookRequest.trigger(server, "MessageLoaded", { + message: webhook_hash, + ip_address: request.ip, + user_agent: request.user_agent + }) end # diff --git a/lib/postal/message_requeuer.rb b/lib/postal/message_requeuer.rb deleted file mode 100644 index 31e5510..0000000 --- a/lib/postal/message_requeuer.rb +++ /dev/null @@ -1,34 +0,0 @@ -# frozen_string_literal: true - -module Postal - class MessageRequeuer - - def run - Signal.trap("INT") { @running ? @exit = true : Process.exit(0) } - Signal.trap("TERM") { @running ? @exit = true : Process.exit(0) } - - log "Running message requeuer..." 
- loop do - @running = true - QueuedMessage.requeue_all - @running = false - check_exit - sleep 5 - end - end - - private - - def log(text) - Postal.logger.info text, component: "message-requeuer" - end - - def check_exit - return unless @exit - - log "Exiting" - Process.exit(0) - end - - end -end diff --git a/lib/postal/rabbit_mq.rb b/lib/postal/rabbit_mq.rb deleted file mode 100644 index 2703cd5..0000000 --- a/lib/postal/rabbit_mq.rb +++ /dev/null @@ -1,38 +0,0 @@ -# frozen_string_literal: true - -require "postal/config" -require "bunny" - -module Postal - module RabbitMQ - - def self.create_connection - bunny_host = ["localhost"] - - if Postal.config.rabbitmq&.host.is_a?(Array) - bunny_host = Postal.config.rabbitmq&.host - elsif Postal.config.rabbitmq&.host.is_a?(String) - bunny_host = [Postal.config.rabbitmq&.host] - end - - conn = Bunny.new( - hosts: bunny_host, - port: Postal.config.rabbitmq&.port || 5672, - tls: Postal.config.rabbitmq&.tls || false, - verify_peer: Postal.config.rabbitmq&.verify_peer || true, - tls_ca_certificates: Postal.config.rabbitmq&.tls_ca_certificates || ["/etc/ssl/certs/ca-certificates.crt"], - username: Postal.config.rabbitmq&.username || "guest", - password: Postal.config.rabbitmq&.password || "guest", - vhost: Postal.config.rabbitmq&.vhost || nil - ) - conn.start - conn - end - - def self.create_channel - conn = create_connection - conn.create_channel(nil, Postal.config.workers.threads) - end - - end -end diff --git a/lib/postal/tracking_middleware.rb b/lib/postal/tracking_middleware.rb index 82a25a0..1593af9 100644 --- a/lib/postal/tracking_middleware.rb +++ b/lib/postal/tracking_middleware.rb @@ -94,16 +94,20 @@ module Postal user_agent: request.user_agent, timestamp: time }) - SendWebhookJob.queue(:main, - server_id: message_db.server_id, - event: "MessageLinkClicked", - payload: { - _message: link["message_id"], - url: link["url"], - token: link["token"], - ip_address: request.ip, - user_agent: request.user_agent - }) + + begin + message_webhook_hash = message_db.message(link["message_id"]).webhook_hash + WebhookRequest.trigger(message_db.server, "MessageLinkClicked", { + message: message_webhook_hash, + url: link["url"], + token: link["token"], + ip_address: request.ip, + user_agent: request.user_agent + }) + rescue Postal::MessageDB::Message::NotFound + # If we can't find the message that this link is associated with, we'll just ignore it + # and not trigger any webhooks. + end end [307, { "Location" => link["url"] }, ["Redirected to: #{link['url']}"]] diff --git a/lib/postal/worker.rb b/lib/postal/worker.rb deleted file mode 100644 index 969d81e..0000000 --- a/lib/postal/worker.rb +++ /dev/null @@ -1,220 +0,0 @@ -# frozen_string_literal: true - -module Postal - class Worker - - def initialize(queues) - @initial_queues = queues - @active_queues = {} - @process_name = $0 - @running_jobs = [] - end - - def work - logger.info "Worker running with #{Postal.config.workers.threads} threads" - - Signal.trap("INT") do - @exit = true - set_process_name - end - Signal.trap("TERM") do - @exit = true - set_process_name - end - - self.class.job_channel.prefetch(Postal.config.workers.threads) - @initial_queues.each { |queue| join_queue(queue) } - - exit_checks = 0 - loop do - if @exit && @running_jobs.empty? - logger.info "Exiting immediately because no jobs running" - exit 0 - elsif @exit - if exit_checks >= 60 - logger.info "Job did not finish in a timely manner. Exiting" - exit 0 - end - if exit_checks.zero? - logger.info "Exit requested but job is running. 
Waiting for job to finish." - end - sleep 60 - exit_checks += 1 - else - manage_ip_queues - sleep 1 - end - end - end - - private - - def receive_job(delivery_info, properties, message) - if message && message["class_name"] - @running_jobs << message["id"] - set_process_name - start_time = Time.now - Thread.current[:job_id] = message["id"] - logger.info "Processing job" - begin - klass = message["class_name"].constantize.new(message["id"], message["params"]) - klass.perform - GC.start - rescue StandardError => e - klass.on_error(e) if defined?(klass) - logger.exception(e) - if defined?(Sentry) - Sentry.capture_exception(e, extra: { job_id: message["id"] }) - end - ensure - logger.info "Finished job", time: (Time.now - start_time).to_i - end - end - ensure - Thread.current[:job_id] = nil - self.class.job_channel.ack(delivery_info.delivery_tag) - @running_jobs.delete(message["id"]) if message["id"] - set_process_name - - if @exit && @running_jobs.empty? - logger.info "Exiting because all jobs have finished." - exit 0 - end - end - - def join_queue(queue) - if @active_queues[queue] - logger.error "attempted to join queue but already joined", queue: queue - else - consumer = self.class.job_queue(queue).subscribe(manual_ack: true) do |delivery_info, properties, body| - message = begin - JSON.parse(body) - rescue StandardError - nil - end - - logger.tagged(job_id: message["id"], queue: queue, job_class: message["class_name"]) do - receive_job(delivery_info, properties, message) - end - end - @active_queues[queue] = consumer - logger.info "joined queue", queue: queue - end - end - - def leave_queue(queue) - if consumer = @active_queues[queue] - consumer.cancel - @active_queues.delete(queue) - logger.info "left queue", queue: queue - else - logger.error "requested to leave queue, but not joined", queue: queue - end - end - - def manage_ip_queues - @ip_queues ||= [] - @ip_to_id_mapping ||= {} - @unassigned_ips ||= [] - @pairs ||= {} - @counter ||= 0 - - if @counter >= 15 - @ip_to_id_mapping = {} - @unassigned_ips = [] - @counter = 0 - else - @counter += 1 - end - - # Get all IP addresses on the system - current_ip_addresses = Socket.ip_address_list.map(&:ip_address) - - # Map them to an actual ID in the database if we can and cache that - needed_ip_ids = [] - current_ip_addresses.each do |ip| - need = nil - if id = @ip_to_id_mapping[ip] - # We know this IPs ID, we'll just use that. - need = id - elsif @unassigned_ips.include?(ip) - # We know this IP isn't valid. We don't need to do anything - elsif !self.class.local_ip?(ip) && ip_address = IPAddress.where("ipv4 = ? OR ipv6 = ?", ip, ip).first - # We need to look this up - @pairs[ip_address.ipv4] = ip_address.ipv6 - @ip_to_id_mapping[ip] = ip_address.id - need = ip_address.id - else - @unassigned_ips << ip - end - - next unless need - - pair = @pairs[ip] || @pairs.key(ip) - if pair.nil? || current_ip_addresses.include?(pair) - needed_ip_ids << @ip_to_id_mapping[ip] - else - logger.info "Host has '#{ip}' but its pair (#{pair}) isn't here. Cannot add now." 
- end - end - - # Make an array of needed queue names - # Work out what we need to actually do here - missing_queues = needed_ip_ids - @ip_queues - unwanted_queues = @ip_queues - needed_ip_ids - # Leave the queues we don't want any more - unwanted_queues.each do |id| - leave_queue("outgoing-#{id}") - @ip_queues.delete(id) - ip_addresses_to_clear = [] - @ip_to_id_mapping.each do |iip, iid| - if id == iid - ip_addresses_to_clear << iip - end - end - ip_addresses_to_clear.each { |ip| @ip_to_id_mapping.delete(ip) } - end - # Join any missing queues - missing_queues.uniq.each do |id| - join_queue("outgoing-#{id}") - @ip_queues << id - end - end - - def set_process_name - prefix = @process_name.to_s - prefix += " [exiting]" if @exit - if @running_jobs.empty? - $0 = "#{prefix} (idle)" - else - $0 = "#{prefix} (running #{@running_jobs.join(', ')})" - end - end - - def logger - self.class.logger - end - - class << self - - def logger - Postal.logger - end - - def job_channel - @job_channel ||= Postal::RabbitMQ.create_channel - end - - def job_queue(name) - @job_queues ||= {} - @job_queues[name] ||= job_channel.queue("deliver-jobs-#{name}", durable: true, arguments: { "x-message-ttl" => 60_000 }) - end - - def local_ip?(ip) - !!(ip =~ /\A(127\.|fe80:|::)/) - end - - end - - end -end diff --git a/lib/tasks/postal.rake b/lib/tasks/postal.rake index 81770f6..b0b363c 100644 --- a/lib/tasks/postal.rake +++ b/lib/tasks/postal.rake @@ -1,27 +1,6 @@ # frozen_string_literal: true namespace :postal do - desc "Start the cron worker" - task cron: :environment do - require "clockwork" - require Rails.root.join("config", "cron") - trap("TERM") do - puts "Exiting..." - Process.exit(0) - end - Clockwork.run - end - - desc "Start SMTP Server" - task smtp_server: :environment do - Postal::SMTPServer::Server.new(debug: true).run - end - - desc "Start the message requeuer" - task requeuer: :environment do - Postal::MessageRequeuer.new.run - end - desc "Run all migrations on message databases" task migrate_message_databases: :environment do Server.all.each do |server| diff --git a/lib/worker/jobs/base_job.rb b/lib/worker/jobs/base_job.rb new file mode 100644 index 0000000..f324b51 --- /dev/null +++ b/lib/worker/jobs/base_job.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +module Worker + module Jobs + class BaseJob + + def initialize(logger:) + @logger = logger + end + + def call + # Override me. + end + + def work_completed? + @work_completed == true + end + + private + + def work_completed! + @work_completed = true + end + + attr_reader :logger + + end + end +end diff --git a/lib/worker/jobs/process_queued_messages_job.rb b/lib/worker/jobs/process_queued_messages_job.rb new file mode 100644 index 0000000..60c8ba3 --- /dev/null +++ b/lib/worker/jobs/process_queued_messages_job.rb @@ -0,0 +1,73 @@ +# frozen_string_literal: true + +module Worker + module Jobs + class ProcessQueuedMessagesJob < BaseJob + + def call + @lock_time = Time.current + @locker = Postal.locker_name_with_suffix(SecureRandom.hex(8)) + + find_ip_addresses + lock_message_for_processing + obtain_locked_messages + process_messages + @messages_to_process + end + + private + + # Returns an array of IP address IDs that are present on the host that is + # running this job. + # + # @return [Array] + def find_ip_addresses + ip_addresses = { 4 => [], 6 => [] } + Socket.ip_address_list.each do |address| + next if local_ip?(address.ip_address) + + ip_addresses[address.ipv4? ? 
4 : 6] << address.ip_address + end + @ip_addresses = IPAddress.where(ipv4: ip_addresses[4]).or(IPAddress.where(ipv6: ip_addresses[6])).pluck(:id) + end + + # Is the given IP address a local address? + # + # @param [String] ip + # @return [Boolean] + def local_ip?(ip) + !!(ip =~ /\A(127\.|fe80:|::)/) + end + + # Obtain a queued message from the database for processing + # + # @return [void] + def lock_message_for_processing + QueuedMessage.where(ip_address_id: [nil, @ip_addresses]) + .where(locked_by: nil, locked_at: nil) + .ready_with_delayed_retry + .limit(1) + .update_all(locked_by: @locker, locked_at: @lock_time) + end + + # Get a full list of all messages which we can process (i.e. those which have just + # been locked by us for processing) + # + # @return [void] + def obtain_locked_messages + @messages_to_process = QueuedMessage.where(locked_by: @locker, locked_at: @lock_time) + end + + # Process the messages we obtained from the database + # + # @return [void] + def process_messages + @messages_to_process.each do |message| + work_completed! + UnqueueMessageService.new(queued_message: message, logger: logger).call + end + end + + end + end +end diff --git a/lib/worker/jobs/process_webhook_requests_job.rb b/lib/worker/jobs/process_webhook_requests_job.rb new file mode 100644 index 0000000..c33de5f --- /dev/null +++ b/lib/worker/jobs/process_webhook_requests_job.rb @@ -0,0 +1,48 @@ +# frozen_string_literal: true + +module Worker + module Jobs + class ProcessWebhookRequestsJob < BaseJob + + def call + @lock_time = Time.current + @locker = Postal.locker_name_with_suffix(SecureRandom.hex(8)) + + lock_request_for_processing + obtain_locked_requests + process_requests + end + + private + + # Obtain a webhook request from the database for processing + # + # @return [void] + def lock_request_for_processing + WebhookRequest.unlocked + .ready + .limit(1) + .update_all(locked_by: @locker, locked_at: @lock_time) + end + + # Get a full list of all webhooks which we can process (i.e. those which have just + # been locked by us for processing) + # + # @return [void] + def obtain_locked_requests + @requests_to_process = WebhookRequest.where(locked_by: @locker, locked_at: @lock_time) + end + + # Process the webhook requests we obtained from the database + # + # @return [void] + def process_requests + @requests_to_process.each do |request| + work_completed! + request.deliver + end + end + + end + end +end diff --git a/lib/worker/process.rb b/lib/worker/process.rb new file mode 100644 index 0000000..34eaec2 --- /dev/null +++ b/lib/worker/process.rb @@ -0,0 +1,242 @@ +# frozen_string_literal: true + +module Worker + # The Postal Worker process is responsible for handling all background tasks. This includes processing of all + # messages, webhooks and other administrative tasks. There are two main types of background work which is completed, + # jobs and scheduled tasks. + # + # The 'Jobs' here allow for the continuous monitoring of a database table (or queue) and processing of any new items + # which may appear in that. The polling takes place every 5 seconds by default and the work is able to run multiple + # threads to look for and process this work. + # + # Scheduled Tasks allow for code to be executed on a ROUGH schedule. This is used for administrative tasks. A single + # thread will run within each worker process and attempt to acquire the 'tasks' role. If successful it will run all + # tasks which are due to be run. The tasks are then scheduled to run again at a future time. 
Workers which are not + # successful in acquiring the role will not run any tasks but will still attempt to acquire a lock in case the current + # holder disappears. + # + # The worker process will run until it receives a TERM or INT signal. It will then attempt to gracefully shut down + # after it has completed any outstanding jobs which are already in flight. + class Process + + # An array of job classes that should be processed each time the worker ticks. + # + # @return [Array] + JOBS = [ + Jobs::ProcessQueuedMessagesJob, + Jobs::ProcessWebhookRequestsJob + ].freeze + + # An array of tasks that should be processed + # + # @return [Array] + TASKS = [ + ActionDeletionsScheduledTask, + CheckAllDNSScheduledTask, + CleanupAuthieSessionsScheduledTask, + ExpireHeldMessagesScheduledTask, + ProcessMessageRetentionScheduledTask, + PruneSuppressionListsScheduledTask, + PruneWebhookRequestsScheduledTask, + SendNotificationsScheduledTask + ].freeze + + # @param [Integer] thread_count The number of worker threads to run in this process + def initialize(thread_count: 2, work_sleep_time: 5, task_sleep_time: 60) + @thread_count = thread_count + @exit_pipe_read, @exit_pipe_write = IO.pipe + @work_sleep_time = work_sleep_time + @task_sleep_time = task_sleep_time + @threads = [] + end + + def run + logger.tagged(component: "worker") do + setup_traps + start_work_threads + start_tasks_thread + wait_for_threads + end + end + + private + + # Install signal traps to allow for graceful shutdown + # + # @return [void] + def setup_traps + trap("INT") { receive_signal("INT") } + trap("TERM") { receive_signal("TERM") } + end + + # Receive a signal and set the shutdown flag + # + # @param [String] signal The signal that was received + # @return [void] + def receive_signal(signal) + puts "Received #{signal} signal. Stopping when able." + @shutdown = true + @exit_pipe_write.close + end + + # Wait for the given period of time and return whether shutdown has been requested. If shutdown is + # requested during the wait, this returns true immediately; otherwise it returns false once the full + # wait time has elapsed. + # + # @param [Integer] wait_time The time to wait for + # @return [Boolean] + def shutdown_after_wait?(wait_time) + @exit_pipe_read.wait_readable(wait_time) ? true : false + end + + # Wait for all threads to complete + # + # @return [void] + def wait_for_threads + @threads.each(&:join) + end + + # Start the worker threads + # + # @return [void] + def start_work_threads + logger.info "starting #{@thread_count} work threads" + @thread_count.times do |index| + start_work_thread(index) + end + end + + # Start a worker thread + # + # @return [void] + def start_work_thread(index) + @threads << Thread.new do + logger.tagged(component: "worker", thread: "work#{index}") do + logger.info "started work thread #{index}" + loop do + work_completed = work + + if shutdown_after_wait?(work_completed ? 0 : @work_sleep_time) + break + end + end + + logger.info "stopping work thread #{index}" + end + end + end + + # Actually perform the work for this tick. This will call each job which has been registered. + # + # @return [Boolean] Whether any work was completed in this tick or not + def work + completed_work = 0 + ActiveRecord::Base.connection_pool.with_connection do + JOBS.each do |job_class| + capture_errors do + job = job_class.new(logger: logger) + job.call + + completed_work += 1 if job.work_completed? + end + end + end + completed_work.positive? 
+ end + + # Start the tasks thread + # + # @return [void] + def start_tasks_thread + logger.info "starting tasks thread" + @threads << Thread.new do + logger.tagged(component: "worker", thread: "tasks") do + loop do + run_tasks + + if shutdown_after_wait?(@task_sleep_time) + break + end + end + + logger.info "stopping tasks thread" + ActiveRecord::Base.connection_pool.with_connection do + if WorkerRole.release(:tasks) + logger.info "released tasks role" + end + end + end + end + end + + # Run the tasks. This will attempt to acquire the tasks role and, if successful, it will run all the registered + # tasks that are due to be run. + # + # @return [void] + def run_tasks + role_acquisition_status = ActiveRecord::Base.connection_pool.with_connection do + WorkerRole.acquire(:tasks) + end + + case role_acquisition_status + when :stolen + logger.info "acquired task role by stealing it from a lazy worker" + when :created + logger.info "acquired task role by creating it" + when :renewed + logger.debug "acquired task role by renewing it" + else + logger.debug "could not acquire task role, not doing anything" + return false + end + + ActiveRecord::Base.connection_pool.with_connection do + TASKS.each { |task| run_task(task) } + end + end + + # Run a single task + # + # @param [Class] task The task to run + # @return [void] + def run_task(task) + logger.tagged task: task do + scheduled_task = ScheduledTask.find_by(name: task.to_s) + if scheduled_task.nil? + logger.info "no existing task object, creating it now" + scheduled_task = ScheduledTask.create!(name: task.to_s, next_run_after: task.next_run_after) + end + + next unless scheduled_task.next_run_after < Time.current + + logger.info "running task" + + capture_errors { task.new(logger: logger).call } + + next_run_after = task.next_run_after + logger.info "scheduling task to next run at #{next_run_after}" + scheduled_task.update!(next_run_after: next_run_after) + end + end + + # Return the logger + # + # @return [Klogger::Logger] + def logger + Postal.logger + end + + # Capture exceptions and handle them as appropriate. 
+ # + # @yield The block of code to run + # @return [void] + def capture_errors + yield + rescue StandardError => e + logger.error "#{e.class} (#{e.message})" + e.backtrace.each { |line| logger.error line } + Sentry.capture_exception(e) if defined?(Sentry) + end + + end +end diff --git a/script/send_html_email.rb b/script/send_html_email.rb index 96ead18..ba81aa0 100644 --- a/script/send_html_email.rb +++ b/script/send_html_email.rb @@ -43,21 +43,10 @@ end c = OpenSSL::SSL::SSLContext.new c.verify_mode = OpenSSL::SSL::VERIFY_NONE -<<<<<<< Updated upstream -Net::SMTP.start("127.0.0.1", 2525) do |smtp| - smtp.send_message mail.to_s, mail.from.first, mail.to.first -end -======= -1000.times.map do - Thread.new do - smtp = Net::SMTP.new("77.72.7.155", 25) - # smtp.enable_starttls(c) - smtp.disable_starttls - smtp.start("localhost") - smtp.send_message mail.to_s, mail.from.first, mail.to.first - smtp.finish - end -end.each(&:join) ->>>>>>> Stashed changes +smtp = Net::SMTP.new("127.0.0.1", 2525) +smtp.enable_starttls(c) +smtp.start("localhost") +smtp.send_message mail.to_s, mail.from.first, mail.to.first +smtp.finish puts "Sent" diff --git a/script/smtp_server.rb b/script/smtp_server.rb new file mode 100644 index 0000000..f9ea52f --- /dev/null +++ b/script/smtp_server.rb @@ -0,0 +1,4 @@ +# frozen_string_literal: true + +require_relative "../config/environment" +Postal::SMTPServer::Server.new(debug: true).run diff --git a/script/worker.rb b/script/worker.rb index 121f5ce..42b7781 100755 --- a/script/worker.rb +++ b/script/worker.rb @@ -2,4 +2,4 @@ # frozen_string_literal: true require_relative "../config/environment" -Postal::Worker.new([:main]).work +Worker::Process.new.run diff --git a/spec/app/models/worker_role_spec.rb b/spec/app/models/worker_role_spec.rb new file mode 100644 index 0000000..4776960 --- /dev/null +++ b/spec/app/models/worker_role_spec.rb @@ -0,0 +1,58 @@ +# frozen_string_literal: true + +require "rails_helper" + +RSpec.describe WorkerRole do + let(:locker_name) { "test" } + + before do + allow(Postal).to receive(:locker_name).and_return(locker_name) + end + + describe ".acquire" do + context "when there are no existing roles" do + it "returns :created" do + expect(WorkerRole.acquire("test")).to eq(:created) + end + end + + context "when the current process holds a lock for a role" do + it "returns :renewed" do + create(:worker_role, role: "test", worker: "test", acquired_at: 1.minute.ago) + expect(WorkerRole.acquire("test")).to eq(:renewed) + end + end + + context "when the role has become stale" do + it "returns :stolen" do + create(:worker_role, role: "test", worker: "another", acquired_at: 10.minute.ago) + expect(WorkerRole.acquire("test")).to eq(:stolen) + end + end + + context "when the role is already locked by another worker" do + it "returns false" do + create(:worker_role, role: "test", worker: "another", acquired_at: 1.minute.ago) + expect(WorkerRole.acquire("test")).to eq(false) + end + end + end + + describe ".release" do + context "when the role is locked by the current worker" do + it "deletes the role and returns true" do + role = create(:worker_role, role: "test", worker: "test") + expect(WorkerRole.release("test")).to eq(true) + expect(WorkerRole.find_by(id: role.id)).to be_nil + end + end + + context "when the role is locked by another worker" do + it "does not delete the role and returns false" do + role = create(:worker_role, role: "test", worker: "another") + expect(WorkerRole.release("test")).to eq(false) + expect(WorkerRole.find_by(id: role.id)).to 
be_present + end + end + end +end diff --git a/spec/factories/ip_address_factory.rb b/spec/factories/ip_address_factory.rb new file mode 100644 index 0000000..b1a479e --- /dev/null +++ b/spec/factories/ip_address_factory.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +# == Schema Information +# +# Table name: ip_addresses +# +# id :integer not null, primary key +# hostname :string(255) +# ipv4 :string(255) +# ipv6 :string(255) +# priority :integer +# created_at :datetime +# updated_at :datetime +# ip_pool_id :integer +# +FactoryBot.define do + factory :ip_address do + ip_pool + ipv4 { "10.0.0.1" } + ipv6 { "2001:0db8:85a3:0000:0000:8a2e:0370:7334" } + hostname { "ip.example.com" } + end +end diff --git a/spec/factories/ip_pool_factory.rb b/spec/factories/ip_pool_factory.rb new file mode 100644 index 0000000..2870204 --- /dev/null +++ b/spec/factories/ip_pool_factory.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +# == Schema Information +# +# Table name: ip_pools +# +# id :integer not null, primary key +# default :boolean default(FALSE) +# name :string(255) +# uuid :string(255) +# created_at :datetime +# updated_at :datetime +# +# Indexes +# +# index_ip_pools_on_uuid (uuid) +# +FactoryBot.define do + factory :ip_pool do + name { "Default Pool" } + default { true } + end +end diff --git a/spec/factories/queued_message_factory.rb b/spec/factories/queued_message_factory.rb new file mode 100644 index 0000000..b15d456 --- /dev/null +++ b/spec/factories/queued_message_factory.rb @@ -0,0 +1,40 @@ +# frozen_string_literal: true + +# == Schema Information +# +# Table name: queued_messages +# +# id :integer not null, primary key +# attempts :integer default(0) +# batch_key :string(255) +# domain :string(255) +# locked_at :datetime +# locked_by :string(255) +# manual :boolean default(FALSE) +# retry_after :datetime +# created_at :datetime +# updated_at :datetime +# ip_address_id :integer +# message_id :integer +# route_id :integer +# server_id :integer +# +# Indexes +# +# index_queued_messages_on_domain (domain) +# index_queued_messages_on_message_id (message_id) +# index_queued_messages_on_server_id (server_id) +# +FactoryBot.define do + factory :queued_message do + server + message_id { 1234 } + domain { "example.com" } + batch_key { nil } + + trait :locked do + locked_by { "worker1" } + locked_at { 5.minutes.ago } + end + end +end diff --git a/spec/factories/webhook_factory.rb b/spec/factories/webhook_factory.rb new file mode 100644 index 0000000..8c5f594 --- /dev/null +++ b/spec/factories/webhook_factory.rb @@ -0,0 +1,10 @@ +# frozen_string_literal: true + +FactoryBot.define do + factory :webhook do + server + name { "Example Webhook" } + url { "https://example.com" } + all_events { true } + end +end diff --git a/spec/factories/webhook_request_factory.rb b/spec/factories/webhook_request_factory.rb new file mode 100644 index 0000000..7a51f1b --- /dev/null +++ b/spec/factories/webhook_request_factory.rb @@ -0,0 +1,41 @@ +# frozen_string_literal: true + +# == Schema Information +# +# Table name: webhook_requests +# +# id :integer not null, primary key +# attempts :integer default(0) +# error :text(65535) +# event :string(255) +# locked_at :datetime +# locked_by :string(255) +# payload :text(65535) +# retry_after :datetime +# url :string(255) +# uuid :string(255) +# created_at :datetime +# server_id :integer +# webhook_id :integer +# +# Indexes +# +# index_webhook_requests_on_locked_by (locked_by) +# +FactoryBot.define do + factory :webhook_request do + webhook + url { 
"https://example.com" } + event { "ExampleEvent" } + payload { { "hello" => "world" } } + + before(:create) do |webhook_request| + webhook_request.server = webhook_request.webhook&.server + end + + trait :locked do + locked_by { "test" } + locked_at { 5.minutes.ago } + end + end +end diff --git a/spec/factories/worker_role_factory.rb b/spec/factories/worker_role_factory.rb new file mode 100644 index 0000000..be5f497 --- /dev/null +++ b/spec/factories/worker_role_factory.rb @@ -0,0 +1,7 @@ +# frozen_string_literal: true + +FactoryBot.define do + factory :worker_role do + role { "test" } + end +end diff --git a/spec/lib/worker/jobs/process_queued_messages_job.rb b/spec/lib/worker/jobs/process_queued_messages_job.rb new file mode 100644 index 0000000..f2ce50a --- /dev/null +++ b/spec/lib/worker/jobs/process_queued_messages_job.rb @@ -0,0 +1,114 @@ +# frozen_string_literal: true + +require "rails_helper" + +module Worker + module Jobs + + RSpec.describe ProcessQueuedMessagesJob do + subject(:job) { described_class.new(logger: Postal.logger) } + let(:mocked_service) { instance_double(UnqueueMessageService) } + + before do + allow(UnqueueMessageService).to receive(:new).and_return(mocked_service) + allow(mocked_service).to receive(:call).with(any_args) + end + + describe "#call" do + context "when there are no queued messages" do + it "does nothing" do + job.call + expect(UnqueueMessageService).to_not have_received(:new) + end + end + + context "when there is an unlocked queued message for an IP address that is not ours" do + it "does nothing" do + ip_address = create(:ip_address) + queued_message = create(:queued_message, ip_address: ip_address) + job.call + expect(UnqueueMessageService).to_not have_received(:new) + expect(queued_message.reload.locked?).to be false + end + end + + context "when there is an unlocked queued message without an IP address without a retry time" do + it "locks the message and calls the service" do + queued_message = create(:queued_message, ip_address: nil, retry_after: nil) + job.call + expect(UnqueueMessageService).to have_received(:new).with(logger: kind_of(Klogger::Logger), queued_message: queued_message) + expect(mocked_service).to have_received(:call) + expect(queued_message.reload.locked?).to be true + expect(queued_message.locked_by).to eq Postal.locker_name + expect(queued_message.locked_at).to be_within(1.second).of(Time.current) + end + end + + context "when there is an unlocked queued message without an IP address without a retry time in the past" do + it "locks the message and calls the service" do + queued_message = create(:queued_message, ip_address: nil, retry_after: 10.minutes.ago) + job.call + expect(UnqueueMessageService).to have_received(:new).with(logger: kind_of(Klogger::Logger), queued_message: queued_message) + expect(mocked_service).to have_received(:call) + expect(queued_message.reload.locked?).to be true + expect(queued_message.locked_by).to eq Postal.locker_name + expect(queued_message.locked_at).to be_within(1.second).of(Time.current) + end + end + + context "when there is an unlocked queued message without an IP address without a retry time in the future" do + it "does nothing" do + queued_message = create(:queued_message, ip_address: nil, retry_after: 10.minutes.from_now) + job.call + expect(UnqueueMessageService).to_not have_received(:new) + expect(queued_message.reload.locked?).to be false + end + end + + context "when there is a locked queued message without an IP address without a retry time" do + it "does nothing" do + 
queued_message = create(:queued_message, :locked, ip_address: nil, retry_after: nil) + job.call + expect(UnqueueMessageService).to_not have_received(:new) + expect(queued_message.reload.locked?).to be true + end + end + + context "when there is a locked queued message without an IP address with a retry time in the past" do + it "does nothing" do + queued_message = create(:queued_message, :locked, ip_address: nil, retry_after: 1.month.ago) + job.call + expect(UnqueueMessageService).to_not have_received(:new) + expect(queued_message.reload.locked?).to be true + end + end + + context "when there is an unlocked queued message with an IP address that is ours without a retry time" do + it "locks the message and calls the service" do + ip_address = create(:ip_address, ipv4: "10.20.30.40") + allow(Socket).to receive(:ip_address_list).and_return([Addrinfo.new(["AF_INET", 1, "localhost.localdomain", "10.20.30.40"])]) + queued_message = create(:queued_message, ip_address: ip_address) + job.call + expect(UnqueueMessageService).to have_received(:new).with(logger: kind_of(Klogger::Logger), queued_message: queued_message) + expect(mocked_service).to have_received(:call) + expect(queued_message.reload.locked?).to be true + expect(queued_message.locked_by).to eq Postal.locker_name + expect(queued_message.locked_at).to be_within(1.second).of(Time.current) + end + end + + context "when there is an unlocked queued message with an IP address that is ours without a retry time in the future" do + it "does nothing" do + ip_address = create(:ip_address, ipv4: "10.20.30.40") + allow(Socket).to receive(:ip_address_list).and_return([Addrinfo.new(["AF_INET", 1, "localhost.localdomain", "10.20.30.40"])]) + queued_message = create(:queued_message, ip_address: ip_address, retry_after: 1.month.from_now) + job.call + expect(UnqueueMessageService).to_not have_received(:new) + expect(queued_message.reload.locked?).to be false + end + end + end + end + + end +end diff --git a/spec/lib/worker/jobs/process_webhook_requests_job.rb b/spec/lib/worker/jobs/process_webhook_requests_job.rb new file mode 100644 index 0000000..7624060 --- /dev/null +++ b/spec/lib/worker/jobs/process_webhook_requests_job.rb @@ -0,0 +1,56 @@ +# frozen_string_literal: true + +require "rails_helper" + +module Worker + module Jobs + + RSpec.describe ProcessWebhookRequestsJob do + subject(:job) { described_class.new(logger: Postal.logger) } + + before do + allow_any_instance_of(WebhookRequest).to receive(:deliver) + end + + context "when there are no requests to process" do + it "does nothing" do + job.call + expect(job.work_completed?).to be false + end + end + + context "when there is a unlocked request with no retry time" do + it "delivers the request" do + create(:webhook_request) + job.call + expect(job.work_completed?).to be true + end + end + + context "when there is an unlocked request with a retry time in the past" do + it "delivers the request" do + create(:webhook_request, retry_after: 1.minute.ago) + job.call + expect(job.work_completed?).to be true + end + end + + context "when there is an unlocked request with a retry time in the future" do + it "does nothing" do + create(:webhook_request, retry_after: 1.minute.from_now) + job.call + expect(job.work_completed?).to be false + end + end + + context "when there is a locked requested without a retry time" do + it "does nothing" do + create(:webhook_request, :locked) + job.call + expect(job.work_completed?).to be false + end + end + end + + end +end diff --git a/spec/rails_helper.rb 
b/spec/rails_helper.rb index ec9f85c..67dc502 100644 --- a/spec/rails_helper.rb +++ b/spec/rails_helper.rb @@ -8,6 +8,7 @@ require "spec_helper" require "factory_bot" require "timecop" require "database_cleaner" +require "webmock/rspec" DatabaseCleaner.allow_remote_database_url = true ActiveRecord::Base.logger = Logger.new("/dev/null")
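
For context, the webmock/rspec require added above lets specs stub outbound HTTP. A minimal, hypothetical sketch of a webhook delivery spec using it might look like the following; the URL, the expectation, and the assumption that WebhookRequest#deliver issues an HTTP POST to its stored url are illustrative and not taken from this changeset:

# Hypothetical spec sketch using the newly added WebMock dependency.
require "rails_helper"

RSpec.describe WebhookRequest do
  it "POSTs the payload to the webhook URL" do
    # Stub any POST to the (illustrative) endpoint so no real request is made.
    stub = stub_request(:post, "https://example.com/").to_return(status: 200)

    request = create(:webhook_request, url: "https://example.com")
    request.deliver

    expect(stub).to have_been_requested
  end
end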
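The new worker jobs above claim work by stamping rows with a unique locker token in a single UPDATE and then re-selecting only the rows they stamped. A condensed sketch of that pattern, with an illustrative Item model and process method standing in for QueuedMessage/WebhookRequest and their handling code (it relies on update_all honouring limit, which MySQL/MariaDB supports):

# Sketch of the lock-then-fetch pattern used by ProcessQueuedMessagesJob and
# ProcessWebhookRequestsJob. Item and process are placeholders.
locker = Postal.locker_name_with_suffix(SecureRandom.hex(8))
now = Time.current

# Claim at most one unlocked row; only rows actually updated carry this worker's
# locker token, so two concurrent workers can never claim the same row.
Item.where(locked_by: nil, locked_at: nil)
    .limit(1)
    .update_all(locked_by: locker, locked_at: now)

# Re-select whatever was claimed and process it.
Item.where(locked_by: locker, locked_at: now).each do |item|
  process(item)
end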