مراية لـ
https://github.com/postalserver/postal.git
تم المزامنة 2026-01-17 13:39:46 +00:00
replace epoll and threaded servers with single nio4r evented server
هذا الالتزام موجود في:
ملتزم من قبل
Adam Cooke
الأصل
594cff74a9
التزام
b90632b525
2
Gemfile
2
Gemfile
@@ -25,7 +25,7 @@ gem 'chronic'
|
|||||||
gem 'basic_ssl'
|
gem 'basic_ssl'
|
||||||
gem 'clockwork'
|
gem 'clockwork'
|
||||||
gem 'encrypto_signo'
|
gem 'encrypto_signo'
|
||||||
gem 'epoll', :require => nil
|
gem 'nio4r'
|
||||||
gem 'mongo'
|
gem 'mongo'
|
||||||
gem 'sentry-raven'
|
gem 'sentry-raven'
|
||||||
gem 'gelf'
|
gem 'gelf'
|
||||||
|
|||||||
@@ -92,7 +92,6 @@ GEM
|
|||||||
deep_merge (1.1.1)
|
deep_merge (1.1.1)
|
||||||
dynamic_form (1.1.4)
|
dynamic_form (1.1.4)
|
||||||
encrypto_signo (1.0.0)
|
encrypto_signo (1.0.0)
|
||||||
epoll (0.3.0)
|
|
||||||
erubis (2.7.0)
|
erubis (2.7.0)
|
||||||
execjs (2.7.0)
|
execjs (2.7.0)
|
||||||
faraday (0.9.2)
|
faraday (0.9.2)
|
||||||
@@ -226,7 +225,6 @@ DEPENDENCIES
|
|||||||
createsend (~> 4.0)
|
createsend (~> 4.0)
|
||||||
dynamic_form
|
dynamic_form
|
||||||
encrypto_signo
|
encrypto_signo
|
||||||
epoll
|
|
||||||
foreman
|
foreman
|
||||||
gelf
|
gelf
|
||||||
haml
|
haml
|
||||||
@@ -241,6 +239,7 @@ DEPENDENCIES
|
|||||||
mysql2 (>= 0.3.18, < 0.5)
|
mysql2 (>= 0.3.18, < 0.5)
|
||||||
nifty-utils
|
nifty-utils
|
||||||
nilify_blanks
|
nilify_blanks
|
||||||
|
nio4r
|
||||||
puma (~> 3.0)
|
puma (~> 3.0)
|
||||||
rails (= 5.0.2)
|
rails (= 5.0.2)
|
||||||
sass-rails (~> 5.0)
|
sass-rails (~> 5.0)
|
||||||
@@ -251,4 +250,4 @@ DEPENDENCIES
|
|||||||
web-console
|
web-console
|
||||||
|
|
||||||
BUNDLED WITH
|
BUNDLED WITH
|
||||||
1.14.5
|
1.14.6
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
require 'ipaddr'
|
require 'ipaddr'
|
||||||
require 'epoll' if RUBY_PLATFORM.include?('linux')
|
require 'nio'
|
||||||
|
|
||||||
module Postal
|
module Postal
|
||||||
module SMTPServer
|
module SMTPServer
|
||||||
@@ -62,59 +62,58 @@ module Postal
|
|||||||
@server.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT, 5)
|
@server.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT, 5)
|
||||||
end
|
end
|
||||||
ENV['SERVER_FD'] = @server.to_i.to_s
|
ENV['SERVER_FD'] = @server.to_i.to_s
|
||||||
|
logger.info "Listening"
|
||||||
end
|
end
|
||||||
|
|
||||||
def unlisten
|
def unlisten
|
||||||
if @epoll
|
# Instruct the nio loop to unlisten and wake it
|
||||||
@epoll.del(@server)
|
$unlisten = true
|
||||||
if @epoll.size == 0
|
@io_selector.wakeup
|
||||||
Process.exit(0)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
@server.close
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def kill_parent
|
def kill_parent
|
||||||
Process.kill('TERM', Process.ppid)
|
Process.kill('TERM', Process.ppid)
|
||||||
end
|
end
|
||||||
|
|
||||||
def run_linux
|
def run_event_loop
|
||||||
if ENV['SERVER_FD']
|
# Set up an instance of nio4r to monitor for connections and data
|
||||||
listen
|
@io_selector = NIO::Selector.new
|
||||||
kill_parent
|
# Register the SMTP listener
|
||||||
else
|
@io_selector.register(@server, :r)
|
||||||
listen
|
# Create a hash to contain a buffer for each client.
|
||||||
end
|
|
||||||
@epoll = Epoll.create
|
|
||||||
logger.info "Listening"
|
|
||||||
@epoll.add(@server, Epoll::IN)
|
|
||||||
buffers = Hash.new { |h, k| h[k] = String.new.force_encoding('BINARY') }
|
buffers = Hash.new { |h, k| h[k] = String.new.force_encoding('BINARY') }
|
||||||
clients = {}
|
|
||||||
loop do
|
loop do
|
||||||
evlist = @epoll.wait
|
# Wait for an event to occur
|
||||||
evlist.each do |ev|
|
@io_selector.select do |monitor|
|
||||||
io = ev.data
|
# Get the IO from the nio monitor
|
||||||
|
io = monitor.io
|
||||||
|
# Is this event an incoming connection?
|
||||||
if io.is_a?(TCPServer)
|
if io.is_a?(TCPServer)
|
||||||
begin
|
begin
|
||||||
|
# Accept the connection
|
||||||
new_io = io.accept
|
new_io = io.accept
|
||||||
|
|
||||||
if Postal.config.smtp_server.proxy_protocol
|
if Postal.config.smtp_server.proxy_protocol
|
||||||
|
# If we are using the haproxy proxy protocol, we will be sent the
|
||||||
|
# client's IP later. Delay the welcome process.
|
||||||
client = Client.new(nil)
|
client = Client.new(nil)
|
||||||
if Postal.config.smtp_server.log_connect
|
if Postal.config.smtp_server.log_connect
|
||||||
logger.debug "[#{client.id}] \e[35m Connection opened from #{new_io.remote_address.ip_address}\e[0m"
|
logger.debug "[#{client.id}] \e[35m Connection opened from #{new_io.remote_address.ip_address}\e[0m"
|
||||||
end
|
end
|
||||||
else
|
else
|
||||||
|
# We're not using the proxy protocol so we already know the client's IP
|
||||||
client = Client.new(new_io.remote_address.ip_address)
|
client = Client.new(new_io.remote_address.ip_address)
|
||||||
if Postal.config.smtp_server.log_connect
|
if Postal.config.smtp_server.log_connect
|
||||||
logger.debug "[#{client.id}] \e[35m Connection opened from #{new_io.remote_address.ip_address}\e[0m"
|
logger.debug "[#{client.id}] \e[35m Connection opened from #{new_io.remote_address.ip_address}\e[0m"
|
||||||
end
|
end
|
||||||
|
# We know who the client is, welcome them.
|
||||||
client.log "\e[35m Client identified as #{new_io.remote_address.ip_address}\e[0m"
|
client.log "\e[35m Client identified as #{new_io.remote_address.ip_address}\e[0m"
|
||||||
new_io.print("220 #{Postal.config.dns.smtp_server_hostname} ESMTP Postal/#{client.id}")
|
new_io.print("220 #{Postal.config.dns.smtp_server_hostname} ESMTP Postal/#{client.id}")
|
||||||
end
|
end
|
||||||
|
# Register the client and its socket with nio4r
|
||||||
clients[new_io] = client
|
monitor = @io_selector.register(new_io, :r)
|
||||||
@epoll.add(new_io, Epoll::IN|Epoll::PRI|Epoll::HUP)
|
monitor.value = client
|
||||||
rescue => e
|
rescue => e
|
||||||
|
# If something goes wrong, log as appropriate and disconnect the client
|
||||||
if defined?(Raven)
|
if defined?(Raven)
|
||||||
Raven.capture_exception(e, :extra => {:log_id => (client.id rescue nil)})
|
Raven.capture_exception(e, :extra => {:log_id => (client.id rescue nil)})
|
||||||
end
|
end
|
||||||
@@ -126,10 +125,15 @@ module Postal
|
|||||||
new_io.close rescue nil
|
new_io.close rescue nil
|
||||||
end
|
end
|
||||||
else
|
else
|
||||||
|
# This event is not an incoming connection so it must be data from a client
|
||||||
begin
|
begin
|
||||||
client = clients[io]
|
# Get the client from the nio monitor
|
||||||
|
client = monitor.value
|
||||||
|
# For now we assume the connection isn't closed
|
||||||
eof = false
|
eof = false
|
||||||
begin
|
begin
|
||||||
|
# Read 10kiB of data at a time from the socket.
|
||||||
|
# There is an extra step for SSL sockets
|
||||||
case io
|
case io
|
||||||
when OpenSSL::SSL::SSLSocket
|
when OpenSSL::SSL::SSLSocket
|
||||||
buffers[io] << io.readpartial(10240)
|
buffers[io] << io.readpartial(10240)
|
||||||
@@ -143,13 +147,18 @@ module Postal
|
|||||||
# Client went away
|
# Client went away
|
||||||
eof = true
|
eof = true
|
||||||
end
|
end
|
||||||
|
# We line buffer, so look to see if we have received a newline
|
||||||
|
# and keep doing so until all buffered lines have been processed.
|
||||||
while buffers[io].index("\n")
|
while buffers[io].index("\n")
|
||||||
|
# Extract the line
|
||||||
if buffers[io].index("\r\n")
|
if buffers[io].index("\r\n")
|
||||||
line, buffers[io] = buffers[io].split("\r\n", 2)
|
line, buffers[io] = buffers[io].split("\r\n", 2)
|
||||||
else
|
else
|
||||||
line, buffers[io] = buffers[io].split("\n", 2)
|
line, buffers[io] = buffers[io].split("\n", 2)
|
||||||
end
|
end
|
||||||
|
# Send the received line to the client object for processing
|
||||||
result = client.handle(line)
|
result = client.handle(line)
|
||||||
|
# If the client object returned some data, write it back to the client
|
||||||
unless result.nil?
|
unless result.nil?
|
||||||
result = [result] unless result.is_a?(Array)
|
result = [result] unless result.is_a?(Array)
|
||||||
result.compact.each do |line|
|
result.compact.each do |line|
|
||||||
@@ -164,17 +173,23 @@ module Postal
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
# If the client requested we start TLS, do it now
|
||||||
if !eof && client.start_tls?
|
if !eof && client.start_tls?
|
||||||
|
# Clear the request
|
||||||
client.start_tls = false
|
client.start_tls = false
|
||||||
@epoll.del(io)
|
# Deregister the unencrypted IO
|
||||||
clients.delete(io)
|
@io_selector.deregister(io)
|
||||||
buffers.delete(io)
|
buffers.delete(io)
|
||||||
|
# Prepare TLS on the socket
|
||||||
tcp_io = io
|
tcp_io = io
|
||||||
io = OpenSSL::SSL::SSLSocket.new(io, ssl_context)
|
io = OpenSSL::SSL::SSLSocket.new(io, ssl_context)
|
||||||
@epoll.add(io, Epoll::IN)
|
# Register the new TLS socket with nio
|
||||||
clients[io] = client
|
monitor = @io_selector.register(io, :r)
|
||||||
|
monitor.value = client
|
||||||
|
# Close the underlying IO when the TLS socket is closed
|
||||||
io.sync_close = true
|
io.sync_close = true
|
||||||
begin
|
begin
|
||||||
|
# Start TLS negotiation
|
||||||
io.accept
|
io.accept
|
||||||
rescue OpenSSL::SSL::SSLError => e
|
rescue OpenSSL::SSL::SSLError => e
|
||||||
client.log "SSL Negotiation Failed: #{e.message}"
|
client.log "SSL Negotiation Failed: #{e.message}"
|
||||||
@@ -182,17 +197,20 @@ module Postal
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Has the clint requested we close the connection?
|
||||||
if client.finished? || eof
|
if client.finished? || eof
|
||||||
client.log "\e[35m Connection closed\e[0m"
|
client.log "\e[35m Connection closed\e[0m"
|
||||||
@epoll.del(io)
|
# Deregister the socket and close it
|
||||||
clients.delete(io)
|
@io_selector.deregister(io)
|
||||||
buffers.delete(io)
|
buffers.delete(io)
|
||||||
io.close
|
io.close
|
||||||
if @epoll.size == 0
|
# If we have no more clients or listeners left, exit the process
|
||||||
|
if @io_selector.empty?
|
||||||
Process.exit(0)
|
Process.exit(0)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
rescue => e
|
rescue => e
|
||||||
|
# Something went wrong, log as appropriate
|
||||||
client_id = client ? client.id : '------'
|
client_id = client ? client.id : '------'
|
||||||
if defined?(Raven)
|
if defined?(Raven)
|
||||||
Raven.capture_exception(e, :extra => {:log_id => (client.id rescue nil)})
|
Raven.capture_exception(e, :extra => {:log_id => (client.id rescue nil)})
|
||||||
@@ -203,117 +221,43 @@ module Postal
|
|||||||
logger.error "[#{client_id}] #{line}"
|
logger.error "[#{client_id}] #{line}"
|
||||||
end
|
end
|
||||||
# Close all IO and forget this client
|
# Close all IO and forget this client
|
||||||
@epoll.del(io) rescue nil
|
@io_selector.deregister(io) rescue nil
|
||||||
clients.delete(io)
|
|
||||||
buffers.delete(io)
|
buffers.delete(io)
|
||||||
io.close rescue nil
|
io.close rescue nil
|
||||||
if @epoll.size == 0
|
if @io_selector.empty?
|
||||||
Process.exit(0)
|
Process.exit(0)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
# If unlisten has been called, stop listening
|
||||||
|
if $unlisten
|
||||||
|
@io_selector.deregister(@server)
|
||||||
|
@server.close
|
||||||
|
# If there's nothing left to do, shut down the process
|
||||||
|
if @io_selector.empty?
|
||||||
|
Process.exit(0)
|
||||||
|
end
|
||||||
|
# Clear the request
|
||||||
|
$unlisten = false
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def run_non_linux
|
def run
|
||||||
|
# Write PID to file if path specified
|
||||||
|
if ENV['PID_FILE']
|
||||||
|
File.open(ENV['PID_FILE'], 'w') { |f| f.write(Process.pid.to_s + "\n") }
|
||||||
|
end
|
||||||
|
# If we have been spawned to replace an existing processm shut down the
|
||||||
|
# parent after listening.
|
||||||
if ENV['SERVER_FD']
|
if ENV['SERVER_FD']
|
||||||
listen
|
listen
|
||||||
kill_parent
|
kill_parent
|
||||||
else
|
else
|
||||||
listen
|
listen
|
||||||
end
|
end
|
||||||
logger.info "Listening"
|
run_event_loop
|
||||||
Thread.abort_on_exception = true
|
|
||||||
client_threads = []
|
|
||||||
loop do
|
|
||||||
s = nil
|
|
||||||
begin
|
|
||||||
until s
|
|
||||||
l = select([@server], [@server], [@server], 0.5)
|
|
||||||
s = @server.accept if l
|
|
||||||
end
|
|
||||||
rescue IOError
|
|
||||||
STDERR.puts "Server socket was closed."
|
|
||||||
break
|
|
||||||
end
|
|
||||||
client_threads << Thread.new(s) do |io|
|
|
||||||
begin
|
|
||||||
if Postal.config.smtp_server.proxy_protocol
|
|
||||||
client = Client.new(nil)
|
|
||||||
if Postal.config.smtp_server.log_connect
|
|
||||||
logger.debug "[#{client.id}] \e[35m Connection opened from #{io.remote_address.ip_address}\e[0m"
|
|
||||||
end
|
|
||||||
else
|
|
||||||
client = Client.new(io.remote_address.ip_address)
|
|
||||||
if Postal.config.smtp_server.log_connect
|
|
||||||
logger.debug "[#{client.id}] \e[35m Connection opened from #{io.remote_address.ip_address}\e[0m"
|
|
||||||
end
|
|
||||||
client.log "\e[35m Client identified as #{io.remote_address.ip_address}\e[0m"
|
|
||||||
io.print("220 #{Postal.config.dns.smtp_server_hostname} ESMTP Postal/#{client.id}")
|
|
||||||
end
|
|
||||||
|
|
||||||
loop do
|
|
||||||
if received_data = io.gets
|
|
||||||
if result = client.handle(received_data.chomp)
|
|
||||||
result = [result] unless result.is_a?(Array)
|
|
||||||
result.compact.each do |line|
|
|
||||||
client.log "\e[34m=> #{line.strip}\e[0m"
|
|
||||||
io.write(line.to_s + "\r\n")
|
|
||||||
io.flush
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
if client.start_tls?
|
|
||||||
client.start_tls = false
|
|
||||||
tcp_io = io
|
|
||||||
io = OpenSSL::SSL::SSLSocket.new(io, ssl_context)
|
|
||||||
io.sync_close = true
|
|
||||||
begin
|
|
||||||
io.accept
|
|
||||||
rescue OpenSSL::SSL::SSLError => e
|
|
||||||
logger.error "SSL Negotiation Failed: #{e.message}"
|
|
||||||
io.close rescue nil
|
|
||||||
tcp_io.close rescue nil
|
|
||||||
eof = true
|
|
||||||
end
|
|
||||||
end
|
|
||||||
if received_data.nil? || client.finished?
|
|
||||||
client.log "\e[35m Connection closed\e[0m"
|
|
||||||
io.close
|
|
||||||
break
|
|
||||||
end
|
|
||||||
end
|
|
||||||
rescue => e
|
|
||||||
if defined?(Raven)
|
|
||||||
Raven.capture_exception(e, :extra => {:log_id => (client.id rescue nil)})
|
|
||||||
end
|
|
||||||
logger.error "An error occurred while handling a client."
|
|
||||||
logger.error "#{e.class}: #{e.message}"
|
|
||||||
e.backtrace.each do |line|
|
|
||||||
logger.error line
|
|
||||||
end
|
|
||||||
# Close all IO
|
|
||||||
io.close rescue nil
|
|
||||||
ensure
|
|
||||||
client_threads.delete(Thread.current)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
client_threads.each{ |t| t.join unless t == Thread.current }
|
|
||||||
end
|
|
||||||
|
|
||||||
def run
|
|
||||||
if ENV['PID_FILE']
|
|
||||||
File.open(ENV['PID_FILE'], 'w') { |f| f.write(Process.pid.to_s + "\n") }
|
|
||||||
end
|
|
||||||
if Postal.config.smtp_server&.evented
|
|
||||||
logger.info "Running epoll driven server for Linux host.."
|
|
||||||
run_linux
|
|
||||||
else
|
|
||||||
logger.info "Running thread based compatibility server for non-Linux host."
|
|
||||||
run_non_linux
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
|
|||||||
المرجع في مشكلة جديدة
حظر مستخدم