From c7b9bb4edf0084e2a338609dcf0b364ba3c13935 Mon Sep 17 00:00:00 2001 From: Jacob Cable Date: Tue, 4 Mar 2025 15:44:50 +0000 Subject: [PATCH] refactor(gen-schema-view): simplify genkit flow --- .../gen-schema-view/src/config/index.ts | 4 +- .../src/config/non-interactive.ts | 2 +- .../scripts/gen-schema-view/src/index.ts | 116 +------- .../gen-schema-view/src/schema/genkit.ts | 248 +++++++++--------- 4 files changed, 129 insertions(+), 241 deletions(-) diff --git a/firestore-bigquery-export/scripts/gen-schema-view/src/config/index.ts b/firestore-bigquery-export/scripts/gen-schema-view/src/config/index.ts index f7c61898d..9b5893b65 100644 --- a/firestore-bigquery-export/scripts/gen-schema-view/src/config/index.ts +++ b/firestore-bigquery-export/scripts/gen-schema-view/src/config/index.ts @@ -5,7 +5,9 @@ import { parseProgram, validateNonInteractiveParams } from "./non-interactive"; const DEFAULT_SAMPLE_SIZE = 100; -interface CliConfig { +// TODO: if you dont pass in a schema file (e.g use gemini to create one, the script fails) + +export interface CliConfig { projectId: string; bigQueryProjectId: string; datasetId: string; diff --git a/firestore-bigquery-export/scripts/gen-schema-view/src/config/non-interactive.ts b/firestore-bigquery-export/scripts/gen-schema-view/src/config/non-interactive.ts index 0b779038d..ab35af9a8 100644 --- a/firestore-bigquery-export/scripts/gen-schema-view/src/config/non-interactive.ts +++ b/firestore-bigquery-export/scripts/gen-schema-view/src/config/non-interactive.ts @@ -1,4 +1,4 @@ -import * as program from "commander"; +import program from "commander"; /** * Helper function to collect multiple values for an option into an array diff --git a/firestore-bigquery-export/scripts/gen-schema-view/src/index.ts b/firestore-bigquery-export/scripts/gen-schema-view/src/index.ts index 5ac475afd..e0520d407 100644 --- a/firestore-bigquery-export/scripts/gen-schema-view/src/index.ts +++ b/firestore-bigquery-export/scripts/gen-schema-view/src/index.ts @@ -17,71 +17,10 @@ */ import firebase = require("firebase-admin"); -import inquirer from "inquirer"; import { FirestoreBigQuerySchemaViewFactory, FirestoreSchema } from "./schema"; import { readSchemas } from "./schema-loader-utils"; -import { runAgent } from "./schema/genkit"; import { parseConfig } from "./config"; - -export async function sampleFirestoreDocuments( - collectionPath: string, - sampleSize: number -): Promise { - const db = firebase.firestore(); - - try { - const snapshot = await db - .collection(collectionPath) - .where("__name__", ">=", Math.random().toString()) - .limit(sampleSize) - .get(); - - const documents = snapshot.docs.map((doc) => { - const data = doc.data(); - return serializeDocument(data); - }); - - console.log(`Successfully sampled ${documents.length} documents.`); - return documents; - } catch (error) { - console.error("Error sampling documents:", error); - throw error; - } -} - -function serializeDocument(data: any): any { - if (!data) return null; - - if (data instanceof Date) { - return { _type: "timestamp", value: data.toISOString() }; - } - - if (data instanceof firebase.firestore.GeoPoint) { - return { - _type: "geopoint", - latitude: data.latitude, - longitude: data.longitude, - }; - } - - if (data instanceof firebase.firestore.DocumentReference) { - return { _type: "reference", path: data.path }; - } - - if (Array.isArray(data)) { - return data.map((item) => serializeDocument(item)); - } - - if (typeof data === "object") { - const result = {}; - for (const [key, value] of Object.entries(data)) { - result[key] = serializeDocument(value); - } - return result; - } - - return data; -} +import { generateSchemaFilesWithGemini } from "./schema/genkit"; async function run(): Promise { const config = await parseConfig(); @@ -101,58 +40,11 @@ async function run(): Promise { ); if (config.useGemini) { - // TODO: move to genkit subdirectory try { - const sampleData = await sampleFirestoreDocuments( - config.collectionPath!, - config.agentSampleSize! - ); - const chat = runAgent( - config.googleAiKey!, - // TODO: set this somehow from user input - "./schemas", - config.tableNamePrefix, - config.collectionPath!, - sampleData - ); - await chat.send( - `Please analyze these documents and generate an appropriate BigQuery schema. ` + - `**Then use the writeSchema tool to save it as "${config.tableNamePrefix}.json**". ` + - `Let me know once you've created the schema file.` - ); - const schemaName = `${config.tableNamePrefix}`; - const schemas = readSchemas([`./schemas/${schemaName}.json`]); - - if (!schemas[schemaName]) { - console.error( - `Error reading schema file: ./schemas/${schemaName}.json. Gemini may have failed to generate the schema. - If the issue persists, please manually create the schema file and run the script again.` - ); - process.exit(1); - } - - const schemaPath = `./schemas/${config.tableNamePrefix}.json`; - console.log( - `\nSchema generation complete. The schema file has been created at: ${schemaPath}. Please review the schema file and confirm if you want to proceed.` - ); - - const confirmation = await inquirer.prompt([ - { - type: "confirm", - name: "proceed", - message: - "Have you reviewed the schema and want to proceed with creating the views?", - default: false, - }, - ]); - - if (!confirmation.proceed) { - console.log( - "Operation cancelled. Please modify the schema file and run the script again." - ); - process.exit(0); - } + await generateSchemaFilesWithGemini(config); + const schemas = readSchemas([`./schemas/${config.tableNamePrefix}.json`]); + // TODO: move this out of the block so we're not repeating ourselves. for (const name in schemas) { await viewFactory.initializeSchemaViewResources( config.datasetId, diff --git a/firestore-bigquery-export/scripts/gen-schema-view/src/schema/genkit.ts b/firestore-bigquery-export/scripts/gen-schema-view/src/schema/genkit.ts index 7079480c8..9a55fd093 100644 --- a/firestore-bigquery-export/scripts/gen-schema-view/src/schema/genkit.ts +++ b/firestore-bigquery-export/scripts/gen-schema-view/src/schema/genkit.ts @@ -1,34 +1,79 @@ -import { gemini20Flash, googleAI } from "@genkit-ai/googleai"; -import { Genkit, genkit, z } from "genkit"; +import type { CliConfig } from "../config"; +import firebase = require("firebase-admin"); +import { genkit } from "genkit"; +import { googleAI, gemini20Flash } from "@genkit-ai/googleai"; import * as fs from "fs/promises"; import * as path from "path"; -import { SchemaSchema } from "./genkitSchema"; // Assuming the schema is in a separate file +import inquirer from "inquirer"; -/** - * Initializes Genkit with the Google AI plugin. - * - * @param {string} apiKey - The API key for Google AI. - * @returns {ReturnType} - An instance of Genkit configured with the Google AI plugin. - */ -const initializeGenkit = (apiKey: string) => { - return genkit({ plugins: [googleAI({ apiKey })] }); -}; +const ai = genkit({ + plugins: [ + googleAI({ + // TODO: we need to pass in the api key + // apiKey: config.googleAiKey, + }), + ], +}); + +export async function sampleFirestoreDocuments( + collectionPath: string, + sampleSize: number +): Promise { + const db = firebase.firestore(); -/** - * Validates the content of a schema against the SchemaSchema. - * - * @param {string} content - The JSON string representation of the schema to validate. - * @throws {Error} - Throws an error if the schema is invalid. - * @returns {boolean} - Returns true if the schema is valid. - */ -const validateSchemaContent = (content: string) => { try { - SchemaSchema.parse(JSON.parse(content)); - return true; + const snapshot = await db + .collection(collectionPath) + .where("__name__", ">=", Math.random().toString()) + .limit(sampleSize) + .get(); + + const documents = snapshot.docs.map((doc) => { + const data = doc.data(); + return serializeDocument(data); + }); + + console.log(`Successfully sampled ${documents.length} documents.`); + return documents; } catch (error) { - throw new Error(`Invalid schema content: ${error.message}`); + console.error("Error sampling documents:", error); + throw error; + } +} + +function serializeDocument(data: any): any { + if (!data) return null; + + if (data instanceof Date) { + return { _type: "timestamp", value: data.toISOString() }; } -}; + + if (data instanceof firebase.firestore.GeoPoint) { + return { + _type: "geopoint", + latitude: data.latitude, + longitude: data.longitude, + }; + } + + if (data instanceof firebase.firestore.DocumentReference) { + return { _type: "reference", path: data.path }; + } + + if (Array.isArray(data)) { + return data.map((item) => serializeDocument(item)); + } + + if (typeof data === "object") { + const result = {}; + for (const [key, value] of Object.entries(data)) { + result[key] = serializeDocument(value); + } + return result; + } + + return data; +} /** * Writes a schema file to the specified directory if it does not already exist. @@ -53,76 +98,19 @@ const writeSchemaFile = async ( } }; -/** - * Defines the writeSchema tool for the Genkit agent. - * - * @param {ReturnType} ai - The Genkit instance. - * @param {string} schemaDirectory - The directory where schema files are stored. - * @returns {object} - The defined tool instance. - */ -const defineWriteSchemaTool = ( - ai: ReturnType, - schemaDirectory: string -) => { - return ai.defineTool( - { - name: "writeSchema", - description: "Creates a new schema file", - inputSchema: z.object({ - fileName: z.string().describe("Name of the schema file to create"), - content: z.string().describe("JSON content of the schema"), - }), - outputSchema: z.string().describe("Result of the operation"), - }, - async ({ - fileName, - content, - }: { - fileName: string; - content: string; - }): Promise => { - try { - validateSchemaContent(content); - return await writeSchemaFile(schemaDirectory, fileName, content); - } catch (error) { - return `Error creating schema: ${error.message}`; - } - } - ); -}; - -/** - * Defines the schema management agent for Genkit. - * - * @param {ReturnType} ai - The Genkit instance. - * @param {string} schemaDirectory - The directory where schema files are stored. - * @param {string} collectionName - The name of the Firestore collection. - * @param {string} tablePrefix - The prefix for the generated BigQuery table schema. - * @param {any[]} sampleData - Sample documents from the Firestore collection. - * @returns {object} - The defined prompt instance. - */ -const defineSchemaAgent = ( - ai: Genkit, - schemaDirectory: string, - collectionName: string, - tablePrefix: string, - sampleData: any[] -): object => { - const writeSchemaTool = defineWriteSchemaTool(ai, schemaDirectory); - - return ai.definePrompt( - { - name: "schemaAgent", - description: "Agent for managing BigQuery schema files", - tools: [writeSchemaTool], - model: gemini20Flash, - }, - ` +const biqquerySchemaPrompt = ({ + collectionName, + sampleData, + tablePrefix, +}: { + collectionName: string; + sampleData: any[]; + tablePrefix: string; +}) => ` You are a Schema Management Agent for Generating BigQuery schemas from Firestore Collections. Your primary tasks are: 1. Analyze the provided sample documents 2. Generate an appropriate BigQuery schema - 3. Save the schema using the writeSchema tool I will provide you with sample documents from the collection "${collectionName}". @@ -188,55 +176,61 @@ const defineSchemaAgent = ( IMPORTANT: After analyzing the sample data: 1. Generate a schema with detailed descriptions for ALL fields - 2. Use the writeSchema tool to save it as "${tablePrefix}.json" - 3. Confirm the schema was successfully saved - 4. Make sure all fields are correctly represented in the schema, and described and formatted - 5. SQL has a number of reserved keywords that can cause conflicts when creating a schema, timestamp is one such example. + 2. Make sure all fields are correctly represented in the schema, and described and formatted + 3. SQL has a number of reserved keywords that can cause conflicts when creating a schema, timestamp is one such example. To ensure your Firestore document field names do not conflict, use the column_name option to override the field name. for example: { "fields": [ { - "name": "name", - "type": "string" + "name": "name", + "type": "string" }, { - "name": "age", - "type": "number", - "column_name": "new_column_name" + "name": "age", + "type": "number", + "column_name": "new_column_name" } ] } - Begin by analyzing the sample data and create a well-documented schema.` - ); -}; + Begin by analyzing the sample data and then create a well-documented schema.`; -/** - * Main function to run the Genkit agent for schema management. - * - * @param {string} apiKey - The API key for Google AI. - * @param {string} schemaDirectory - The directory where schema files are stored. - * @param {string} collectionName - The name of the Firestore collection. - * @param {string} tablePrefix - The prefix for the generated BigQuery table schema. - * @param {any[]} sampleData - Sample documents from the Firestore collection. - * @returns {Promise} - The chat interface with the schema management agent. - */ -export const runAgent = ( - apiKey: string, - schemaDirectory: string, - collectionName: string, - tablePrefix: string, - sampleData: any[] -) => { - const ai = initializeGenkit(apiKey); - const schemaAgent = defineSchemaAgent( - ai, - schemaDirectory, - collectionName, - tablePrefix, - sampleData +export const generateSchemaFilesWithGemini = async (config: CliConfig) => { + // get sample data from Firestore + const sampleData = await sampleFirestoreDocuments( + config.collectionPath!, + config.agentSampleSize! ); - return ai.chat(schemaAgent); + const prompt = biqquerySchemaPrompt({ + collectionName: config.collectionPath!, + sampleData, + tablePrefix: config.tableNamePrefix, + }); + + // prompt gemini with sample data to generate a schema file + const { text } = await ai.generate({ + model: gemini20Flash, + prompt, + }); + + await writeSchemaFile("./schemas", `${config.tableNamePrefix}.json`, text); + // confirm with user that schema file is correct + const confirmation = await inquirer.prompt([ + { + type: "confirm", + name: "proceed", + message: + "Have you reviewed the schema and want to proceed with creating the views?", + default: false, + }, + ]); + + if (!confirmation.proceed) { + console.log( + "Operation cancelled. Please modify the schema file and run the script again." + ); + process.exit(0); + } };