@@ -1,6 +1,7 @@
class UnqueueMessageJob < Postal :: Job
def perform
if original_message = QueuedMessage . find_by_id ( params [ 'id' ] )
if original_message = QueuedMessage . find_by_id ( params [ " id " ] )
if original_message . acquire_lock
log " Lock acquired for queued message #{ original_message . id } "
@@ -22,7 +23,7 @@ class UnqueueMessageJob < Postal::Job
begin
other_messages = original_message . batchable_messages ( 100 )
log " Found #{ other_messages . size } associated messages to process at the same time (batch key: #{ original_message . batch_key } ) "
rescue
rescue StandardError
original_message . unlock
raise
end
@@ -45,7 +46,7 @@ class UnqueueMessageJob < Postal::Job
#
if queued_message . server . suspended?
log " #{ log_prefix } Server is suspended. Holding message. "
queued_message . message . create_delivery ( ' Held' , : details = > " Mail server has been suspended. No e-mails can be processed at present. Contact support for assistance. " )
queued_message . message . create_delivery ( " Held" , details : " Mail server has been suspended. No e-mails can be processed at present. Contact support for assistance. " )
queued_message . destroy
next
end
@@ -53,19 +54,19 @@ class UnqueueMessageJob < Postal::Job
# We might not be able to send this any more, check the attempts
if queued_message . attempts > = Postal . config . general . maximum_delivery_attempts
details = " Maximum number of delivery attempts ( #{ queued_message . attempts } ) has been reached. "
if queued_message . message . scope == ' incoming'
if queued_message . message . scope == " incoming"
# Send bounces to incoming e-mails when they are hard failed
if bounce_id = queued_message . send_bounce
details += " Bounce sent to sender (see message <msg: #{ bounce_id } >) "
end
elsif queued_message . message . scope == ' outgoing'
elsif queued_message . message . scope == " outgoing"
# Add the recipient to the suppression list
if queued_message . server . message_db . suppression_list . add ( :recipient , queued_message . message . rcpt_to , : reason = > " too many soft fails " )
if queued_message . server . message_db . suppression_list . add ( :recipient , queued_message . message . rcpt_to , reason : " too many soft fails " )
log " Added #{ queued_message . message . rcpt_to } to suppression list because maximum attempts has been reached "
details += " Added #{ queued_message . message . rcpt_to } to suppression list because delivery has failed #{ queued_message . attempts } times. "
end
end
queued_message . message . create_delivery ( ' HardFail' , : details = > details )
queued_message . message . create_delivery ( " HardFail" , details : details )
queued_message . destroy
log " #{ log_prefix } Message has reached maximum number of attempts. Hard failing. "
next
@@ -74,15 +75,15 @@ class UnqueueMessageJob < Postal::Job
# If the raw message has been removed (removed by retention)
unless queued_message . message . raw_message?
log " #{ log_prefix } Raw message has been removed. Not sending. "
queued_message . message . create_delivery ( ' HardFail' , : details = > " Raw message has been removed. Cannot send message. " )
queued_message . message . create_delivery ( " HardFail" , details : " Raw message has been removed. Cannot send message. " )
queued_message . destroy
next
end
#
# Handle Incoming Messages
# Handle Incoming Messages
#
if queued_message . message . scope == ' incoming'
if queued_message . message . scope == " incoming"
#
# If this is a bounce, we need to handle it as such
#
@@ -91,8 +92,8 @@ class UnqueueMessageJob < Postal::Job
original_messages = queued_message . message . original_messages
unless original_messages . empty?
for original_message in queued_message . message . original_messages
queued_message . message . update ( : bounce_for_id = > original_message . id , : domain_id = > original_message . domain_id )
queued_message . message . create_delivery ( ' Processed' , : details = > " This has been detected as a bounce message for <msg: #{ original_message . id } >. " )
queued_message . message . update ( bounce_for_id : original_message . id , domain_id : original_message . domain_id )
queued_message . message . create_delivery ( " Processed" , details : " This has been detected as a bounce message for <msg: #{ original_message . id } >. " )
original_message . bounce! ( queued_message . message )
log " #{ log_prefix } Bounce linked with message #{ original_message . id } "
end
@@ -101,11 +102,11 @@ class UnqueueMessageJob < Postal::Job
end
# This message was sent to the return path but hasn't been matched
# to an original message. If we have a route for this, route it
# otherwise we'll drop at this point.
# to an original message. If we have a route for this, route it
# otherwise we'll drop at this point.
if queued_message . message . route_id . nil?
log " #{ log_prefix } No source messages found. Hard failing. "
queued_message . message . create_delivery ( ' HardFail' , : details = > " This message was a bounce but we couldn't link it with any outgoing message and there was no route for it. " )
queued_message . message . create_delivery ( " HardFail" , details : " This message was a bounce but we couldn't link it with any outgoing message and there was no route for it. " )
queued_message . destroy
next
end
@@ -124,7 +125,7 @@ class UnqueueMessageJob < Postal::Job
queued_message . message . inspect_message
if queued_message . message . inspected == 1
is_spam = queued_message . message . spam_score > queued_message . server . spam_threshold
queued_message . message . update ( : spam = > 1 ) if is_spam
queued_message . message . update ( spam : 1 ) if is_spam
queued_message . message . append_headers (
" X-Postal-Spam: #{ queued_message . message . spam == 1 ? 'yes' : 'no' } " ,
" X-Postal-Spam-Threshold: #{ queued_message . server . spam_threshold } " ,
@@ -140,15 +141,15 @@ class UnqueueMessageJob < Postal::Job
#
if queued_message . message . spam_score > = queued_message . server . spam_failure_threshold
log " #{ log_prefix } Message has a spam score higher than the server's maxmimum. Hard failing. "
queued_message . message . create_delivery ( ' HardFail' , : details = > " Message's spam score is higher than the failure threshold for this server. Threshold is currently #{ queued_message . server . spam_failure_threshold } . " )
queued_message . message . create_delivery ( " HardFail" , details : " Message's spam score is higher than the failure threshold for this server. Threshold is currently #{ queued_message . server . spam_failure_threshold } . " )
queued_message . destroy
next
end
# If the server is in development mode, hold it
if queued_message . server . mode == ' Development' && ! queued_message . manual?
if queued_message . server . mode == " Development" && ! queued_message . manual?
log " Server is in development mode so holding. "
queued_message . message . create_delivery ( ' Held' , : details = > " Server is in development mode. " )
queued_message . message . create_delivery ( " Held" , details : " Server is in development mode. " )
queued_message . destroy
log " #{ log_prefix } Server is in development mode. Holding. "
next
@@ -161,16 +162,16 @@ class UnqueueMessageJob < Postal::Job
if route = queued_message . message . route
# If the route says we're holding quananteed mail and this is spam, we'll hold this
if route . spam_mode == ' Quarantine' && queued_message . message . spam == 1 && ! queued_message . manual?
queued_message . message . create_delivery ( ' Held' , : details = > " Message placed into quarantine. " )
if route . spam_mode == " Quarantine" && queued_message . message . spam == 1 && ! queued_message . manual?
queued_message . message . create_delivery ( " Held" , details : " Message placed into quarantine. " )
queued_message . destroy
log " #{ log_prefix } Route says to quarantine spam message. Holding. "
next
end
# If the route says we're holding quananteed mail and this is spam, we'll hold this
if route . spam_mode == ' Fail' && queued_message . message . spam == 1 && ! queued_message . manual?
queued_message . message . create_delivery ( ' HardFail' , : details = > " Message is spam and the route specifies it should be failed. " )
if route . spam_mode == " Fail" && queued_message . message . spam == 1 && ! queued_message . manual?
queued_message . message . create_delivery ( " HardFail" , details : " Message is spam and the route specifies it should be failed. " )
queued_message . destroy
log " #{ log_prefix } Route says to fail spam message. Hard failing. "
next
@@ -179,8 +180,8 @@ class UnqueueMessageJob < Postal::Job
#
# Messages that should be blindly accepted are blindly accepted
#
if route . mode == ' Accept'
queued_message . message . create_delivery ( ' Processed' , : details = > " Message has been accepted but not sent to any endpoints. " )
if route . mode == " Accept"
queued_message . message . create_delivery ( " Processed" , details : " Message has been accepted but not sent to any endpoints. " )
queued_message . destroy
log " #{ log_prefix } Route says to accept without endpoint. Marking as processed. "
next
@@ -189,14 +190,14 @@ class UnqueueMessageJob < Postal::Job
#
# Messages that should be accepted and held should be held
#
if route . mode == ' Hold'
if route . mode == " Hold"
log " #{ log_prefix } Route says to hold message. "
if queued_message . manual?
log " #{ log_prefix } Message was queued manually. Marking as processed. "
queued_message . message . create_delivery ( ' Processed' , : details = > " Message has been processed. " )
queued_message . message . create_delivery ( " Processed" , details : " Message has been processed. " )
else
log " #{ log_prefix } Message was not queued manually. Holding. "
queued_message . message . create_delivery ( ' Held' , : details = > " Message has been accepted but not sent to any endpoints. " )
queued_message . message . create_delivery ( " Held" , details : " Message has been accepted but not sent to any endpoints. " )
end
queued_message . destroy
next
@@ -205,45 +206,43 @@ class UnqueueMessageJob < Postal::Job
#
# Messages that should be bounced should be bounced (or rejected if they got this far)
#
if route . mode == ' Bounce' || route . mode == ' Reject'
if route . mode == " Bounce" || route . mode == " Reject"
if id = queued_message . send_bounce
queued_message . message . create_delivery ( ' HardFail' , : details = > " Message has been bounced because the route asks for this. See message <msg: #{ id } > " )
queued_message . message . create_delivery ( " HardFail" , details : " Message has been bounced because the route asks for this. See message <msg: #{ id } > " )
log " #{ log_prefix } Route says to bounce. Hard failing and sent bounce ( #{ id } ). "
end
queued_message . destroy
next
end
begin
if @fixed_result
result = @fixed_result
if @fixed_result
result = @fixed_result
else
case queued_message . message . endpoint
when SMTPEndpoint
sender = cached_sender ( Postal :: SMTPSender , queued_message . message . recipient_domain , nil , servers : [ queued_message . message . endpoint ] )
when HTTPEndpoint
sender = cached_sender ( Postal :: HTTPSender , queued_message . message . endpoint )
when AddressEndpoint
sender = cached_sender ( Postal :: SMTPSender , queued_message . message . endpoint . domain , nil , force_rcpt_to : queued_message . message . endpoint . address )
else
case queued_message . message . endpoint
when SMTPEndpoint
sender = cached_sender ( Postal :: SMTPSender , queued_message . m essage . recipient_domain , nil , :servers = > [ queued_message . message . endpoint ] )
when HTTPEndpoin t
sender = cached_sender ( Postal :: HTTPSender , queued_message . message . endpoint )
when AddressEndpoint
sender = cached_sender ( Postal :: SMTPSender , queued_message . message . endpoint . domain , nil , :force_rcpt_to = > queued_message . message . endpoint . address )
else
log " #{ log_prefix } Invalid endpoint for route ( #{ queued_message . message . endpoint_type } ) "
queued_message . message . create_delivery ( 'HardFail' , :details = > " Invalid endpoint for route. " )
queued_message . destroy
next
end
result = sender . send_message ( queued_message . message )
if result . connect_error
@fixed_result = result
end
log " #{ log_prefix } Invalid endpoint for route ( #{ queued_message . message . endpoint_type } ) "
queued_message . message . create_delivery ( " HardFail " , details : " Invalid endpoint for route. " )
queued_message . d estroy
nex t
end
result = sender . send_message ( queued_message . message )
if result . connect_error
@fixed_result = result
end
end
# Log the result
log_details = result . details
if result . type == ' HardFail' && result . suppress_bounce
if result . type == " HardFail" && result . suppress_bounce
# The delivery hard failed, but requested that no bounce be sent
log " #{ log_prefix } Suppressing bounce message after hard fail "
elsif result . type == ' HardFail' && queued_message . message . send_bounces?
elsif result . type == " HardFail" && queued_message . message . send_bounces?
# If the message is a hard fail, send a bounce message for this message.
log " #{ log_prefix } Sending a bounce because message hard failed "
if bounce_id = queued_message . send_bounce
@@ -252,7 +251,7 @@ class UnqueueMessageJob < Postal::Job
end
end
queued_message . message . create_delivery ( result . type , : details = > log_details , : output = > result . output & . strip , : sent_with_ssl = > result . secure , : log_id = > result . log_id , : time = > result . time )
queued_message . message . create_delivery ( result . type , details : log_details , output : result . output & . strip , sent_with_ssl : result . secure , log_id : result . log_id , time : result . time )
if result . retry
log " #{ log_prefix } Message requeued for trying later. "
@@ -266,7 +265,7 @@ class UnqueueMessageJob < Postal::Job
end
else
log " #{ log_prefix } No route and/or endpoint available for processing. Hard failing. "
queued_message . message . create_delivery ( ' HardFail' , : details = > " Message does not have a route and/or endpoint available for delivery. " )
queued_message . message . create_delivery ( " HardFail" , details : " Message does not have a route and/or endpoint available for delivery. " )
queued_message . destroy
next
end
@@ -275,10 +274,10 @@ class UnqueueMessageJob < Postal::Job
#
# Handle Outgoing Messages
#
if queued_message . message . scope == ' outgoing'
if queued_message . message . scope == " outgoing"
if queued_message . message . domain . nil?
log " #{ log_prefix } Message has no domain. Hard failing. "
queued_message . message . create_delivery ( ' HardFail' , : details = > " Message's domain no longer exist " )
queued_message . message . create_delivery ( " HardFail" , details : " Message's domain no longer exist " )
queued_message . destroy
next
end
@@ -288,7 +287,7 @@ class UnqueueMessageJob < Postal::Job
#
if queued_message . message . rcpt_to . blank?
log " #{ log_prefix } Message has no to address. Hard failing. "
queued_message . message . create_delivery ( ' HardFail' , : details = > " Message doesn't have an RCPT to " )
queued_message . message . create_delivery ( " HardFail" , details : " Message doesn't have an RCPT to " )
queued_message . destroy
next
end
@@ -298,7 +297,7 @@ class UnqueueMessageJob < Postal::Job
#
if ! queued_message . manual? && queued_message . message . credential && queued_message . message . credential . hold?
log " #{ log_prefix } Credential wants us to hold messages. Holding. "
queued_message . message . create_delivery ( ' Held' , : details = > " Credential is configured to hold all messages authenticated by it. " )
queued_message . message . create_delivery ( " Held" , details : " Credential is configured to hold all messages authenticated by it. " )
queued_message . destroy
next
end
@@ -308,15 +307,15 @@ class UnqueueMessageJob < Postal::Job
#
if ! queued_message . manual? && sl = queued_message . server . message_db . suppression_list . get ( :recipient , queued_message . message . rcpt_to )
log " #{ log_prefix } Recipient is on the suppression list. Holding. "
queued_message . message . create_delivery ( ' Held' , : details = > " Recipient ( #{ queued_message . message . rcpt_to } ) is on the suppression list (reason: #{ sl [ 'reason' ] } ) " )
queued_message . message . create_delivery ( " Held" , details : " Recipient ( #{ queued_message . message . rcpt_to } ) is on the suppression list (reason: #{ sl [ 'reason' ] } ) " )
queued_message . destroy
next
end
# Extract a tag and add it to the message if one doesn't exist
if queued_message . message . tag . nil? && tag = queued_message . message . headers [ ' x-postal-tag' ]
if queued_message . message . tag . nil? && tag = queued_message . message . headers [ " x-postal-tag" ]
log " #{ log_prefix } Added tag #{ tag . last } "
queued_message . message . update ( : tag = > tag . last )
queued_message . message . update ( tag : tag . last )
end
# Parse the content of the message as appropriate
@@ -331,61 +330,60 @@ class UnqueueMessageJob < Postal::Job
queued_message . message . inspect_message
if queued_message . message . inspected == 1
if queued_message . message . spam_score > = queued_message . server . outbound_spam_threshold
queued_message . message . update ( : spam = > 1 )
queued_message . message . update ( spam : 1 )
end
log " #{ log_prefix } Message inspected successfully "
end
end
if queued_message . message . spam == 1
queued_message . message . create_delivery ( " HardFail " , : details = > " Message is likely spam. Threshold is #{ queued_message . server . outbound_spam_threshold } and the message scored #{ queued_message . message . spam_score } . " )
queued_message . message . create_delivery ( " HardFail " , details : " Message is likely spam. Threshold is #{ queued_message . server . outbound_spam_threshold } and the message scored #{ queued_message . message . spam_score } . " )
queued_message . destroy
log " #{ log_prefix } Message is spam ( #{ queued_message . message . spam_score } ). Hard failing. "
next
end
# Add outgoing headers
if ! queued_message . message . has_outgoing_headers?
unless queued_message . message . has_outgoing_headers?
queued_message . message . add_outgoing_headers
end
# Check send limits
if queued_message . server . send_limit_exceeded?
# If we're over the limit, we're going to be holding this message
queued_message . server . update_columns ( : send_limit_exceeded_at = > Time . now , : send_limit_approaching_at = > nil )
queued_message . message . create_delivery ( ' Held' , : details = > " Message held because send limit ( #{ queued_message . server . send_limit } ) has been reached. " )
queued_message . server . update_columns ( send_limit_exceeded_at : Time . now , send_limit_approaching_at : nil )
queued_message . message . create_delivery ( " Held" , details : " Message held because send limit ( #{ queued_message . server . send_limit } ) has been reached. " )
queued_message . destroy
log " #{ log_prefix } Server send limit has been exceeded. Holding. "
next
elsif queued_message . server . send_limit_approaching?
# If we're approaching the limit, just say we are but continue to process the message
queued_message . server . update_columns ( : send_limit_approaching_at = > Time . now , : send_limit_exceeded_at = > nil )
queued_message . server . update_columns ( send_limit_approaching_at : Time . now , send_limit_exceeded_at : nil )
else
queued_message . server . update_columns ( : send_limit_approaching_at = > nil , : send_limit_exceeded_at = > nil )
queued_message . server . update_columns ( send_limit_approaching_at : nil , send_limit_exceeded_at : nil )
end
# Update the live stats for this message.
queued_message . message . database . live_stats . increment ( queued_message . message . scope )
# If the server is in development mode, hold it
if queued_message . server . mode == ' Development' && ! queued_message . manual?
if queued_message . server . mode == " Development" && ! queued_message . manual?
log " Server is in development mode so holding. "
queued_message . message . create_delivery ( ' Held' , : details = > " Server is in development mode. " )
queued_message . message . create_delivery ( " Held" , details : " Server is in development mode. " )
queued_message . destroy
log " #{ log_prefix } Server is in development mode. Holding. "
next
end
# Send the outgoing message to the SMTP sender
begin
if @fixed_result
result = @fixed_result
else
sender = cached_sender ( Postal :: SMTPSender , queued_message . message . recipient_domain , queued_message . ip_address )
result = sender . send_message ( queued_message . message )
if result . connect_error
@fixed_result = result
end
if @fixed_result
result = @fixed_result
else
sender = cached_sender ( Postal :: SMTPSender , queued_message . message . recipient_domain , queued_message . ip_address )
result = sender . send_message ( queued_message . message )
if result . connect_error
@fixed_result = result
end
end
@@ -393,30 +391,26 @@ class UnqueueMessageJob < Postal::Job
# If the message has been hard failed, check to see how many other recent hard fails we've had for the address
# and if there are more than 2, suppress the address for 30 days.
#
if result . type == ' HardFail'
recent_hard_fails = queued_message . server . message_db . select ( :messages , : where = > { : rcpt_to = > queued_message . message . rcpt_to , : status = > ' HardFail' , : timestamp = > { : greater_than = > 24 . hours . ago . to_f } } , : count = > true )
if recent_hard_fails > = 1
if queued_message . server . message_db . suppression_list . add ( :recipient , queued_message . message . rcpt_to , :reason = > " too many hard fail s" )
log " #{ log_prefix } Added #{ queued_message . message . rcpt_to } to suppression list because #{ recent_hard_fails } hard fails in 24 hours "
result . details += " . " if result . details =~ / \ . \ z /
result . details += " Recipient added to suppression list (too many hard fails). "
end
if result . type == " HardFail"
recent_hard_fails = queued_message . server . message_db . select ( :messages , where : { rcpt_to : queued_message . message . rcpt_to , status : " HardFail" , timestamp : { greater_than : 24 . hours . ago . to_f } } , count : true )
if recent_hard_fails > = 1 && queued_message . server . message_db . suppression_list . add ( :recipient , queued_message . message . rcpt_to , reason : " too many hard fails " )
log " #{ log_prefix } Added #{ queued_message . message . rcpt_to } to suppression list because #{ recent_hard_fails } hard fails in 24 hour s"
result . details += " . " if result . details =~ / \ . \ z /
result . details += " Recipient added to suppression list (too many hard fails). "
end
end
#
# If a message is sent successfully, remove the users from the suppression list
#
if result . type == 'Sent'
if queued_message . server . message_db . suppression_list . remove ( :recipient , queued_message . message . rcpt_to )
log " #{ log_prefix } Removed #{ queued_message . message . rcpt_to } from suppression list because success "
result . details += " . " if result . details =~ / \ . \ z /
result . details += " Recipient removed from suppression list. "
end
if result . type == " Sent " && queued_message . server . message_db . suppression_list . remove ( :recipient , queued_message . message . rcpt_to )
log " #{ log_prefix } Removed #{ queued_message . message . rcpt_to } from suppression list because success "
result . details += " . " if result . details =~ / \ . \ z /
result . details += " Recipient removed from suppression list. "
end
# Log the result
queued_message . message . create_delivery ( result . type , : details = > result . details , : output = > result . output , : sent_with_ssl = > result . secure , : log_id = > result . log_id , : time = > result . time )
queued_message . message . create_delivery ( result . type , details : result . details , output : result . output , sent_with_ssl : result . secure , log_id : result . log_id , time : result . time )
if result . retry
log " #{ log_prefix } Message requeued for trying later. "
queued_message . retry_later ( result . retry . is_a? ( Integer ) ? result . retry : nil )
@@ -425,17 +419,16 @@ class UnqueueMessageJob < Postal::Job
queued_message . destroy
end
end
rescue = > e
rescue StandardError = > e
log " #{ log_prefix } Internal error: #{ e . class } : #{ e . message } "
e . backtrace . each { | e | log ( " #{ log_prefix } #{ e } " ) }
queued_message . retry_later
log " #{ log_prefix } Queued message was unlocked "
if defined? ( Raven )
Raven . capture_exception ( e , : extra = > { : job_id = > self . id , : server_id = > queued_message . server_id , : message_id = > queued_message . message_id } )
Raven . capture_exception ( e , extra : { job_id : self . id , server_id : queued_message . server_id , message_id : queued_message . message_id } )
end
if queued_message . message
queued_message . message . create_delivery ( " Error " , : details = > " An internal error occurred while sending this message. This message will be retried automatically. If this persists, contact support for assistance. " , : output = > " #{ e . class } : #{ e . message } " , : log_id = > " J- #{ self . id } " )
queued_message . message . create_delivery ( " Error " , details : " An internal error occurred while sending this message. This message will be retried automatically. If this persists, contact support for assistance. " , output : " #{ e . class } : #{ e . message } " , log_id : " J- #{ self . id } " )
end
end
end
@@ -447,7 +440,11 @@ class UnqueueMessageJob < Postal::Job
log " No queued message with ID #{ params [ 'id' ] } was available for processing. "
end
ensure
@sender & . finish rescue nil
begin
@sender & . finish
rescue StandardError
nil
end
end
private
@@ -459,4 +456,5 @@ class UnqueueMessageJob < Postal::Job
sender
end
end
end