diff --git a/app/models/webhook_request.rb b/app/models/webhook_request.rb index c475155..7609b5a 100644 --- a/app/models/webhook_request.rb +++ b/app/models/webhook_request.rb @@ -28,8 +28,6 @@ class WebhookRequest < ApplicationRecord include HasUUID include HasLocking - RETRIES = { 1 => 2.minutes, 2 => 3.minutes, 3 => 6.minutes, 4 => 10.minutes, 5 => 15.minutes }.freeze - belongs_to :server belongs_to :webhook, optional: true @@ -38,48 +36,6 @@ class WebhookRequest < ApplicationRecord serialize :payload, Hash - def deliver - payload = { event: event, timestamp: created_at.to_f, payload: self.payload, uuid: uuid }.to_json - Postal.logger.tagged(event: event, url: url) do - Postal.logger.info "Sending webhook request" - result = Postal::HTTP.post(url, sign: true, json: payload, timeout: 5) - self.attempts += 1 - self.retry_after = RETRIES[self.attempts]&.from_now - server.message_db.webhooks.record( - event: event, - url: url, - webhook_id: webhook_id, - attempt: self.attempts, - timestamp: Time.now.to_f, - payload: self.payload.to_json, - uuid: uuid, - status_code: result[:code], - body: result[:body], - will_retry: (retry_after ? 0 : 1) - ) - - if result[:code] >= 200 && result[:code] < 300 - Postal.logger.info "Received #{result[:code]} status code. That's OK." - destroy! - webhook&.update_column(:last_used_at, Time.now) - true - else - Postal.logger.error "Received #{result[:code]} status code. That's not OK." - self.error = "Couldn't send to URL. Code received was #{result[:code]}" - if retry_after - Postal.logger.info "Will retry #{retry_after} (this was attempt #{self.attempts})" - self.locked_by = nil - self.locked_at = nil - save! - else - Postal.logger.info "Have tried #{self.attempts} times. Giving up." - destroy! - end - false - end - end - end - class << self def trigger(server, event, payload = {}) diff --git a/app/services/webhook_delivery_service.rb b/app/services/webhook_delivery_service.rb index 617054e..4aeb50c 100644 --- a/app/services/webhook_delivery_service.rb +++ b/app/services/webhook_delivery_service.rb @@ -2,18 +2,96 @@ class WebhookDeliveryService - def initialize(webhook_delivery:) - @webhook_delivery = webhook_delivery + RETRIES = { 1 => 2.minutes, 2 => 3.minutes, 3 => 6.minutes, 4 => 10.minutes, 5 => 15.minutes }.freeze + + def initialize(webhook_request:) + @webhook_request = webhook_request end - # TODO: move the logic from WebhookDelivery#deliver in to this service. - # def call - if @webhook_delivery.deliver - log "Succesfully delivered" - else - log "Delivery failed" + logger.tagged(webhook: @webhook_request.webhook_id, webhook_request: @webhook_request.id) do + generate_payload + send_request + record_attempt + appreciate_http_result + update_webhook_request end end + def success? + @success == true + end + + private + + def generate_payload + @payload = { + event: @webhook_request.event, + timestamp: @webhook_request.created_at.to_f, + payload: @webhook_request.payload, + uuid: @webhook_request.uuid + }.to_json + end + + def send_request + @http_result = Postal::HTTP.post(@webhook_request.url, + sign: true, + json: @payload, + timeout: 5) + + @success = (@http_result[:code] >= 200 && @http_result[:code] < 300) + end + + def record_attempt + @webhook_request.attempts += 1 + + if success? + @webhook_request.retry_after = nil + else + @webhook_request.retry_after = RETRIES[@webhook_request.attempts]&.from_now + end + + @attempt = @webhook_request.server.message_db.webhooks.record( + event: @webhook_request.event, + url: @webhook_request.url, + webhook_id: @webhook_request.webhook_id, + attempt: @webhook_request.attempts, + timestamp: Time.now.to_f, + payload: @webhook_request.payload.to_json, + uuid: @webhook_request.uuid, + status_code: @http_result[:code], + body: @http_result[:body], + will_retry: @webhook_request.retry_after.present? + ) + end + + def appreciate_http_result + if success? + logger.info "Received #{@http_result[:code]} status code. That's OK." + @webhook_request.destroy! + @webhook_request.webhook&.update_column(:last_used_at, Time.current) + return + end + + logger.error "Received #{@http_result[:code]} status code. That's not OK." + @webhook_request.error = "Couldn't send to URL. Code received was #{@http_result[:code]}" + end + + def update_webhook_request + if @webhook_request.retry_after + logger.info "Will retry #{@webhook_request.retry_after} (this was attempt #{@webhook_request.attempts})" + @webhook_request.locked_by = nil + @webhook_request.locked_at = nil + @webhook_request.save! + return + end + + logger.info "Have tried #{@webhook_request.attempts} times. Giving up." + @webhook_request.destroy! + end + + def logger + Postal.logger + end + end diff --git a/lib/postal/message_db/database.rb b/lib/postal/message_db/database.rb index 061e1e4..c5c3e38 100644 --- a/lib/postal/message_db/database.rb +++ b/lib/postal/message_db/database.rb @@ -325,7 +325,7 @@ module Postal result = connection.query(query, cast_booleans: true) time = Time.now.to_f - start_time logger.debug " \e[4;34mMessageDB Query (#{time.round(2)}s) \e[0m \e[33m#{query}\e[0m" - if time.positive? && query =~ /\A(SELECT|UPDATE|DELETE) / + if time > 0.05 && query =~ /\A(SELECT|UPDATE|DELETE) / id = Nifty::Utils::RandomString.generate(length: 6).upcase explain_result = ResultForExplainPrinter.new(connection.query("EXPLAIN #{query}")) logger.info " [#{id}] EXPLAIN #{query}" diff --git a/lib/postal/message_db/webhooks.rb b/lib/postal/message_db/webhooks.rb index cfe4bcd..97bbad5 100644 --- a/lib/postal/message_db/webhooks.rb +++ b/lib/postal/message_db/webhooks.rb @@ -12,7 +12,7 @@ module Postal @database.insert(:webhook_requests, attributes) end - def list(page) + def list(page = 1) result = @database.select_with_pagination(:webhook_requests, page, order: :timestamp, direction: "desc") result[:records] = result[:records].map { |i| Request.new(i) } result diff --git a/lib/worker/jobs/process_webhook_requests_job.rb b/lib/worker/jobs/process_webhook_requests_job.rb index c33de5f..53751a1 100644 --- a/lib/worker/jobs/process_webhook_requests_job.rb +++ b/lib/worker/jobs/process_webhook_requests_job.rb @@ -39,7 +39,8 @@ module Worker def process_requests @requests_to_process.each do |request| work_completed! - request.deliver + + WebhookDeliveryService.new(webhook_request: request).call end end diff --git a/spec/app/services/webhook_delivery_service_spec.rb b/spec/app/services/webhook_delivery_service_spec.rb new file mode 100644 index 0000000..3527e71 --- /dev/null +++ b/spec/app/services/webhook_delivery_service_spec.rb @@ -0,0 +1,119 @@ +# frozen_string_literal: true + +require "rails_helper" + +RSpec.describe WebhookDeliveryService do + let(:server) { GLOBAL_SERVER } + let(:webhook) { create(:webhook, server: server) } + let(:webhook_request) { create(:webhook_request, :locked, webhook: webhook) } + + subject(:service) { described_class.new(webhook_request: webhook_request) } + + let(:response_status) { 200 } + let(:response_body) { "OK" } + + before do + stub_request(:post, webhook.url).to_return(status: response_status, body: response_body) + end + + after do + server.message_db.provisioner.clean + end + + describe "#call" do + it "sends a request to the webhook's url" do + service.call + expect(WebMock).to have_requested(:post, webhook.url).with({ + body: { + event: webhook_request.event, + timestamp: webhook_request.created_at.to_f, + payload: webhook_request.payload, + uuid: webhook_request.uuid + }.to_json, + headers: { + "Content-Type" => "application/json", + "X-Postal-Signature" => /\A[a-z0-9\/+]+=*\z/i + } + }) + end + + context "when the endpoint returns a 200 OK" do + it "creates a webhook request for the server" do + service.call + expect(server.message_db.webhooks.list(1)[:total]).to eq(1) + webhook_request = server.message_db.webhooks.list(1)[:records].first + expect(webhook_request).to have_attributes( + event: webhook_request.event, + url: webhook_request.url, + status_code: 200, + body: "OK", + uuid: webhook_request.uuid, + will_retry?: false, + payload: webhook_request.payload, + attempt: 1, + timestamp: webhook_request.timestamp + ) + end + + it "deletes the webhook request" do + service.call + expect { webhook_request.reload }.to raise_error(ActiveRecord::RecordNotFound) + end + + it "updates the last used at time on the webhook" do + service.call + expect(webhook.reload.last_used_at).to be_within(1.second).of(Time.current) + end + end + + context "when the request returns a 500 Internal Server Error for the first time" do + let(:response_status) { 500 } + let(:response_body) { "internal server error!" } + + it "unlocks the webhook request if locked" do + expect { service.call }.to change { webhook_request.reload.locked? }.from(true).to(false) + end + + it "updates the retry time and attempt counter" do + service.call + expect(webhook_request.reload.attempts).to eq(1) + expect(webhook_request.retry_after).to be_within(1.second).of(2.minutes.from_now) + end + end + + context "when the request returns a 500 Internal Server Error for the second time" do + let(:webhook_request) { create(:webhook_request, :locked, webhook: webhook, attempts: 1) } + let(:response_status) { 500 } + let(:response_body) { "internal server error!" } + + it "updates the retry time and attempt counter" do + service.call + expect(webhook_request.reload.attempts).to eq(2) + expect(webhook_request.retry_after).to be_within(1.second).of(3.minutes.from_now) + end + end + + context "when the request returns a 500 Internal Server Error for the sixth time" do + let(:webhook_request) { create(:webhook_request, :locked, webhook: webhook, attempts: 5) } + let(:response_status) { 500 } + let(:response_body) { "internal server error!" } + + it "creates a webhook request for the server" do + service.call + expect(server.message_db.webhooks.list(1)[:total]).to eq(1) + webhook_request = server.message_db.webhooks.list(1)[:records].first + expect(webhook_request).to have_attributes( + status_code: 500, + body: "internal server error!", + will_retry?: false, + attempt: 6 + ) + end + + it "deletes the webhook request" do + service.call + expect { webhook_request.reload }.to raise_error(ActiveRecord::RecordNotFound) + end + end + end +end