# frozen_string_literal: true module MessageDequeuer class InitialProcessor < Base attr_accessor :send_result def process logger.tagged(original_queued_message: @queued_message.id) do logger.info "starting message unqueue" begin catch_stops do check_message_exists check_message_is_ready find_other_messages_for_batch # Process the original message and then all of those # found for batching. process_message(@queued_message) @other_messages.each { |message| process_message(message) } end ensure @state.finished end logger.info "finished message unqueue" end end private def check_message_exists @queued_message.message rescue Postal::MessageDB::Message::NotFound log "unqueue because backend message has been removed." remove_from_queue stop_processing end def check_message_is_ready return if @queued_message.ready? log "skipping because message isn't ready for processing" @queued_message.unlock stop_processing end def find_other_messages_for_batch @other_messages = @queued_message.batchable_messages(100) log "found #{@other_messages.size} associated messages to process at the same time", batch_key: @queued_message.batch_key rescue StandardError @queued_message.unlock raise end def process_message(queued_message) logger.tagged(queued_message: queued_message.id) do SingleMessageProcessor.process(queued_message, logger: @logger, state: @state) end end end end