Skip to content

Commit

Permalink
rewrite syncOneTrustAssessments to run in parallel and series
Browse files Browse the repository at this point in the history
  • Loading branch information
abrantesarthur committed Jan 24, 2025
1 parent 0ad390c commit 2cfb1c3
Showing 1 changed file with 140 additions and 121 deletions.
261 changes: 140 additions & 121 deletions src/oneTrust/helpers/syncOneTrustAssessments.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { logger } from '../../logger';
import {
OneTrustAssessmentQuestion,
OneTrustAssessmentSection,
OneTrustEnrichedAssessment,
OneTrustGetRiskResponse,
OneTrustGetUserResponse,
} from '@transcend-io/privacy-types';
Expand Down Expand Up @@ -52,132 +53,150 @@ export const syncOneTrustAssessments = async ({
// a cache of OneTrust users so we avoid sending requests for users already fetched
const oneTrustCachedUsers: Record<string, OneTrustGetUserResponse> = {};

/**
* fetch details about each assessment in series and write to transcend or to disk
* (depending on the dryRun argument) right away to avoid running out of memory
*/
await mapSeries(assessments, async (assessment, index) => {
logger.info(
`[assessment ${index + 1} of ${assessments.length}]: fetching details...`,
);
const assessmentDetails = await getOneTrustAssessment({
oneTrust,
assessmentId: assessment.assessmentId,
});
// split all assessments into batches, so we can process some of the steps in parallel
const BATCH_SIZE = 5;
const assessmentBatches = Array.from(
{
length: Math.ceil(assessments.length / BATCH_SIZE),
},
(_, i) => assessments.slice(i * BATCH_SIZE, (i + 1) * BATCH_SIZE),
);

// fetch assessment's creator information
const creatorId = assessmentDetails.createdBy.id;
let creator = oneTrustCachedUsers[creatorId];
if (!creator) {
logger.info(
`[assessment ${index + 1} of ${
assessments.length
}]: fetching creator...`,
);
creator = await getOneTrustUser({
oneTrust,
userId: creatorId,
});
oneTrustCachedUsers[creatorId] = creator;
}
// process each batch and sync the batch right away so it's garbage collected and we don't run out of memory
await mapSeries(assessmentBatches, async (assessmentBatch, batch) => {
const batchEnrichedAssessments: OneTrustEnrichedAssessment[] = [];

// fetch assessment approvers information
const { approvers } = assessmentDetails;
let approversDetails: OneTrustGetUserResponse[] = [];
if (approvers.length > 0) {
logger.info(
`[assessment ${index + 1} of ${
assessments.length
}]: fetching approvers...`,
);
approversDetails = await map(
approvers.map(({ id }) => id),
async (userId) => {
let approver = oneTrustCachedUsers[userId];
if (!approver) {
approver = await getOneTrustUser({ oneTrust, userId });
oneTrustCachedUsers[userId] = approver;
}
return approver;
},
{ concurrency: 5 },
);
}
// fetch assessment details from OneTrust in parallel
await map(
assessmentBatch,
async (assessment, index) => {
const assessmentNumber = BATCH_SIZE * batch + index;
logger.info(
`[assessment ${assessmentNumber} of ${assessments.length}]: fetching details...`,
);
const assessmentDetails = await getOneTrustAssessment({
oneTrust,
assessmentId: assessment.assessmentId,
});
// fetch assessment's creator information
const creatorId = assessmentDetails.createdBy.id;
let creator = oneTrustCachedUsers[creatorId];
if (!creator) {
logger.info(
`[assessment ${assessmentNumber} of ${assessments.length}]: fetching creator...`,
);
creator = await getOneTrustUser({
oneTrust,
userId: creatorId,
});
oneTrustCachedUsers[creatorId] = creator;
}

// fetch assessment internal respondents information
const { respondents } = assessmentDetails;
// internal respondents names are not emails.
const internalRespondents = respondents.filter(
(r) => !r.name.includes('@'),
);
let respondentsDetails: OneTrustGetUserResponse[] = [];
if (internalRespondents.length > 0) {
logger.info(
`[assessment ${index + 1} of ${
assessments.length
}]: fetching respondents...`,
);
respondentsDetails = await map(
internalRespondents.map(({ id }) => id),
async (userId) => {
let respondent = oneTrustCachedUsers[userId];
if (!respondent) {
respondent = await getOneTrustUser({ oneTrust, userId });
oneTrustCachedUsers[userId] = respondent;
}
return respondent;
},
{ concurrency: 5 },
);
}
// fetch assessment approvers information
const { approvers } = assessmentDetails;
let approversDetails: OneTrustGetUserResponse[] = [];
if (approvers.length > 0) {
logger.info(
`[assessment ${assessmentNumber} of ${assessments.length}]: fetching approvers...`,
);
approversDetails = await map(
approvers.map(({ id }) => id),
async (userId) => {
let approver = oneTrustCachedUsers[userId];
if (!approver) {
approver = await getOneTrustUser({ oneTrust, userId });
oneTrustCachedUsers[userId] = approver;
}
return approver;
},
{ concurrency: 5 },
);
}

// fetch assessment risk information
let riskDetails: OneTrustGetRiskResponse[] = [];
const riskIds = uniq(
assessmentDetails.sections.flatMap((s: OneTrustAssessmentSection) =>
s.questions.flatMap((q: OneTrustAssessmentQuestion) =>
(q.risks ?? []).flatMap((r) => r.riskId),
),
),
);
if (riskIds.length > 0) {
logger.info(
`[assessment ${index + 1} of ${assessments.length}]: fetching risks...`,
);
riskDetails = await map(
riskIds,
(riskId) => getOneTrustRisk({ oneTrust, riskId: riskId as string }),
{
concurrency: 5,
},
);
}
// fetch assessment internal respondents information
const { respondents } = assessmentDetails;
// internal respondents are those whose names are not emails.
const internalRespondents = respondents.filter(
(r) => !r.name.includes('@'),
);
let respondentsDetails: OneTrustGetUserResponse[] = [];
if (internalRespondents.length > 0) {
logger.info(
`[assessment ${assessmentNumber} of ${assessments.length}]: fetching respondents...`,
);
respondentsDetails = await map(
internalRespondents.map(({ id }) => id),
async (userId) => {
let respondent = oneTrustCachedUsers[userId];
if (!respondent) {
respondent = await getOneTrustUser({ oneTrust, userId });
oneTrustCachedUsers[userId] = respondent;
}
return respondent;
},
{ concurrency: 5 },
);
}

// fetch assessment risk information
let riskDetails: OneTrustGetRiskResponse[] = [];
const riskIds = uniq(
assessmentDetails.sections.flatMap((s: OneTrustAssessmentSection) =>
s.questions.flatMap((q: OneTrustAssessmentQuestion) =>
(q.risks ?? []).flatMap((r) => r.riskId),
),
),
);
if (riskIds.length > 0) {
logger.info(
`[assessment ${assessmentNumber} of ${assessments.length}]: fetching risks...`,
);
riskDetails = await map(
riskIds,
(riskId) => getOneTrustRisk({ oneTrust, riskId: riskId as string }),
{
concurrency: 5,
},
);
}

// enrich the assessments with user and risk details
const enrichedAssessment = enrichOneTrustAssessment({
assessment,
assessmentDetails,
riskDetails,
creatorDetails: creator,
approversDetails,
respondentsDetails,
});
// enrich the assessments with user and risk details
const enrichedAssessment = enrichOneTrustAssessment({
assessment,
assessmentDetails,
riskDetails,
creatorDetails: creator,
approversDetails,
respondentsDetails,
});

if (dryRun && file && fileFormat) {
// sync to file
syncOneTrustAssessmentToDisk({
assessment: enrichedAssessment,
index,
total: assessments.length,
file,
fileFormat,
});
} else if (fileFormat === OneTrustFileFormat.Csv && transcend) {
// sync to transcend
await syncOneTrustAssessmentToTranscend({
assessment: enrichedAssessment,
transcend,
});
}
batchEnrichedAssessments.push(enrichedAssessment);
},
{ concurrency: BATCH_SIZE },
);

// sync assessments in series to avoid concurrency bugs
await mapSeries(
batchEnrichedAssessments,
async (enrichedAssessment, index) => {
const trueIndex = batch * BATCH_SIZE + index;
if (dryRun && file && fileFormat) {
// sync to file
syncOneTrustAssessmentToDisk({
assessment: enrichedAssessment,
index: trueIndex,
total: assessments.length,
file,
fileFormat,
});
} else if (fileFormat === OneTrustFileFormat.Csv && transcend) {
// sync to transcend
await syncOneTrustAssessmentToTranscend({
assessment: enrichedAssessment,
transcend,
});
}
},
);
});
};

0 comments on commit 2cfb1c3

Please sign in to comment.