[duckdb] Fixed delete bug in DuckDB DVRT (#1474)
Delete used to work only for key schemas that had a single string field
named "key". Now it works for any schema.

Miscellaneous:

- Refactored the InsertProcessor code to be more generic.

- Bumped DuckDB snapshot dep to the latest.
FelixGV authored Jan 25, 2025
1 parent 47cb541 commit 5dfe33b
Showing 13 changed files with 268 additions and 131 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -82,7 +82,7 @@ ext.libraries = [
commonsLang: 'commons-lang:commons-lang:2.6',
conscrypt: 'org.conscrypt:conscrypt-openjdk-uber:2.5.2',
d2: "com.linkedin.pegasus:d2:${pegasusVersion}",
duckdbJdbc: "org.duckdb:duckdb_jdbc:1.2.0-20250121.012246-127", // TODO: Remove SNAPSHOT when the real release is published!
duckdbJdbc: "org.duckdb:duckdb_jdbc:1.2.0-20250124.011319-133", // TODO: Remove SNAPSHOT when the real release is published!
failsafe: 'net.jodah:failsafe:2.4.0',
fastUtil: 'it.unimi.dsi:fastutil:8.3.0',
grpcNettyShaded: "io.grpc:grpc-netty-shaded:${grpcVersion}",
1 change: 1 addition & 0 deletions integrations/venice-duckdb/build.gradle
@@ -16,6 +16,7 @@ dependencies {
testImplementation project(':internal:venice-common')

integrationTestImplementation project(path: ':internal:venice-test-common', configuration: 'integrationTestUtils')
integrationTestImplementation project(':clients:venice-producer')
integrationTestImplementation project(':clients:venice-push-job')
integrationTestImplementation project(':clients:venice-thin-client')
integrationTestImplementation project(':integrations:venice-duckdb')
@@ -31,14 +31,18 @@
import com.linkedin.davinci.client.DaVinciRecordTransformerConfig;
import com.linkedin.davinci.client.factory.CachingDaVinciClientFactory;
import com.linkedin.venice.D2.D2ClientUtils;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.duckdb.DuckDBDaVinciRecordTransformer;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterCreateOptions;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
import com.linkedin.venice.producer.online.OnlineProducerFactory;
import com.linkedin.venice.producer.online.OnlineVeniceProducer;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.PushInputSchemaBuilder;
import com.linkedin.venice.utils.TestUtils;
@@ -91,20 +95,24 @@ public void setUp() {
clusterConfig.put(SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, 1L);
clusterConfig.put(PUSH_STATUS_STORE_ENABLED, true);
clusterConfig.put(DAVINCI_PUSH_STATUS_SCAN_INTERVAL_IN_SECONDS, 3);
cluster = ServiceFactory.getVeniceCluster(1, 2, 1, 2, 100, false, false, clusterConfig);
d2Client = new D2ClientBuilder().setZkHosts(cluster.getZk().getAddress())
this.cluster = ServiceFactory.getVeniceCluster(
new VeniceClusterCreateOptions.Builder().numberOfControllers(1)
.numberOfServers(2)
.numberOfRouters(1)
.replicationFactor(2)
.extraProperties(clusterConfig)
.build());
this.d2Client = new D2ClientBuilder().setZkHosts(this.cluster.getZk().getAddress())
.setZkSessionTimeout(3, TimeUnit.SECONDS)
.setZkStartupTimeout(3, TimeUnit.SECONDS)
.build();
D2ClientUtils.startClient(d2Client);
D2ClientUtils.startClient(this.d2Client);
}

@AfterClass
public void cleanUp() {
if (d2Client != null) {
D2ClientUtils.shutdownClient(d2Client);
}
Utils.closeQuietlyWithErrorLogged(cluster);
D2ClientUtils.shutdownClient(this.d2Client);
Utils.closeQuietlyWithErrorLogged(this.cluster);
}

@BeforeMethod
@@ -158,24 +166,40 @@ public void testRecordTransformer() throws Exception {

clientWithRecordTransformer.subscribeAll().get();

assertRowCount(duckDBUrl, storeName, "subscribeAll() finishes!");
assertRowCount(duckDBUrl, storeName, DEFAULT_USER_DATA_RECORD_COUNT, "subscribeAll() finishes!");

try (OnlineVeniceProducer producer = OnlineProducerFactory.createProducer(
ClientConfig.defaultGenericClientConfig(storeName)
.setD2Client(d2Client)
.setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME),
VeniceProperties.empty(),
null)) {
producer.asyncDelete(getKey(1)).get();
}

TestUtils.waitForNonDeterministicAssertion(
10,
TimeUnit.SECONDS,
true,
() -> assertRowCount(duckDBUrl, storeName, DEFAULT_USER_DATA_RECORD_COUNT - 1, "deleting 1 row!"));

clientWithRecordTransformer.unsubscribeAll();
}

assertRowCount(duckDBUrl, storeName, "DVC gets closed!");
assertRowCount(duckDBUrl, storeName, DEFAULT_USER_DATA_RECORD_COUNT - 1, "DVC gets closed!");
}

private void assertRowCount(String duckDBUrl, String storeName, String assertionErrorMsg) throws SQLException {
private void assertRowCount(String duckDBUrl, String storeName, int expectedCount, String assertionErrorMsg)
throws SQLException {
try (Connection connection = DriverManager.getConnection(duckDBUrl);
Statement statement = connection.createStatement();
ResultSet rs = statement.executeQuery("SELECT count(*) FROM " + storeName)) {
assertTrue(rs.next());
int rowCount = rs.getInt(1);
assertEquals(
rowCount,
DEFAULT_USER_DATA_RECORD_COUNT,
"The DB should contain " + DEFAULT_USER_DATA_RECORD_COUNT + " right after " + assertionErrorMsg);
expectedCount,
"The DB should contain " + expectedCount + " rows right after " + assertionErrorMsg);
}
}

@@ -195,8 +219,7 @@ protected void setUpStore(
String lastName = "last_name_";
Schema valueSchema = writeSimpleAvroFile(inputDir, pushRecordSchema, i -> {
GenericRecord keyValueRecord = new GenericData.Record(pushRecordSchema);
GenericRecord key = new GenericData.Record(SINGLE_FIELD_RECORD_SCHEMA);
key.put("key", i.toString());
GenericRecord key = getKey(i);
keyValueRecord.put(DEFAULT_KEY_FIELD_PROP, key);
GenericRecord valueRecord = new GenericData.Record(NAME_RECORD_V1_SCHEMA);
valueRecord.put("firstName", firstName + i);
@@ -214,7 +237,9 @@ protected void setUpStore(
final int numPartitions = 3;
UpdateStoreQueryParams params = new UpdateStoreQueryParams().setPartitionCount(numPartitions)
.setChunkingEnabled(chunkingEnabled)
.setCompressionStrategy(compressionStrategy);
.setCompressionStrategy(compressionStrategy)
.setHybridOffsetLagThreshold(10)
.setHybridRewindSeconds(1);

paramsConsumer.accept(params);

@@ -231,6 +256,12 @@ protected void setUpStore(
}
}

private GenericRecord getKey(Integer i) {
GenericRecord key = new GenericData.Record(SINGLE_FIELD_RECORD_SCHEMA);
key.put("key", i.toString());
return key;
}

private static void runVPJ(Properties vpjProperties, int expectedVersionNumber, VeniceClusterWrapper cluster) {
long vpjStart = System.currentTimeMillis();
String jobName = Utils.getUniqueString("batch-job-" + expectedVersionNumber);
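
A side note on the new hybrid settings in setUpStore() above: writes issued through
OnlineVeniceProducer (such as the asyncDelete() call in the test) only reach a store
that has a real-time (hybrid) buffer, which is presumably why the store config now
enables it. Roughly, with interpretive comments that are not part of the source:

// As added in setUpStore() above; the small values just enable the real-time path for the test.
UpdateStoreQueryParams params = new UpdateStoreQueryParams()
    .setHybridOffsetLagThreshold(10) // replica considered caught up once within ~10 records of real-time
    .setHybridRewindSeconds(1);      // replay only the last ~1 second of real-time data into a new version
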
@@ -6,12 +6,11 @@
import com.linkedin.davinci.client.DaVinciRecordTransformerResult;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.sql.AvroToSQL;
import com.linkedin.venice.sql.InsertProcessor;
import com.linkedin.venice.sql.PreparedStatementProcessor;
import com.linkedin.venice.sql.SQLUtils;
import com.linkedin.venice.sql.TableDefinition;
import com.linkedin.venice.utils.concurrent.CloseableThreadLocal;
import com.linkedin.venice.utils.lazy.Lazy;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
@@ -28,7 +27,6 @@ public class DuckDBDaVinciRecordTransformer
extends DaVinciRecordTransformer<GenericRecord, GenericRecord, GenericRecord> {
private static final Logger LOGGER = LogManager.getLogger(DuckDBDaVinciRecordTransformer.class);
private static final String duckDBFilePath = "my_database.duckdb";
private static final String deleteStatementTemplate = "DELETE FROM %s WHERE %s = ?;";
private static final String createViewStatementTemplate = "CREATE OR REPLACE VIEW %s AS SELECT * FROM %s;";
private static final String dropTableStatementTemplate = "DROP TABLE %s;";
private final String storeNameWithoutVersionInfo;
@@ -38,7 +36,8 @@ public class DuckDBDaVinciRecordTransformer
private final CloseableThreadLocal<Connection> connection;
private final CloseableThreadLocal<PreparedStatement> deletePreparedStatement;
private final CloseableThreadLocal<PreparedStatement> upsertPreparedStatement;
private final InsertProcessor upsertProcessor;
private final PreparedStatementProcessor upsertProcessor;
private final PreparedStatementProcessor deleteProcessor;

public DuckDBDaVinciRecordTransformer(
int storeVersion,
@@ -54,8 +53,7 @@ public DuckDBDaVinciRecordTransformer(
this.versionTableName = buildStoreNameWithVersion(storeVersion);
this.duckDBUrl = "jdbc:duckdb:" + baseDir + "/" + duckDBFilePath;
this.columnsToProject = columnsToProject;
String deleteStatement = String.format(deleteStatementTemplate, versionTableName, "key"); // TODO: Fix this, it is
// broken
String deleteStatement = AvroToSQL.deleteStatement(versionTableName, keySchema);
String upsertStatement = AvroToSQL.upsertStatement(versionTableName, keySchema, inputValueSchema, columnsToProject);
this.connection = CloseableThreadLocal.withInitial(() -> {
try {
@@ -68,17 +66,18 @@
try {
return this.connection.get().prepareStatement(deleteStatement);
} catch (SQLException e) {
throw new VeniceException("Failed to create PreparedStatement!", e);
throw new VeniceException("Failed to create PreparedStatement for: " + deleteStatement, e);
}
});
this.upsertPreparedStatement = CloseableThreadLocal.withInitial(() -> {
try {
return this.connection.get().prepareStatement(upsertStatement);
} catch (SQLException e) {
throw new VeniceException("Failed to create PreparedStatement!", e);
throw new VeniceException("Failed to create PreparedStatement for: " + upsertStatement, e);
}
});
this.upsertProcessor = AvroToSQL.upsertProcessor(keySchema, inputValueSchema, columnsToProject);
this.deleteProcessor = AvroToSQL.deleteProcessor(keySchema);
}

@Override
@@ -95,14 +94,7 @@ public void processPut(Lazy<GenericRecord> key, Lazy<GenericRecord> value) {

@Override
public void processDelete(Lazy<GenericRecord> key) {
try {
PreparedStatement stmt = this.deletePreparedStatement.get();
// TODO: Fix this, it is broken.
stmt.setString(1, key.get().get("key").toString());
stmt.execute();
} catch (SQLException e) {
throw new VeniceException("Failed to execute delete!");
}
this.deleteProcessor.process(key.get(), null, this.deletePreparedStatement.get());
}

@Override
@@ -176,7 +168,7 @@ public String buildStoreNameWithVersion(int version) {
}

@Override
public void close() throws IOException {
public void close() {
this.deletePreparedStatement.close();
this.upsertPreparedStatement.close();
this.connection.close();
@@ -133,11 +133,44 @@ public static String upsertStatement(
}

@Nonnull
public static InsertProcessor upsertProcessor(
public static PreparedStatementProcessor upsertProcessor(
@Nonnull Schema keySchema,
@Nonnull Schema valueSchema,
@Nonnull Set<String> columnsToProject) {
return new InsertProcessor(keySchema, valueSchema, columnsToProject);
return new KeyValuePreparedStatementProcessor(keySchema, valueSchema, columnsToProject);
}

@Nonnull
public static String deleteStatement(@Nonnull String tableName, @Nonnull Schema keySchema) {
StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append("DELETE FROM " + SQLUtils.cleanTableName(tableName) + " WHERE ");
boolean firstColumn = true;

for (Schema.Field field: keySchema.getFields()) {
JDBCType correspondingType = getCorrespondingType(field);
if (correspondingType == null) {
// Skipped field.
throw new IllegalArgumentException(
"All types from the key schema must be supported, but field '" + field.name() + "' is of type: "
+ field.schema().getType());
}

if (firstColumn) {
firstColumn = false;
} else {
stringBuffer.append(" AND ");
}
stringBuffer.append(SQLUtils.cleanColumnName(field.name()));
stringBuffer.append(" = ?");
}
stringBuffer.append(";");

return stringBuffer.toString();
}

@Nonnull
public static PreparedStatementProcessor deleteProcessor(@Nonnull Schema keySchema) {
return new KeyOnlyPreparedStatementProcessor(keySchema);
}

@Nullable
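
For orientation, a hedged sketch of how these two new helpers fit together at runtime,
mirroring the constructor and processDelete() wiring in DuckDBDaVinciRecordTransformer
above (the connection URL, keySchema and keyRecord are placeholders, not from the commit):

// Sketch only; mirrors the transformer's wiring rather than defining new behavior.
Connection connection = DriverManager.getConnection("jdbc:duckdb:/tmp/example.duckdb"); // placeholder URL
PreparedStatement deleteStmt =
    connection.prepareStatement(AvroToSQL.deleteStatement("my_table_v1", keySchema));
PreparedStatementProcessor deleteProcessor = AvroToSQL.deleteProcessor(keySchema);

// A delete carries no value record, so null is passed, just like processDelete() does above.
deleteProcessor.process(keyRecord, null, deleteStmt);
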
@@ -1,5 +1,6 @@
package com.linkedin.venice.sql;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.utils.ByteUtils;
import java.nio.ByteBuffer;
import java.sql.JDBCType;
@@ -14,70 +15,47 @@
import org.apache.avro.generic.GenericRecord;


public class InsertProcessor {
/** This class provides plumbing to plug the fields of Avro records into a {@link PreparedStatement}. */
public class KeyOnlyPreparedStatementProcessor implements PreparedStatementProcessor {
private final int[] keyFieldIndexToJdbcIndexMapping;
private final int[] valueFieldIndexToJdbcIndexMapping;
private final int[] keyFieldIndexToUnionBranchIndex;
private final int[] valueFieldIndexToUnionBranchIndex;
private final JDBCType[] keyFieldIndexToCorrespondingType;
private final JDBCType[] valueFieldIndexToCorrespondingType;

InsertProcessor(@Nonnull Schema keySchema, @Nonnull Schema valueSchema, @Nonnull Set<String> columnsToProject) {
Objects.requireNonNull(keySchema);
Objects.requireNonNull(valueSchema);
Objects.requireNonNull(columnsToProject);

int keyFieldCount = keySchema.getFields().size();
KeyOnlyPreparedStatementProcessor(@Nonnull Schema keySchema) {
int keyFieldCount = Objects.requireNonNull(keySchema).getFields().size();
this.keyFieldIndexToJdbcIndexMapping = new int[keyFieldCount];
this.keyFieldIndexToUnionBranchIndex = new int[keyFieldCount];
this.keyFieldIndexToCorrespondingType = new JDBCType[keyFieldCount];

int valueFieldCount = valueSchema.getFields().size();
this.valueFieldIndexToJdbcIndexMapping = new int[valueFieldCount];
this.valueFieldIndexToUnionBranchIndex = new int[valueFieldCount];
this.valueFieldIndexToCorrespondingType = new JDBCType[valueFieldCount];

// N.B.: JDBC indices start at 1, not at 0.
int index = 1;
index = populateArrays(
index,
populateArrays(
1, // N.B.: JDBC indices start at 1, not at 0.
keySchema,
this.keyFieldIndexToJdbcIndexMapping,
this.keyFieldIndexToUnionBranchIndex,
this.keyFieldIndexToCorrespondingType,
Collections.emptySet()); // N.B.: All key columns must be projected.
populateArrays(
index, // N.B.: The same index value needs to carry over from key to value columns.
valueSchema,
this.valueFieldIndexToJdbcIndexMapping,
this.valueFieldIndexToUnionBranchIndex,
this.valueFieldIndexToCorrespondingType,
columnsToProject);
}

@Override
public void process(GenericRecord key, GenericRecord value, PreparedStatement preparedStatement) {
try {
processRecord(
key,
preparedStatement,
this.keyFieldIndexToJdbcIndexMapping,
this.keyFieldIndexToUnionBranchIndex,
this.keyFieldIndexToCorrespondingType);

processRecord(
value,
preparedStatement,
this.valueFieldIndexToJdbcIndexMapping,
this.valueFieldIndexToUnionBranchIndex,
this.valueFieldIndexToCorrespondingType);

processKey(key, preparedStatement);
preparedStatement.execute();
} catch (SQLException e) {
throw new RuntimeException(e);
throw new VeniceException("Failed to execute prepared statement!", e);
}
}

private int populateArrays(
protected void processKey(GenericRecord key, PreparedStatement preparedStatement) throws SQLException {
processRecord(
key,
preparedStatement,
this.keyFieldIndexToJdbcIndexMapping,
this.keyFieldIndexToUnionBranchIndex,
this.keyFieldIndexToCorrespondingType);
}

protected void populateArrays(
int index,
@Nonnull Schema schema,
@Nonnull int[] avroFieldIndexToJdbcIndexMapping,
@@ -108,10 +86,13 @@ private int populateArrays(
}
}
}
return index;
}

private void processRecord(
protected final int getLastKeyJdbcIndex() {
return this.keyFieldIndexToJdbcIndexMapping[this.keyFieldIndexToJdbcIndexMapping.length - 1];
}

protected void processRecord(
GenericRecord record,
PreparedStatement preparedStatement,
int[] avroFieldIndexToJdbcIndexMapping,
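
Putting the refactor together: both the upsert and the delete path now go through the
same PreparedStatementProcessor interface, with KeyOnlyPreparedStatementProcessor binding
the key columns and KeyValuePreparedStatementProcessor (referenced from AvroToSQL.upsertProcessor
but not shown in the loaded portion of the diff) presumably layering the projected value
columns on top. A hedged usage sketch (schemas, records and prepared statements are
placeholders; an empty columnsToProject set is assumed to mean "project all value columns"):

PreparedStatementProcessor upsertProcessor =
    AvroToSQL.upsertProcessor(keySchema, valueSchema, Collections.emptySet());
PreparedStatementProcessor deleteProcessor = AvroToSQL.deleteProcessor(keySchema);

upsertProcessor.process(keyRecord, valueRecord, upsertStmt); // binds key columns, then value columns
deleteProcessor.process(keyRecord, null, deleteStmt);        // binds key columns only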
