Skip to content

Commit 6a745d4

Browse files
committed
feat: allow the database connection to be configured for async jobs
1 parent 5292406 commit 6a745d4

File tree

8 files changed

+103
-21
lines changed

8 files changed

+103
-21
lines changed

lib/pact_broker/webhooks/job.rb

+10-4
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,16 @@ class Job
1111
include PactBroker::Logging
1212

1313
def perform data
14+
data.fetch(:database_connector).call do
15+
perform_with_connection(data)
16+
end
17+
end
18+
19+
private
20+
21+
attr_reader :triggered_webhook, :error_count
22+
23+
def perform_with_connection(data)
1424
@data = data
1525
@triggered_webhook = PactBroker::Webhooks::TriggeredWebhook.find(id: data[:triggered_webhook].id)
1626
@error_count = data[:error_count] || 0
@@ -26,10 +36,6 @@ def perform data
2636
end
2737
end
2838

29-
private
30-
31-
attr_reader :triggered_webhook, :error_count
32-
3339
def execution_options
3440
{
3541
success_log_message: "Successfully executed webhook",

lib/pact_broker/webhooks/service.rb

+8-6
Original file line numberDiff line numberDiff line change
@@ -124,18 +124,20 @@ def self.run_later webhooks, pact, verification, event_name
124124
begin
125125
triggered_webhook = webhook_repository.create_triggered_webhook(trigger_uuid, webhook, pact, verification, RESOURCE_CREATION)
126126
logger.info "Scheduling job for #{webhook.description} with uuid #{webhook.uuid}"
127-
job_data = { triggered_webhook: triggered_webhook }
128-
schedule_webhook_job(job_data)
127+
job_data = {
128+
triggered_webhook: triggered_webhook,
129+
database_connector: job_database_connector
130+
}
131+
# Delay slightly to make sure the request transaction has finished before we execute the webhook
132+
Job.perform_in(5, job_data)
129133
rescue StandardError => e
130134
log_error e
131135
end
132136
end
133137
end
134138

135-
# This is a separate method so it can be overridden in the saas broker
136-
def self.schedule_webhook_job(job_data)
137-
# Delay slightly to make sure the request transaction has finished before we execute the webhook
138-
Job.perform_in(5, job_data)
139+
def self.job_database_connector
140+
Thread.current[:pact_broker_thread_data].database_connector
139141
end
140142

141143
def self.find_latest_triggered_webhooks_for_pact pact

lib/rack/pact_broker/database_transaction.rb

+22
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
require 'pact_broker/constants'
22
require 'sequel'
3+
require 'ostruct'
34

45
module Rack
56
module PactBroker
@@ -11,14 +12,22 @@ class DatabaseTransaction
1112
def initialize app, database_connection
1213
@app = app
1314
@database_connection = database_connection
15+
@default_database_connector = ->(&block) {
16+
database_connection.synchronize do
17+
block.call
18+
end
19+
}
1420
end
1521

1622
def call env
23+
set_database_connector
1724
if use_transaction? env
1825
call_with_transaction env
1926
else
2027
call_without_transaction env
2128
end
29+
ensure
30+
clear_database_connector
2231
end
2332

2433
def use_transaction? env
@@ -43,6 +52,19 @@ def call_with_transaction env
4352
def do_not_rollback? response
4453
response[1].delete(::PactBroker::DO_NOT_ROLLBACK)
4554
end
55+
56+
def set_database_connector
57+
Thread.current[:pact_broker_thread_data] ||= OpenStruct.new
58+
Thread.current[:pact_broker_thread_data].database_connector ||= @default_database_connector
59+
end
60+
61+
def clear_database_connector
62+
if thread_data = Thread.current[:pact_broker_thread_data]
63+
if thread_data.database_connector == @default_database_connector
64+
thread_data.database_connector = nil
65+
end
66+
end
67+
end
4668
end
4769
end
4870
end

spec/features/publish_verification_spec.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
expect(JSON.parse(subject.body)).to include JSON.parse(verification_content)
4646
end
4747

48-
context "with a webhook configured" do
48+
context "with a webhook configured", job: true do
4949
before do
5050
td.create_webhook(
5151
method: 'POST',

spec/lib/pact_broker/webhooks/job_spec.rb

+7-7
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
module PactBroker
44
module Webhooks
55
describe Job do
6-
76
before do
87
PactBroker.configuration.webhook_retry_schedule = [10, 60, 120, 300, 600, 1200]
98
allow(PactBroker::Webhooks::Service).to receive(:execute_triggered_webhook_now).and_return(result)
@@ -16,8 +15,9 @@ module Webhooks
1615
let(:result) { instance_double("PactBroker::Domain::WebhookExecutionResult", success?: success)}
1716
let(:success) { true }
1817
let(:logger) { double('logger').as_null_object }
18+
let(:database_connector) { ->(&block) { block.call } }
1919

20-
subject { Job.new.perform(triggered_webhook: triggered_webhook) }
20+
subject { Job.new.perform(triggered_webhook: triggered_webhook, database_connector: database_connector) }
2121

2222
it "reloads the TriggeredWebhook object to make sure it has a fresh copy" do
2323
expect(PactBroker::Webhooks::TriggeredWebhook).to receive(:find).with(id: 1)
@@ -44,7 +44,7 @@ module Webhooks
4444
end
4545

4646
it "reschedules the job in 10 seconds" do
47-
expect(Job).to receive(:perform_in).with(10, {triggered_webhook: triggered_webhook, error_count: 1})
47+
expect(Job).to receive(:perform_in).with(10, {triggered_webhook: triggered_webhook, error_count: 1, database_connector: database_connector})
4848
subject
4949
end
5050

@@ -59,7 +59,7 @@ module Webhooks
5959
let(:success) { false }
6060

6161
it "reschedules the job in 10 seconds" do
62-
expect(Job).to receive(:perform_in).with(10, {triggered_webhook: triggered_webhook, error_count: 1})
62+
expect(Job).to receive(:perform_in).with(10, {triggered_webhook: triggered_webhook, error_count: 1, database_connector: database_connector})
6363
subject
6464
end
6565

@@ -84,10 +84,10 @@ module Webhooks
8484
allow(PactBroker::Webhooks::Service).to receive(:execute_triggered_webhook_now).and_raise("an error")
8585
end
8686

87-
subject { Job.new.perform(triggered_webhook: triggered_webhook, error_count: 1) }
87+
subject { Job.new.perform(triggered_webhook: triggered_webhook, error_count: 1, database_connector: database_connector) }
8888

8989
it "reschedules the job in 60 seconds" do
90-
expect(Job).to receive(:perform_in).with(60, {triggered_webhook: triggered_webhook, error_count: 2})
90+
expect(Job).to receive(:perform_in).with(60, {triggered_webhook: triggered_webhook, error_count: 2, database_connector: database_connector})
9191
subject
9292
end
9393

@@ -101,7 +101,7 @@ module Webhooks
101101
context "when the job is not successful for the last time" do
102102
let(:success) { false }
103103

104-
subject { Job.new.perform(triggered_webhook: triggered_webhook, error_count: 6) }
104+
subject { Job.new.perform(triggered_webhook: triggered_webhook, error_count: 6, database_connector: database_connector) }
105105

106106
it "executes the job with an log message indicating that the webhook has failed" do
107107
expect(PactBroker::Webhooks::Service).to receive(:execute_triggered_webhook_now)

spec/lib/pact_broker/webhooks/service_spec.rb

+3-3
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ module Webhooks
7676
end
7777
end
7878

79-
context "when there is a scheduling error" do
79+
context "when there is a scheduling error", job: true do
8080
before do
8181
allow(Job).to receive(:perform_in).and_raise("an error")
8282
end
@@ -166,7 +166,7 @@ module Webhooks
166166
end
167167
end
168168

169-
describe ".execute_webhook_now integration test" do
169+
describe ".execute_webhook_now integration test", job: true do
170170
let(:td) { TestDataBuilder.new }
171171

172172
let!(:http_request) do
@@ -215,7 +215,7 @@ module Webhooks
215215
end
216216
end
217217

218-
describe ".trigger_webhooks integration test" do
218+
describe ".trigger_webhooks integration test", job: true do
219219
let!(:http_request) do
220220
stub_request(:get, "http://example.org").
221221
to_return(:status => 200)

spec/lib/rack/pact_broker/database_transaction_spec.rb

+40
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,46 @@ module PactBroker
5050
expect { subject }.to change { ::PactBroker::Domain::Pacticipant.count }.by(1)
5151
end
5252
end
53+
54+
describe "setting the database connector" do
55+
let(:api) { double('api', call: [200, {}, []]) }
56+
57+
it "sets a database connector for use in jobs scheduled by this request" do
58+
expect(api).to receive(:call) do | env |
59+
expect(Thread.current[:pact_broker_thread_data].database_connector).to_not be nil
60+
[200, {}, []]
61+
end
62+
63+
subject
64+
end
65+
66+
it "clears it after the request" do
67+
subject
68+
expect(Thread.current[:pact_broker_thread_data].database_connector).to be nil
69+
end
70+
71+
context "when other middleware sets the database connector" do
72+
before do
73+
Thread.current[:pact_broker_thread_data] = OpenStruct.new(database_connector: other_database_connector)
74+
end
75+
76+
let(:other_database_connector) { ->(&block) { block.call } }
77+
78+
it "does not override it" do
79+
expect(api).to receive(:call) do | env |
80+
expect(Thread.current[:pact_broker_thread_data].database_connector).to eq other_database_connector
81+
[200, {}, []]
82+
end
83+
84+
subject
85+
end
86+
87+
it "does not clear it after the request" do
88+
subject
89+
expect(Thread.current[:pact_broker_thread_data].database_connector).to_not be nil
90+
end
91+
end
92+
end
5393
end
5494
end
5595
end

spec/support/jobs.rb

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
require 'ostruct'
2+
3+
RSpec.configure do | config |
4+
config.before(:each, job: true) do
5+
Thread.current[:pact_broker_thread_data] = OpenStruct.new
6+
Thread.current[:pact_broker_thread_data].database_connector = -> (&block) { block.call }
7+
end
8+
9+
config.after(:each, job: true) do
10+
Thread.current[:pact_broker_thread_data] = nil
11+
end
12+
end

0 commit comments

Comments
 (0)