مراية لـ
https://github.com/postalserver/postal.git
تم المزامنة 2025-12-01 05:43:04 +00:00
refactor: refactor webhook deliveries
هذا الالتزام موجود في:
@@ -28,8 +28,6 @@ class WebhookRequest < ApplicationRecord
|
|||||||
include HasUUID
|
include HasUUID
|
||||||
include HasLocking
|
include HasLocking
|
||||||
|
|
||||||
RETRIES = { 1 => 2.minutes, 2 => 3.minutes, 3 => 6.minutes, 4 => 10.minutes, 5 => 15.minutes }.freeze
|
|
||||||
|
|
||||||
belongs_to :server
|
belongs_to :server
|
||||||
belongs_to :webhook, optional: true
|
belongs_to :webhook, optional: true
|
||||||
|
|
||||||
@@ -38,48 +36,6 @@ class WebhookRequest < ApplicationRecord
|
|||||||
|
|
||||||
serialize :payload, Hash
|
serialize :payload, Hash
|
||||||
|
|
||||||
def deliver
|
|
||||||
payload = { event: event, timestamp: created_at.to_f, payload: self.payload, uuid: uuid }.to_json
|
|
||||||
Postal.logger.tagged(event: event, url: url) do
|
|
||||||
Postal.logger.info "Sending webhook request"
|
|
||||||
result = Postal::HTTP.post(url, sign: true, json: payload, timeout: 5)
|
|
||||||
self.attempts += 1
|
|
||||||
self.retry_after = RETRIES[self.attempts]&.from_now
|
|
||||||
server.message_db.webhooks.record(
|
|
||||||
event: event,
|
|
||||||
url: url,
|
|
||||||
webhook_id: webhook_id,
|
|
||||||
attempt: self.attempts,
|
|
||||||
timestamp: Time.now.to_f,
|
|
||||||
payload: self.payload.to_json,
|
|
||||||
uuid: uuid,
|
|
||||||
status_code: result[:code],
|
|
||||||
body: result[:body],
|
|
||||||
will_retry: (retry_after ? 0 : 1)
|
|
||||||
)
|
|
||||||
|
|
||||||
if result[:code] >= 200 && result[:code] < 300
|
|
||||||
Postal.logger.info "Received #{result[:code]} status code. That's OK."
|
|
||||||
destroy!
|
|
||||||
webhook&.update_column(:last_used_at, Time.now)
|
|
||||||
true
|
|
||||||
else
|
|
||||||
Postal.logger.error "Received #{result[:code]} status code. That's not OK."
|
|
||||||
self.error = "Couldn't send to URL. Code received was #{result[:code]}"
|
|
||||||
if retry_after
|
|
||||||
Postal.logger.info "Will retry #{retry_after} (this was attempt #{self.attempts})"
|
|
||||||
self.locked_by = nil
|
|
||||||
self.locked_at = nil
|
|
||||||
save!
|
|
||||||
else
|
|
||||||
Postal.logger.info "Have tried #{self.attempts} times. Giving up."
|
|
||||||
destroy!
|
|
||||||
end
|
|
||||||
false
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
class << self
|
class << self
|
||||||
|
|
||||||
def trigger(server, event, payload = {})
|
def trigger(server, event, payload = {})
|
||||||
|
|||||||
@@ -2,18 +2,96 @@
|
|||||||
|
|
||||||
class WebhookDeliveryService
|
class WebhookDeliveryService
|
||||||
|
|
||||||
def initialize(webhook_delivery:)
|
RETRIES = { 1 => 2.minutes, 2 => 3.minutes, 3 => 6.minutes, 4 => 10.minutes, 5 => 15.minutes }.freeze
|
||||||
@webhook_delivery = webhook_delivery
|
|
||||||
|
def initialize(webhook_request:)
|
||||||
|
@webhook_request = webhook_request
|
||||||
end
|
end
|
||||||
|
|
||||||
# TODO: move the logic from WebhookDelivery#deliver in to this service.
|
|
||||||
#
|
|
||||||
def call
|
def call
|
||||||
if @webhook_delivery.deliver
|
logger.tagged(webhook: @webhook_request.webhook_id, webhook_request: @webhook_request.id) do
|
||||||
log "Succesfully delivered"
|
generate_payload
|
||||||
else
|
send_request
|
||||||
log "Delivery failed"
|
record_attempt
|
||||||
|
appreciate_http_result
|
||||||
|
update_webhook_request
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def success?
|
||||||
|
@success == true
|
||||||
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
def generate_payload
|
||||||
|
@payload = {
|
||||||
|
event: @webhook_request.event,
|
||||||
|
timestamp: @webhook_request.created_at.to_f,
|
||||||
|
payload: @webhook_request.payload,
|
||||||
|
uuid: @webhook_request.uuid
|
||||||
|
}.to_json
|
||||||
|
end
|
||||||
|
|
||||||
|
def send_request
|
||||||
|
@http_result = Postal::HTTP.post(@webhook_request.url,
|
||||||
|
sign: true,
|
||||||
|
json: @payload,
|
||||||
|
timeout: 5)
|
||||||
|
|
||||||
|
@success = (@http_result[:code] >= 200 && @http_result[:code] < 300)
|
||||||
|
end
|
||||||
|
|
||||||
|
def record_attempt
|
||||||
|
@webhook_request.attempts += 1
|
||||||
|
|
||||||
|
if success?
|
||||||
|
@webhook_request.retry_after = nil
|
||||||
|
else
|
||||||
|
@webhook_request.retry_after = RETRIES[@webhook_request.attempts]&.from_now
|
||||||
|
end
|
||||||
|
|
||||||
|
@attempt = @webhook_request.server.message_db.webhooks.record(
|
||||||
|
event: @webhook_request.event,
|
||||||
|
url: @webhook_request.url,
|
||||||
|
webhook_id: @webhook_request.webhook_id,
|
||||||
|
attempt: @webhook_request.attempts,
|
||||||
|
timestamp: Time.now.to_f,
|
||||||
|
payload: @webhook_request.payload.to_json,
|
||||||
|
uuid: @webhook_request.uuid,
|
||||||
|
status_code: @http_result[:code],
|
||||||
|
body: @http_result[:body],
|
||||||
|
will_retry: @webhook_request.retry_after.present?
|
||||||
|
)
|
||||||
|
end
|
||||||
|
|
||||||
|
def appreciate_http_result
|
||||||
|
if success?
|
||||||
|
logger.info "Received #{@http_result[:code]} status code. That's OK."
|
||||||
|
@webhook_request.destroy!
|
||||||
|
@webhook_request.webhook&.update_column(:last_used_at, Time.current)
|
||||||
|
return
|
||||||
|
end
|
||||||
|
|
||||||
|
logger.error "Received #{@http_result[:code]} status code. That's not OK."
|
||||||
|
@webhook_request.error = "Couldn't send to URL. Code received was #{@http_result[:code]}"
|
||||||
|
end
|
||||||
|
|
||||||
|
def update_webhook_request
|
||||||
|
if @webhook_request.retry_after
|
||||||
|
logger.info "Will retry #{@webhook_request.retry_after} (this was attempt #{@webhook_request.attempts})"
|
||||||
|
@webhook_request.locked_by = nil
|
||||||
|
@webhook_request.locked_at = nil
|
||||||
|
@webhook_request.save!
|
||||||
|
return
|
||||||
|
end
|
||||||
|
|
||||||
|
logger.info "Have tried #{@webhook_request.attempts} times. Giving up."
|
||||||
|
@webhook_request.destroy!
|
||||||
|
end
|
||||||
|
|
||||||
|
def logger
|
||||||
|
Postal.logger
|
||||||
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -325,7 +325,7 @@ module Postal
|
|||||||
result = connection.query(query, cast_booleans: true)
|
result = connection.query(query, cast_booleans: true)
|
||||||
time = Time.now.to_f - start_time
|
time = Time.now.to_f - start_time
|
||||||
logger.debug " \e[4;34mMessageDB Query (#{time.round(2)}s) \e[0m \e[33m#{query}\e[0m"
|
logger.debug " \e[4;34mMessageDB Query (#{time.round(2)}s) \e[0m \e[33m#{query}\e[0m"
|
||||||
if time.positive? && query =~ /\A(SELECT|UPDATE|DELETE) /
|
if time > 0.05 && query =~ /\A(SELECT|UPDATE|DELETE) /
|
||||||
id = Nifty::Utils::RandomString.generate(length: 6).upcase
|
id = Nifty::Utils::RandomString.generate(length: 6).upcase
|
||||||
explain_result = ResultForExplainPrinter.new(connection.query("EXPLAIN #{query}"))
|
explain_result = ResultForExplainPrinter.new(connection.query("EXPLAIN #{query}"))
|
||||||
logger.info " [#{id}] EXPLAIN #{query}"
|
logger.info " [#{id}] EXPLAIN #{query}"
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ module Postal
|
|||||||
@database.insert(:webhook_requests, attributes)
|
@database.insert(:webhook_requests, attributes)
|
||||||
end
|
end
|
||||||
|
|
||||||
def list(page)
|
def list(page = 1)
|
||||||
result = @database.select_with_pagination(:webhook_requests, page, order: :timestamp, direction: "desc")
|
result = @database.select_with_pagination(:webhook_requests, page, order: :timestamp, direction: "desc")
|
||||||
result[:records] = result[:records].map { |i| Request.new(i) }
|
result[:records] = result[:records].map { |i| Request.new(i) }
|
||||||
result
|
result
|
||||||
|
|||||||
@@ -39,7 +39,8 @@ module Worker
|
|||||||
def process_requests
|
def process_requests
|
||||||
@requests_to_process.each do |request|
|
@requests_to_process.each do |request|
|
||||||
work_completed!
|
work_completed!
|
||||||
request.deliver
|
|
||||||
|
WebhookDeliveryService.new(webhook_request: request).call
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|||||||
119
spec/app/services/webhook_delivery_service_spec.rb
Normal file
119
spec/app/services/webhook_delivery_service_spec.rb
Normal file
@@ -0,0 +1,119 @@
|
|||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
require "rails_helper"
|
||||||
|
|
||||||
|
RSpec.describe WebhookDeliveryService do
|
||||||
|
let(:server) { GLOBAL_SERVER }
|
||||||
|
let(:webhook) { create(:webhook, server: server) }
|
||||||
|
let(:webhook_request) { create(:webhook_request, :locked, webhook: webhook) }
|
||||||
|
|
||||||
|
subject(:service) { described_class.new(webhook_request: webhook_request) }
|
||||||
|
|
||||||
|
let(:response_status) { 200 }
|
||||||
|
let(:response_body) { "OK" }
|
||||||
|
|
||||||
|
before do
|
||||||
|
stub_request(:post, webhook.url).to_return(status: response_status, body: response_body)
|
||||||
|
end
|
||||||
|
|
||||||
|
after do
|
||||||
|
server.message_db.provisioner.clean
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "#call" do
|
||||||
|
it "sends a request to the webhook's url" do
|
||||||
|
service.call
|
||||||
|
expect(WebMock).to have_requested(:post, webhook.url).with({
|
||||||
|
body: {
|
||||||
|
event: webhook_request.event,
|
||||||
|
timestamp: webhook_request.created_at.to_f,
|
||||||
|
payload: webhook_request.payload,
|
||||||
|
uuid: webhook_request.uuid
|
||||||
|
}.to_json,
|
||||||
|
headers: {
|
||||||
|
"Content-Type" => "application/json",
|
||||||
|
"X-Postal-Signature" => /\A[a-z0-9\/+]+=*\z/i
|
||||||
|
}
|
||||||
|
})
|
||||||
|
end
|
||||||
|
|
||||||
|
context "when the endpoint returns a 200 OK" do
|
||||||
|
it "creates a webhook request for the server" do
|
||||||
|
service.call
|
||||||
|
expect(server.message_db.webhooks.list(1)[:total]).to eq(1)
|
||||||
|
webhook_request = server.message_db.webhooks.list(1)[:records].first
|
||||||
|
expect(webhook_request).to have_attributes(
|
||||||
|
event: webhook_request.event,
|
||||||
|
url: webhook_request.url,
|
||||||
|
status_code: 200,
|
||||||
|
body: "OK",
|
||||||
|
uuid: webhook_request.uuid,
|
||||||
|
will_retry?: false,
|
||||||
|
payload: webhook_request.payload,
|
||||||
|
attempt: 1,
|
||||||
|
timestamp: webhook_request.timestamp
|
||||||
|
)
|
||||||
|
end
|
||||||
|
|
||||||
|
it "deletes the webhook request" do
|
||||||
|
service.call
|
||||||
|
expect { webhook_request.reload }.to raise_error(ActiveRecord::RecordNotFound)
|
||||||
|
end
|
||||||
|
|
||||||
|
it "updates the last used at time on the webhook" do
|
||||||
|
service.call
|
||||||
|
expect(webhook.reload.last_used_at).to be_within(1.second).of(Time.current)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context "when the request returns a 500 Internal Server Error for the first time" do
|
||||||
|
let(:response_status) { 500 }
|
||||||
|
let(:response_body) { "internal server error!" }
|
||||||
|
|
||||||
|
it "unlocks the webhook request if locked" do
|
||||||
|
expect { service.call }.to change { webhook_request.reload.locked? }.from(true).to(false)
|
||||||
|
end
|
||||||
|
|
||||||
|
it "updates the retry time and attempt counter" do
|
||||||
|
service.call
|
||||||
|
expect(webhook_request.reload.attempts).to eq(1)
|
||||||
|
expect(webhook_request.retry_after).to be_within(1.second).of(2.minutes.from_now)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context "when the request returns a 500 Internal Server Error for the second time" do
|
||||||
|
let(:webhook_request) { create(:webhook_request, :locked, webhook: webhook, attempts: 1) }
|
||||||
|
let(:response_status) { 500 }
|
||||||
|
let(:response_body) { "internal server error!" }
|
||||||
|
|
||||||
|
it "updates the retry time and attempt counter" do
|
||||||
|
service.call
|
||||||
|
expect(webhook_request.reload.attempts).to eq(2)
|
||||||
|
expect(webhook_request.retry_after).to be_within(1.second).of(3.minutes.from_now)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context "when the request returns a 500 Internal Server Error for the sixth time" do
|
||||||
|
let(:webhook_request) { create(:webhook_request, :locked, webhook: webhook, attempts: 5) }
|
||||||
|
let(:response_status) { 500 }
|
||||||
|
let(:response_body) { "internal server error!" }
|
||||||
|
|
||||||
|
it "creates a webhook request for the server" do
|
||||||
|
service.call
|
||||||
|
expect(server.message_db.webhooks.list(1)[:total]).to eq(1)
|
||||||
|
webhook_request = server.message_db.webhooks.list(1)[:records].first
|
||||||
|
expect(webhook_request).to have_attributes(
|
||||||
|
status_code: 500,
|
||||||
|
body: "internal server error!",
|
||||||
|
will_retry?: false,
|
||||||
|
attempt: 6
|
||||||
|
)
|
||||||
|
end
|
||||||
|
|
||||||
|
it "deletes the webhook request" do
|
||||||
|
service.call
|
||||||
|
expect { webhook_request.reload }.to raise_error(ActiveRecord::RecordNotFound)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
المرجع في مشكلة جديدة
حظر مستخدم