diff --git a/Gemfile b/Gemfile index c06d59b..8f35f2f 100644 --- a/Gemfile +++ b/Gemfile @@ -25,7 +25,7 @@ gem 'chronic' gem 'basic_ssl' gem 'clockwork' gem 'encrypto_signo' -gem 'epoll', :require => nil +gem 'nio4r' gem 'mongo' gem 'sentry-raven' gem 'gelf' diff --git a/Gemfile.lock b/Gemfile.lock index 00f59ea..b5faef1 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -92,7 +92,6 @@ GEM deep_merge (1.1.1) dynamic_form (1.1.4) encrypto_signo (1.0.0) - epoll (0.3.0) erubis (2.7.0) execjs (2.7.0) faraday (0.9.2) @@ -226,7 +225,6 @@ DEPENDENCIES createsend (~> 4.0) dynamic_form encrypto_signo - epoll foreman gelf haml @@ -241,6 +239,7 @@ DEPENDENCIES mysql2 (>= 0.3.18, < 0.5) nifty-utils nilify_blanks + nio4r puma (~> 3.0) rails (= 5.0.2) sass-rails (~> 5.0) @@ -251,4 +250,4 @@ DEPENDENCIES web-console BUNDLED WITH - 1.14.5 + 1.14.6 diff --git a/lib/postal/smtp_server/server.rb b/lib/postal/smtp_server/server.rb index c6a6d69..7d14426 100644 --- a/lib/postal/smtp_server/server.rb +++ b/lib/postal/smtp_server/server.rb @@ -1,5 +1,5 @@ require 'ipaddr' -require 'epoll' if RUBY_PLATFORM.include?('linux') +require 'nio' module Postal module SMTPServer @@ -62,59 +62,58 @@ module Postal @server.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT, 5) end ENV['SERVER_FD'] = @server.to_i.to_s + logger.info "Listening" end def unlisten - if @epoll - @epoll.del(@server) - if @epoll.size == 0 - Process.exit(0) - end - end - @server.close + # Instruct the nio loop to unlisten and wake it + $unlisten = true + @io_selector.wakeup end def kill_parent Process.kill('TERM', Process.ppid) end - def run_linux - if ENV['SERVER_FD'] - listen - kill_parent - else - listen - end - @epoll = Epoll.create - logger.info "Listening" - @epoll.add(@server, Epoll::IN) + def run_event_loop + # Set up an instance of nio4r to monitor for connections and data + @io_selector = NIO::Selector.new + # Register the SMTP listener + @io_selector.register(@server, :r) + # Create a hash to contain a buffer for each client. buffers = Hash.new { |h, k| h[k] = String.new.force_encoding('BINARY') } - clients = {} loop do - evlist = @epoll.wait - evlist.each do |ev| - io = ev.data + # Wait for an event to occur + @io_selector.select do |monitor| + # Get the IO from the nio monitor + io = monitor.io + # Is this event an incoming connection? if io.is_a?(TCPServer) begin + # Accept the connection new_io = io.accept - if Postal.config.smtp_server.proxy_protocol + # If we are using the haproxy proxy protocol, we will be sent the + # client's IP later. Delay the welcome process. client = Client.new(nil) if Postal.config.smtp_server.log_connect logger.debug "[#{client.id}] \e[35m Connection opened from #{new_io.remote_address.ip_address}\e[0m" end else + # We're not using the proxy protocol so we already know the client's IP client = Client.new(new_io.remote_address.ip_address) if Postal.config.smtp_server.log_connect logger.debug "[#{client.id}] \e[35m Connection opened from #{new_io.remote_address.ip_address}\e[0m" end + # We know who the client is, welcome them. client.log "\e[35m Client identified as #{new_io.remote_address.ip_address}\e[0m" new_io.print("220 #{Postal.config.dns.smtp_server_hostname} ESMTP Postal/#{client.id}") end - - clients[new_io] = client - @epoll.add(new_io, Epoll::IN|Epoll::PRI|Epoll::HUP) + # Register the client and its socket with nio4r + monitor = @io_selector.register(new_io, :r) + monitor.value = client rescue => e + # If something goes wrong, log as appropriate and disconnect the client if defined?(Raven) Raven.capture_exception(e, :extra => {:log_id => (client.id rescue nil)}) end @@ -126,10 +125,15 @@ module Postal new_io.close rescue nil end else + # This event is not an incoming connection so it must be data from a client begin - client = clients[io] + # Get the client from the nio monitor + client = monitor.value + # For now we assume the connection isn't closed eof = false begin + # Read 10kiB of data at a time from the socket. + # There is an extra step for SSL sockets case io when OpenSSL::SSL::SSLSocket buffers[io] << io.readpartial(10240) @@ -143,13 +147,18 @@ module Postal # Client went away eof = true end + # We line buffer, so look to see if we have received a newline + # and keep doing so until all buffered lines have been processed. while buffers[io].index("\n") + # Extract the line if buffers[io].index("\r\n") line, buffers[io] = buffers[io].split("\r\n", 2) else line, buffers[io] = buffers[io].split("\n", 2) end + # Send the received line to the client object for processing result = client.handle(line) + # If the client object returned some data, write it back to the client unless result.nil? result = [result] unless result.is_a?(Array) result.compact.each do |line| @@ -164,17 +173,23 @@ module Postal end end end + # If the client requested we start TLS, do it now if !eof && client.start_tls? + # Clear the request client.start_tls = false - @epoll.del(io) - clients.delete(io) + # Deregister the unencrypted IO + @io_selector.deregister(io) buffers.delete(io) + # Prepare TLS on the socket tcp_io = io io = OpenSSL::SSL::SSLSocket.new(io, ssl_context) - @epoll.add(io, Epoll::IN) - clients[io] = client + # Register the new TLS socket with nio + monitor = @io_selector.register(io, :r) + monitor.value = client + # Close the underlying IO when the TLS socket is closed io.sync_close = true begin + # Start TLS negotiation io.accept rescue OpenSSL::SSL::SSLError => e client.log "SSL Negotiation Failed: #{e.message}" @@ -182,17 +197,20 @@ module Postal end end + # Has the clint requested we close the connection? if client.finished? || eof client.log "\e[35m Connection closed\e[0m" - @epoll.del(io) - clients.delete(io) + # Deregister the socket and close it + @io_selector.deregister(io) buffers.delete(io) io.close - if @epoll.size == 0 + # If we have no more clients or listeners left, exit the process + if @io_selector.empty? Process.exit(0) end end rescue => e + # Something went wrong, log as appropriate client_id = client ? client.id : '------' if defined?(Raven) Raven.capture_exception(e, :extra => {:log_id => (client.id rescue nil)}) @@ -203,117 +221,43 @@ module Postal logger.error "[#{client_id}] #{line}" end # Close all IO and forget this client - @epoll.del(io) rescue nil - clients.delete(io) + @io_selector.deregister(io) rescue nil buffers.delete(io) io.close rescue nil - if @epoll.size == 0 + if @io_selector.empty? Process.exit(0) end end end end + # If unlisten has been called, stop listening + if $unlisten + @io_selector.deregister(@server) + @server.close + # If there's nothing left to do, shut down the process + if @io_selector.empty? + Process.exit(0) + end + # Clear the request + $unlisten = false + end end end - def run_non_linux + def run + # Write PID to file if path specified + if ENV['PID_FILE'] + File.open(ENV['PID_FILE'], 'w') { |f| f.write(Process.pid.to_s + "\n") } + end + # If we have been spawned to replace an existing processm shut down the + # parent after listening. if ENV['SERVER_FD'] listen kill_parent else listen end - logger.info "Listening" - Thread.abort_on_exception = true - client_threads = [] - loop do - s = nil - begin - until s - l = select([@server], [@server], [@server], 0.5) - s = @server.accept if l - end - rescue IOError - STDERR.puts "Server socket was closed." - break - end - client_threads << Thread.new(s) do |io| - begin - if Postal.config.smtp_server.proxy_protocol - client = Client.new(nil) - if Postal.config.smtp_server.log_connect - logger.debug "[#{client.id}] \e[35m Connection opened from #{io.remote_address.ip_address}\e[0m" - end - else - client = Client.new(io.remote_address.ip_address) - if Postal.config.smtp_server.log_connect - logger.debug "[#{client.id}] \e[35m Connection opened from #{io.remote_address.ip_address}\e[0m" - end - client.log "\e[35m Client identified as #{io.remote_address.ip_address}\e[0m" - io.print("220 #{Postal.config.dns.smtp_server_hostname} ESMTP Postal/#{client.id}") - end - - loop do - if received_data = io.gets - if result = client.handle(received_data.chomp) - result = [result] unless result.is_a?(Array) - result.compact.each do |line| - client.log "\e[34m=> #{line.strip}\e[0m" - io.write(line.to_s + "\r\n") - io.flush - end - end - end - if client.start_tls? - client.start_tls = false - tcp_io = io - io = OpenSSL::SSL::SSLSocket.new(io, ssl_context) - io.sync_close = true - begin - io.accept - rescue OpenSSL::SSL::SSLError => e - logger.error "SSL Negotiation Failed: #{e.message}" - io.close rescue nil - tcp_io.close rescue nil - eof = true - end - end - if received_data.nil? || client.finished? - client.log "\e[35m Connection closed\e[0m" - io.close - break - end - end - rescue => e - if defined?(Raven) - Raven.capture_exception(e, :extra => {:log_id => (client.id rescue nil)}) - end - logger.error "An error occurred while handling a client." - logger.error "#{e.class}: #{e.message}" - e.backtrace.each do |line| - logger.error line - end - # Close all IO - io.close rescue nil - ensure - client_threads.delete(Thread.current) - end - end - end - client_threads.each{ |t| t.join unless t == Thread.current } - end - - def run - if ENV['PID_FILE'] - File.open(ENV['PID_FILE'], 'w') { |f| f.write(Process.pid.to_s + "\n") } - end - if Postal.config.smtp_server&.evented - logger.info "Running epoll driven server for Linux host.." - run_linux - else - logger.info "Running thread based compatibility server for non-Linux host." - run_non_linux - end + run_event_loop end private