Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve transaction handling #538

Merged
merged 5 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions Sources/PostgresNIO/Connection/PostgresConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,57 @@ extension PostgresConnection {
throw error // rethrow with more metadata
}
}

/// Puts the connection into an open transaction state, for the provided `closure`'s lifetime.
///
/// The function starts a transaction by running a `BEGIN` query on the connection against the database. It then
/// lends the connection to the user provided closure. The user can then modify the database as they wish. If the user
/// provided closure returns successfully, the function will attempt to commit the changes by running a `COMMIT`
/// query against the database. If the user provided closure throws an error, the function will attempt to rollback the
/// changes made within the closure.
///
/// - Parameters:
/// - logger: The `Logger` to log into for the transaction.
/// - file: The file, the transaction was started in. Used for better error reporting.
/// - line: The line, the transaction was started in. Used for better error reporting.
/// - closure: The user provided code to modify the database. Use the provided connection to run queries.
/// The connection must stay in the transaction mode. Otherwise this method will throw!
/// - Returns: The closure's return value.
public func withTransaction<Result>(
logger: Logger,
file: String = #file,
line: Int = #line,
isolation: isolated (any Actor)? = #isolation,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be guarded to only apply in Swift 6, need a variant that doesn't have it for Swift 5. Same in PostgresClient.

_ process: (PostgresConnection) async throws -> sending Result
) async throws -> sending Result {
do {
try await self.query("BEGIN;", logger: logger)
} catch {
throw PostgresTransactionError(file: file, line: line, beginError: error)
}

var closureHasFinished: Bool = false
do {
let value = try await process(self)
closureHasFinished = true
try await self.query("COMMIT;", logger: logger)
return value
} catch {
var transactionError = PostgresTransactionError(file: file, line: line)
if !closureHasFinished {
transactionError.closureError = error
do {
try await self.query("ROLLBACK;", logger: logger)
} catch {
transactionError.rollbackError = error
}
} else {
transactionError.commitError = error
}

throw transactionError
}
}
}

// MARK: EventLoopFuture interface
Expand Down
21 changes: 21 additions & 0 deletions Sources/PostgresNIO/New/PostgresTransactionError.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/// A wrapper around the errors that can occur during a transaction.
public struct PostgresTransactionError: Error {

/// The file in which the transaction was started
public var file: String
/// The line in which the transaction was started
public var line: Int

/// The error thrown when running the `BEGIN` query
public var beginError: Error?
/// The error thrown in the transaction closure
public var closureError: Error?

/// The error thrown while rolling the transaction back. If the ``closureError`` is set,
/// but the ``rollbackError`` is empty, the rollback was successful. If the ``rollbackError``
/// is set, the rollback failed.
public var rollbackError: Error?

/// The error thrown while commiting the transaction.
public var commitError: Error?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to have a public init for this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not for now I think.

}
52 changes: 37 additions & 15 deletions Sources/PostgresNIO/Pool/PostgresClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -300,33 +300,55 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {
/// - Parameter closure: A closure that uses the passed `PostgresConnection`. The closure **must not** capture
/// the provided `PostgresConnection`.
/// - Returns: The closure's return value.
@_disfavoredOverload
public func withConnection<Result>(_ closure: (PostgresConnection) async throws -> Result) async throws -> Result {
let connection = try await self.leaseConnection()

defer { self.pool.releaseConnection(connection) }

return try await closure(connection)
}

/// Lease a connection for the provided `closure`'s lifetime.
/// A transation starts with call to withConnection
/// A transaction should end with a call to COMMIT or ROLLBACK
/// COMMIT is called upon successful completion and ROLLBACK is called should any steps fail
///
/// - Parameter closure: A closure that uses the passed `PostgresConnection`. The closure **must not** capture
/// the provided `PostgresConnection`.
/// - Returns: The closure's return value.
public func withTransaction<Result>(_ process: (PostgresConnection) async throws -> Result) async throws -> Result {
try await withConnection { connection in
try await connection.query("BEGIN;", logger: self.backgroundLogger)
do {
let value = try await process(connection)
try await connection.query("COMMIT;", logger: self.backgroundLogger)
return value
} catch {
try await connection.query("ROLLBACK;", logger: self.backgroundLogger)
throw error
}
public func withConnection<Result>(
isolation: isolated (any Actor)? = #isolation,
_ closure: (PostgresConnection) async throws -> sending Result
) async throws -> sending Result {
let connection = try await self.leaseConnection()

defer { self.pool.releaseConnection(connection) }

return try await closure(connection)
}

/// Lease a connection, which is in an open transaction state, for the provided `closure`'s lifetime.
///
/// The function leases a connection from the underlying connection pool and starts a transaction by running a `BEGIN`
/// query on the leased connection against the database. It then lends the connection to the user provided closure.
/// The user can then modify the database as they wish. If the user provided closure returns successfully, the function
/// will attempt to commit the changes by running a `COMMIT` query against the database. If the user provided closure
/// throws an error, the function will attempt to rollback the changes made within the closure.
///
/// - Parameters:
/// - logger: The `Logger` to log into for the transaction.
/// - file: The file, the transaction was started in. Used for better error reporting.
/// - line: The line, the transaction was started in. Used for better error reporting.
/// - closure: The user provided code to modify the database. Use the provided connection to run queries.
/// The connection must stay in the transaction mode. Otherwise this method will throw!
/// - Returns: The closure's return value.
public func withTransaction<Result>(
logger: Logger,
file: String = #file,
line: Int = #line,
isolation: isolated (any Actor)? = #isolation,
_ closure: (PostgresConnection) async throws -> sending Result
) async throws -> sending Result {
try await self.withConnection { connection in
try await connection.withTransaction(logger: logger, file: file, line: line, closure)
}
}

Expand Down
4 changes: 2 additions & 2 deletions Tests/IntegrationTests/PostgresClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ final class PostgresClientTests: XCTestCase {

for _ in 0..<iterations {
taskGroup.addTask {
let _ = try await client.withTransaction { transaction in
let _ = try await client.withTransaction(logger: logger) { transaction in
try await transaction.query(
"""
INSERT INTO "\(unescaped: tableName)" (uuid) VALUES (\(UUID()));
Expand All @@ -101,7 +101,7 @@ final class PostgresClientTests: XCTestCase {
taskGroup.addTask {

do {
let _ = try await client.withTransaction { transaction in
let _ = try await client.withTransaction(logger: logger) { transaction in
/// insert valid data
try await transaction.query(
"""
Expand Down
Loading