Skip to content

Commit

Permalink
Merge pull request #77 from purkhusid/multi-table-transact
Browse files Browse the repository at this point in the history
Add multi table transactions
  • Loading branch information
samritchie authored Oct 16, 2024
2 parents b5bcf15 + 1fdc601 commit f31ff7b
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 154 deletions.
25 changes: 11 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,9 @@ table.Scan(startedBefore (DateTimeOffset.Now - TimeSpan.FromDays 1.))

(See [`Script.fsx`](src/FSharp.AWS.DynamoDB/Script.fsx) for example timings showing the relative efficiency.)

## `TransactWriteItems`
## `Transaction`

Using [`TransactWriteItems`](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TransactWriteItems.html)
to compose multiple write operations into an aggregate request that will succeed or fail atomically is supported.
See [overview article](https://www.alexdebrie.com/posts/dynamodb-transactions) by [@alexdebrie](https://github.com/alexdebrie)

NOTE: while the underlying API supports combining operations on multiple tables, the exposed API does not.
`FSharp.AWS.DynamoDB` supports DynamoDB transactions via the `Transaction` class.

The supported individual operations are:
- `Check`: `ConditionCheck` - potentially veto the batch if the ([precompiled](#Precomputing-DynamoDB-Expressions)) `condition` is not fulfilled by the item identified by `key`
Expand All @@ -271,21 +267,22 @@ The supported individual operations are:
let compile = table.Template.PrecomputeConditionalExpr
let doesntExistCondition = compile <@ fun t -> NOT_EXISTS t.Value @>
let existsCondition = compile <@ fun t -> EXISTS t.Value @>
let key = TableKey.Combined(hashKey, rangeKey)
let requests = [
TransactWrite.Check (key, doesntExistCondition)
TransactWrite.Put (item2, None)
TransactWrite.Put (item3, Some existsCondition)
TransactWrite.Delete (table.Template.ExtractKey item5, None) ]
do! table.TransactWriteItems requests
let transaction = Transaction()
transaction.Check(table, key, doesntExistCondition)
transaction.Put(table, item2, None)
transaction.Put(table, item3, Some existsCondition)
transaction.Delete (table ,table.Template.ExtractKey item5, None)
do! transaction.TransactWriteItems()
```

Failed preconditions (or `TransactWrite.Check`s) are signalled as per the underlying API: via a `TransactionCanceledException`.
Use `TransactWriteItemsRequest.TransactionCanceledConditionalCheckFailed` to trap such conditions:

```fsharp
try do! table.TransactWriteItems writes
try do! transaction.TransactWriteItems()
return Some result
with TransactWriteItemsRequest.TransactionCanceledConditionalCheckFailed -> return None
```
Expand Down
232 changes: 147 additions & 85 deletions src/FSharp.AWS.DynamoDB/TableContext.fs
Original file line number Diff line number Diff line change
Expand Up @@ -230,65 +230,6 @@ type private LimitType =
static member AllOrCount(l: int option) = l |> Option.map Count |> Option.defaultValue All
static member DefaultOrCount(l: int option) = l |> Option.map Count |> Option.defaultValue Default

/// <summary>Represents an individual request that can be included in the <c>TransactItems</c> of a <c>TransactWriteItems</c> call.</summary>
[<RequireQualifiedAccess>]
type TransactWrite<'TRecord> =
/// Specify a Check to be run on a specified item.
/// If the condition does not hold, the overall TransactWriteItems request will be Canceled.
| Check of key: TableKey * condition: ConditionExpression<'TRecord>
/// Specify a PutItem operation to be performed, inserting or replacing an item in the Table.
/// If the (optional) precondition does not hold, the overall TransactWriteItems request will be Canceled.
| Put of item: 'TRecord * precondition: ConditionExpression<'TRecord> option
/// Specify an UpdateItem operation to be performed, applying an updater expression on the item identified by the specified `key`, if it exists.
/// If the item exists and the (optional) precondition does not hold, the overall TransactWriteItems request will be Canceled.
| Update of key: TableKey * precondition: ConditionExpression<'TRecord> option * updater: UpdateExpression<'TRecord>
/// Specify a DeleteItem operation to be performed, removing the item identified by the specified `key` if it exists.
/// If the item exists and the (optional) precondition does not hold, the overall TransactWriteItems request will be Canceled.
| Delete of key: TableKey * precondition: ConditionExpression<'TRecord> option

/// Helpers for building a <c>TransactWriteItemsRequest</c> to supply to <c>TransactWriteItems</c>
module TransactWriteItemsRequest =

let private toTransactWriteItem<'TRecord>
tableName
(template: RecordTemplate<'TRecord>)
: TransactWrite<'TRecord> -> TransactWriteItem =
function
| TransactWrite.Check(key, cond) ->
let req = ConditionCheck(TableName = tableName, Key = template.ToAttributeValues key)
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
req.ConditionExpression <- cond.Conditional.Write writer
TransactWriteItem(ConditionCheck = req)
| TransactWrite.Put(item, maybeCond) ->
let req = Put(TableName = tableName, Item = template.ToAttributeValues item)
maybeCond
|> Option.iter (fun cond ->
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
req.ConditionExpression <- cond.Conditional.Write writer)
TransactWriteItem(Put = req)
| TransactWrite.Update(key, maybeCond, updater) ->
let req = Update(TableName = tableName, Key = template.ToAttributeValues key)
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
req.UpdateExpression <- updater.UpdateOps.Write(writer)
maybeCond |> Option.iter (fun cond -> req.ConditionExpression <- cond.Conditional.Write writer)
TransactWriteItem(Update = req)
| TransactWrite.Delete(key, maybeCond) ->
let req = Delete(TableName = tableName, Key = template.ToAttributeValues key)
maybeCond
|> Option.iter (fun cond ->
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
req.ConditionExpression <- cond.Conditional.Write writer)
TransactWriteItem(Delete = req)
let internal toTransactItems<'TRecord> tableName template items =
Seq.map (toTransactWriteItem<'TRecord> tableName template) items |> rlist

/// <summary>Exception filter to identify whether a <c>TransactWriteItems</c> call has failed due to
/// one or more of the supplied <c>precondition</c> checks failing.</summary>
let (|TransactionCanceledConditionalCheckFailed|_|): exn -> unit option =
function
| :? TransactionCanceledException as e when e.CancellationReasons.Exists(fun x -> x.Code = "ConditionalCheckFailed") -> Some()
| _ -> None

/// Helpers for identifying Failed Precondition check outcomes emanating from <c>PutItem</c>, <c>UpdateItem</c> or <c>DeleteItem</c>
module Precondition =
/// <summary>Exception filter to identify whether an individual (non-transactional) <c>PutItem</c>, <c>UpdateItem</c> or <c>DeleteItem</c> call's <c>precondition</c> check failing.</summary>
Expand All @@ -297,6 +238,7 @@ module Precondition =
| :? ConditionalCheckFailedException -> Some()
| _ -> None


/// DynamoDB client object for performing table operations in the context of given F# record representations
[<Sealed; AutoSerializable(false)>]
type TableContext<'TRecord>
Expand Down Expand Up @@ -549,7 +491,6 @@ type TableContext<'TRecord>
/// Record-induced table template
member _.Template = template


/// <summary>
/// Creates a DynamoDB client instance for given F# record and table name.<br/>
/// For creating, provisioning or verification, see <c>VerifyOrCreateTableAsync</c> and <c>VerifyTableAsync</c>.
Expand Down Expand Up @@ -900,31 +841,6 @@ type TableContext<'TRecord>
return unprocessed |> Array.map template.ExtractKey
}


/// <summary>
/// Atomically applies a set of 1-100 write operations to the table.<br/>
/// NOTE requests are charged at twice the normal rate in Write Capacity Units.
/// See the DynamoDB <a href="https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TransactWriteItems.html"><c>TransactWriteItems</c> API documentation</a> for full details of semantics and charges.<br/>
/// </summary>
/// <param name="items">Operations to be performed.<br/>
/// Throws <c>ArgumentOutOfRangeException</c> if item count is not between 1 and 100 as required by underlying API.<br/>
/// Use <c>TransactWriteItemsRequest.TransactionCanceledConditionalCheckFailed</c> to identify any Precondition Check failures.</param>
/// <param name="clientRequestToken">The <c>ClientRequestToken</c> to supply as an idempotency key (10 minute window).</param>
member _.TransactWriteItems(items: seq<TransactWrite<'TRecord>>, ?clientRequestToken) : Async<unit> = async {
let reqs = TransactWriteItemsRequest.toTransactItems tableName template items
if reqs.Count = 0 || reqs.Count > 100 then
raise <| System.ArgumentOutOfRangeException(nameof items, "must be between 1 and 100 items.")
let req = TransactWriteItemsRequest(ReturnConsumedCapacity = returnConsumedCapacity, TransactItems = reqs)
clientRequestToken |> Option.iter (fun x -> req.ClientRequestToken <- x)
let! ct = Async.CancellationToken
let! response = client.TransactWriteItemsAsync(req, ct) |> Async.AwaitTaskCorrect
maybeReport
|> Option.iter (fun r -> r TransactWriteItems (Seq.toList response.ConsumedCapacity) reqs.Count)
if response.HttpStatusCode <> HttpStatusCode.OK then
failwithf "TransactWriteItems request returned error %O" response.HttpStatusCode
}


/// <summary>
/// Asynchronously queries table with given condition expressions.
/// </summary>
Expand Down Expand Up @@ -1402,6 +1318,141 @@ type TableContext<'TRecord>
else
t.VerifyTableAsync()

member t.Transaction() =
match metricsCollector with
| Some metricsCollector -> Transaction(metricsCollector = metricsCollector)
| None -> Transaction()

/// <summary>
/// Represents a transactional set of operations to be applied atomically to a arbitrary number of DynamoDB tables.
/// </summary>
/// <param name="metricsCollector">Function to receive request metrics.</param>
and Transaction(?metricsCollector: (RequestMetrics -> unit)) =
let transactionItems = ResizeArray<TransactWriteItem>()
let mutable (dynamoDbClient: IAmazonDynamoDB) = null

let setClient client =
if dynamoDbClient = null then
dynamoDbClient <- client

let reportMetrics collector (tableName: string) (operation: Operation) (consumedCapacity: ConsumedCapacity list) (itemCount: int) =
collector
{ TableName = tableName
Operation = operation
ConsumedCapacity = consumedCapacity
ItemCount = itemCount }

let returnConsumedCapacity, maybeReport =
match metricsCollector with
| Some sink -> ReturnConsumedCapacity.INDEXES, Some(reportMetrics sink)
| None -> ReturnConsumedCapacity.NONE, None

/// <summary>
/// Adds a Put operation to the transaction.
/// </summary>
/// <param name="tableContext">Table context to operate on.</param>
/// <param name="item">Item to be put.</param>
/// <param name="precondition">Optional precondition expression.</param>
member this.Put<'TRecord>
(
tableContext: TableContext<'TRecord>,
item: 'TRecord,
?precondition: ConditionExpression<'TRecord>
) : Transaction =
setClient tableContext.Client
let req = Put(TableName = tableContext.TableName, Item = tableContext.Template.ToAttributeValues item)
precondition
|> Option.iter (fun cond ->
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
req.ConditionExpression <- cond.Conditional.Write writer)
transactionItems.Add(TransactWriteItem(Put = req))
this

/// <summary>
/// Adds a ConditionCheck operation to the transaction.
/// </summary>
/// <param name="tableContext">Table context to operate on.</param>
/// <param name="key">Key of item to check.</param>
/// <param name="condition">Condition to check.</param>
member this.Check(tableContext: TableContext<'TRecord>, key: TableKey, condition: ConditionExpression<'TRecord>) : Transaction =
setClient tableContext.Client

let req = ConditionCheck(TableName = tableContext.TableName, Key = tableContext.Template.ToAttributeValues key)
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
req.ConditionExpression <- condition.Conditional.Write writer
transactionItems.Add(TransactWriteItem(ConditionCheck = req))
this

/// <summary>
/// Adds an Update operation to the transaction.
/// </summary>
/// <param name="tableContext">Table context to operate on.</param>
/// <param name="key">Key of item to update.</param>
/// <param name="updater">Update expression.</param>
/// <param name="precondition">Optional precondition expression.</param>
member this.Update
(
tableContext: TableContext<'TRecord>,
key: TableKey,
updater: UpdateExpression<'TRecord>,
?precondition: ConditionExpression<'TRecord>

) : Transaction =
setClient tableContext.Client

let req = Update(TableName = tableContext.TableName, Key = tableContext.Template.ToAttributeValues key)
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
req.UpdateExpression <- updater.UpdateOps.Write(writer)
precondition |> Option.iter (fun cond -> req.ConditionExpression <- cond.Conditional.Write writer)
transactionItems.Add(TransactWriteItem(Update = req))
this

/// <summary>
/// Adds a Delete operation to the transaction.
/// </summary>
/// <param name="tableContext">Table context to operate on.</param>
/// <param name="key">Key of item to delete.</param>
/// <param name="precondition">Optional precondition expression.</param>
member this.Delete
(
tableContext: TableContext<'TRecord>,
key: TableKey,
precondition: option<ConditionExpression<'TRecord>>
) : Transaction =
setClient tableContext.Client

let req = Delete(TableName = tableContext.TableName, Key = tableContext.Template.ToAttributeValues key)
precondition
|> Option.iter (fun cond ->
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
req.ConditionExpression <- cond.Conditional.Write writer)
transactionItems.Add(TransactWriteItem(Delete = req))
this

/// <summary>
/// Atomically applies a set of 1-100 operations to the table.<br/>
/// NOTE requests are charged at twice the normal rate in Write Capacity Units.
/// See the DynamoDB <a href="https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TransactWriteItems.html"><c>TransactWriteItems</c> API documentation</a> for full details of semantics and charges.<br/>
/// </summary>
/// <param name="clientRequestToken">The <c>ClientRequestToken</c> to supply as an idempotency key (10 minute window).</param>
member _.TransactWriteItems(?clientRequestToken) : Async<unit> = async {
if (Seq.length transactionItems) = 0 || (Seq.length transactionItems) > 100 then
raise
<| System.ArgumentOutOfRangeException(nameof transactionItems, "must be between 1 and 100 items.")
let req = TransactWriteItemsRequest(ReturnConsumedCapacity = returnConsumedCapacity, TransactItems = (ResizeArray transactionItems))
clientRequestToken |> Option.iter (fun x -> req.ClientRequestToken <- x)
let! ct = Async.CancellationToken
let! response = dynamoDbClient.TransactWriteItemsAsync(req, ct) |> Async.AwaitTaskCorrect
maybeReport
|> Option.iter (fun r ->
response.ConsumedCapacity
|> Seq.groupBy (fun x -> x.TableName)
|> Seq.iter (fun (tableName, consumedCapacity) ->
r tableName Operation.TransactWriteItems (Seq.toList consumedCapacity) (Seq.length transactionItems)))
if response.HttpStatusCode <> HttpStatusCode.OK then
failwithf "TransactWriteItems request returned error %O" response.HttpStatusCode
}

// Deprecated factory method, to be removed. Replaced with
// 1. TableContext<'T> ctor (synchronous)
// 2. VerifyOrCreateTableAsync OR VerifyTableAsync (explicitly async to signify that verification/creation is a costly and/or privileged operation)
Expand Down Expand Up @@ -1489,6 +1540,8 @@ type TableContext internal () =
)
|> Async.RunSynchronously



/// <summary>
/// Sync-over-Async helpers that can be opted-into when working in scripting scenarios.
/// For normal usage, the <c>Async</c> versions of any given API is recommended, in order to ensure one
Expand Down Expand Up @@ -2096,3 +2149,12 @@ module Scripting =
member t.UpdateProvisionedThroughput(provisionedThroughput: ProvisionedThroughput) : unit =
let spec = Throughput.Provisioned provisionedThroughput
t.UpdateTableIfRequiredAsync(spec) |> Async.Ignore |> Async.RunSynchronously

/// Helpers for working with <c>TransactWriteItemsRequest</c>
module TransactWriteItemsRequest =
/// <summary>Exception filter to identify whether a <c>TransactWriteItems</c> call has failed due to
/// one or more of the supplied <c>precondition</c> checks failing.</summary>
let (|TransactionCanceledConditionalCheckFailed|_|): exn -> unit option =
function
| :? TransactionCanceledException as e when e.CancellationReasons.Exists(fun x -> x.Code = "ConditionalCheckFailed") -> Some()
| _ -> None
17 changes: 9 additions & 8 deletions tests/FSharp.AWS.DynamoDB.Tests/MetricsCollectorTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,10 @@ type Tests(fixture: TableFixture) =
collector.Clear()

let item = mkItem (guid ()) (guid ()) 0
let requests = [ TransactWrite.Put(item, Some(compile <@ fun t -> NOT_EXISTS t.RangeKey @>)) ]

do! sut.TransactWriteItems requests
do!
Transaction(collector.Collect)
.Put(sut, item, compile <@ fun t -> NOT_EXISTS t.RangeKey @>)
.TransactWriteItems()

test
<@
Expand All @@ -130,13 +131,13 @@ type Tests(fixture: TableFixture) =
let sut = rawTable.WithMetricsCollector(collector.Collect)

let item = mkItem (guid ()) (guid ()) 0

// The check will fail, which triggers a throw from the underlying AWS SDK; there's no way to extract the consumption info in that case
let requests = [ TransactWrite.Put(item, Some(compile <@ fun t -> EXISTS t.RangeKey @>)) ]

let mutable failed = false
try
do! sut.TransactWriteItems requests
do!
// The check will fail, which triggers a throw from the underlying AWS SDK; there's no way to extract the consumption info in that case
Transaction()
.Put(sut, item, compile <@ fun t -> EXISTS t.RangeKey @>)
.TransactWriteItems()
with TransactWriteItemsRequest.TransactionCanceledConditionalCheckFailed ->
failed <- true
true =! failed
Expand Down
Loading

0 comments on commit f31ff7b

Please sign in to comment.