Skip to content

Commit 21bedfd

Browse files
committed
Initial commit
1 parent cf2abf6 commit 21bedfd

File tree

7 files changed

+237
-1
lines changed

7 files changed

+237
-1
lines changed

README.md

+23-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,24 @@
1-
# change-table-schemas
1+
# DynamoDB: Change Table Schemas
22
A tool to help migrate tables in DynamoDB
3+
4+
## Steps to follow
5+
6+
7+
1. Create a new table (let us call this NewTable), with the desired key structure, LSIs, GSIs.
8+
2. Enable DynamoDB Streams on the original table
9+
3. Associate a Lambda to the Stream, which pushes the record into NewTable. (This Lambda should trim off the migration flag in Step 5)
10+
4. [*Optional*] Create a GSI on the original table to speed up scanning items. Ensure this GSI only has attributes: Primary Key, and Migrated (See Step 5).
11+
5. Scan the GSI created in the previous step (or entire table) and use the following Filter:
12+
13+
FilterExpression = "attribute_not_exists(Migrated)"
14+
15+
Update each item in the table with a migrate flag (ie: “Migrated”: { “S”: “0” }, which sends it to the DynamoDB Streams (using UpdateItem API, to ensure no data loss occurs).
16+
17+
**NOTE** You may want to increase write capacity units on the table during the updates.
18+
6. The Lambda will pick up all items, trim off the Migrated flag and push it into NewTable.
19+
7. Once all items have been migrated, repoint the code to the new table
20+
8. Remove original table, and Lambda function once happy all is good.
21+
22+
## More information
23+
24+
https://www.abhayachauhan.com/2017/10/dynamodb-changing-table-schema/
+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
var async = require('async');
2+
3+
exports.flagForMigration = (dynamodb, tableName, migrationAttributeName, itemKeys) => {
4+
return new Promise((resolve, reject) => {
5+
function shuffle(a) {
6+
var j, x, i;
7+
for (i = a.length - 1; i > 0; i--) {
8+
j = Math.floor(Math.random() * (i + 1));
9+
x = a[i];
10+
a[i] = a[j];
11+
a[j] = x;
12+
}
13+
}
14+
15+
function updateItem(item, cb) {
16+
delete item['migrationFlag'];
17+
console.log(item);
18+
var params = {
19+
"Key": item,
20+
"TableName": tableName,
21+
"ExpressionAttributeNames": {
22+
"#MF": migrationAttributeName
23+
},
24+
"ExpressionAttributeValues": {
25+
":t": { BOOL: true }
26+
},
27+
"UpdateExpression": "SET #MF = :t",
28+
"ReturnConsumedCapacity": "INDEXES"
29+
}
30+
31+
dynamodb.updateItem(params, function(err, data) {
32+
if (err)
33+
console.log('Error', params);
34+
if (!err)
35+
itemKeys.splice(itemKeys.indexOf(item));
36+
cb(err, data);
37+
});
38+
}
39+
40+
// Shuffle keys so we minimise hot partition keys
41+
shuffle(itemKeys);
42+
43+
async.eachLimit(itemKeys, 2, updateItem, function(err) {
44+
if (err) {
45+
reject(err);
46+
}
47+
resolve();
48+
});
49+
});
50+
}
51+

flag-for-migration/index.js

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
var AWS = require('aws-sdk');
2+
var dynamodb = new AWS.DynamoDB();
3+
var async = require('async');
4+
var scan = require('./scanTable.js');
5+
var migrate = require('./flagForMigration.js');
6+
7+
var tableName = "XXX"; // TABLE NAME HERE
8+
var gsiName = "XXX"; // GSI NAME HERE
9+
var migrationAttributeName = "migrationFlag"; // MIGRATION ATTRIBUTE HERE
10+
11+
startWork();
12+
13+
function startWork() {
14+
scan.scanTable(dynamodb, tableName, gsiName, migrationAttributeName)
15+
.then(itemKeys => {
16+
console.log('Finished scanning, found', itemKeys.length,'items.');
17+
18+
migrate.flagForMigration(dynamodb, tableName, migrationAttributeName, itemKeys);
19+
});
20+
}

flag-for-migration/scanTable.js

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
exports.scanTable = (dynamodb, tableName, gsiIndexName, migrationAttributeName) => {
2+
var items = [];
3+
4+
return new Promise((resolve, reject) => {
5+
6+
function iterateTable(params) {
7+
var promise = dynamodb.scan(params).promise();
8+
9+
promise
10+
.then(data => {
11+
console.log('Received',data.Items.length,'items');
12+
items = items.concat(data.Items);
13+
if (data.LastEvaluatedKey) {
14+
params.ExclusiveStartKey = data.LastEvaluatedKey;
15+
iterateTable(params);
16+
} else {
17+
resolve(items);
18+
}
19+
})
20+
.catch(err => {
21+
console.log('ERROR:');
22+
console.log(err);
23+
iterateTable(params);
24+
});
25+
}
26+
27+
var params = {
28+
"TableName": tableName,
29+
"ReturnConsumedCapacity": "INDEXES"
30+
}
31+
if (gsiIndexName)
32+
params.IndexName = gsiIndexName;
33+
if (migrationAttributeName)
34+
params.FilterExpression = "attribute_not_exists(" + migrationAttributeName + ")";
35+
36+
iterateTable(params);
37+
38+
});
39+
}

migration-lambda/index.js

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
var AWS = require('aws-sdk');
2+
var dynamodb = new AWS.DynamoDB();
3+
var newTableName = process.env.destination;
4+
5+
exports.handler = (event, context, callback) => {
6+
var promises = [];
7+
8+
event.Records.forEach(function(record) {
9+
if (record.eventName === "REMOVE") {
10+
var params = {
11+
Key: record.dynamodb.OldImage.Key,
12+
TableName: newTableName
13+
};
14+
15+
promises.push(
16+
dynamodb.deleteItem(params).promise()
17+
);
18+
19+
}
20+
else {
21+
var params = {
22+
Item: record.dynamodb.NewImage,
23+
TableName: newTableName
24+
};
25+
26+
promises.push(
27+
dynamodb.putItem(params).promise()
28+
);
29+
}
30+
31+
});
32+
33+
Promise.all(promises)
34+
.then(() => callback())
35+
.catch((err) => callback(err));
36+
};
+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
{
2+
"Version": "2012-10-17",
3+
"Statement": [
4+
{
5+
"Effect": "Allow",
6+
"Action": [
7+
"lambda:InvokeFunction"
8+
],
9+
"Resource": [
10+
"*"
11+
]
12+
},
13+
{
14+
"Effect": "Allow",
15+
"Action": [
16+
"logs:CreateLogGroup",
17+
"logs:CreateLogStream",
18+
"logs:PutLogEvents"
19+
],
20+
"Resource": "arn:aws:logs:*:*:*"
21+
},
22+
{
23+
"Effect": "Allow",
24+
"Action": [
25+
"dynamodb:PutItem",
26+
"dynamodb:DeleteItem"
27+
],
28+
"Resource": [
29+
"arn:aws:dynamodb:*:*:table/*"
30+
]
31+
},
32+
{
33+
"Effect": "Allow",
34+
"Action": [
35+
"dynamodb:DescribeStream",
36+
"dynamodb:GetRecords",
37+
"dynamodb:GetShardIterator",
38+
"dynamodb:ListStreams"
39+
],
40+
"Resource": [
41+
"arn:aws:dynamodb:*:*:table/*/stream/*"
42+
]
43+
}
44+
]
45+
}

package.json

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
{
2+
"name": "change-table-schemas",
3+
"version": "1.0.0",
4+
"description": "A tool to help migrate tables in DynamoDB",
5+
"main": "index.js",
6+
"scripts": {
7+
"test": "echo \"Error: no test specified\" && exit 1"
8+
},
9+
"repository": {
10+
"type": "git",
11+
"url": "git+https://github.com/abhayachauhan/change-table-schemas.git"
12+
},
13+
"author": "Abhaya Chauhan",
14+
"license": "MIT",
15+
"bugs": {
16+
"url": "https://github.com/abhayachauhan/change-table-schemas/issues"
17+
},
18+
"homepage": "https://github.com/abhayachauhan/change-table-schemas#readme",
19+
"dependencies": {
20+
"async": "^2.6.0",
21+
"aws-sdk": "^2.176.0"
22+
}
23+
}

0 commit comments

Comments
 (0)