Skip to content

Commit

Permalink
feat(java/driver/flight-sql): implement getObjects (#1517)
Browse files Browse the repository at this point in the history
* Implement getObjects and each depth (CATALOGS, DB_SCHEMAS, TABLES, and
ALL).
* Handle the cases where the catalog is empty or a schema within a
catalog has no tables.
* Unify readers for queries, getInfo(), and getObjects() so that all
code paths can correctly get data when the data is not available on the
same location as the original connection, can handle multiple roots from
the same stream, and can handle multiple partitions.
* Rework getInfo() request to lazily issue an RPC call only if a request
code was used that needs information from the Flight server.

Fixes #745

---------

Co-authored-by: tokoko <togurg14@freeuni.edu.ge>
  • Loading branch information
jduo and tokoko authored Feb 18, 2024
1 parent 37026a9 commit adfb70a
Show file tree
Hide file tree
Showing 12 changed files with 1,828 additions and 385 deletions.
11 changes: 6 additions & 5 deletions java/driver/flight-sql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@
<artifactId>adbc-sql</artifactId>
</dependency>

<!-- Helpers for mapping Arrow types to ANSI SQL types and building test servers -->
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>flight-sql-jdbc-core</artifactId>
</dependency>

<!-- Static analysis and linting -->
<dependency>
<groupId>org.checkerframework</groupId>
Expand Down Expand Up @@ -96,11 +102,6 @@
<version>4.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>flight-sql-jdbc-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>flight-sql-jdbc-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.adbc.driver.flightsql;

import static org.apache.arrow.adbc.driver.flightsql.FlightSqlDriverUtil.tryLoadNextStream;

import com.github.benmanes.caffeine.cache.LoadingCache;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import org.apache.arrow.adbc.core.AdbcException;
import org.apache.arrow.adbc.core.AdbcStatusCode;
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightRuntimeException;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Schema;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Base class for ArrowReaders based on consuming data from FlightEndpoints. */
public abstract class BaseFlightReader extends ArrowReader {

private final List<FlightEndpoint> flightEndpoints;
private final Supplier<List<FlightEndpoint>> rpcCall;
private int nextEndpointIndex = 0;
private @Nullable FlightStream currentStream = null;
private @Nullable Schema schema = null;
private long bytesRead = 0;
protected final FlightSqlClientWithCallOptions client;
protected final LoadingCache<Location, FlightSqlClientWithCallOptions> clientCache;

protected BaseFlightReader(
BufferAllocator allocator,
FlightSqlClientWithCallOptions client,
LoadingCache<Location, FlightSqlClientWithCallOptions> clientCache,
Supplier<List<FlightEndpoint>> rpcCall) {
super(allocator);
this.client = client;
this.clientCache = clientCache;
this.flightEndpoints = new ArrayList<>();
this.rpcCall = rpcCall;
}

@SuppressWarnings("dereference.of.nullable")
// Checker framework is considering Arrow functions such as FlightStream.next() as potentially
// altering the state
// and able to change currentStream or schema fields to null.
@Override
public boolean loadNextBatch() throws IOException {
if (currentStream == null || schema == null) {
throw new IllegalStateException();
}

if (!currentStream.next()) {
if (nextEndpointIndex >= flightEndpoints.size()) {
return false;
} else {
try {
currentStream.close();
FlightEndpoint endpoint = flightEndpoints.get(nextEndpointIndex++);
currentStream = tryLoadNextStream(endpoint, client, clientCache);
if (currentStream == null) {
throw new IllegalStateException();
}
if (!schema.equals(currentStream.getSchema())) {
throw new IOException(
"Stream has inconsistent schema. Expected: "
+ schema
+ "\nFound: "
+ currentStream.getSchema());
}
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new IOException(e);
}
}
}
processRootFromStream(currentStream.getRoot());
return true;
}

@Override
protected Schema readSchema() throws IOException {
if (schema == null) {
throw new IllegalStateException();
}
return schema;
}

@Override
public long bytesRead() {
return bytesRead;
}

@Override
protected void closeReadSource() throws IOException {
try {
AutoCloseables.close(currentStream);
} catch (Exception e) {
throw new IOException(e);
}
}

protected abstract void processRootFromStream(VectorSchemaRoot root);

protected void addBytesRead(long bytes) {
this.bytesRead += bytes;
}

protected void populateEndpointData() throws AdbcException {
try {
this.flightEndpoints.addAll(rpcCall.get());
this.currentStream =
tryLoadNextStream(flightEndpoints.get(this.nextEndpointIndex++), client, clientCache);
this.schema = this.currentStream.getSchema();
} catch (FlightRuntimeException e) {
throw FlightSqlDriverUtil.fromFlightException(e);
} catch (IOException e) {
throw new AdbcException(e.getMessage(), e, AdbcStatusCode.IO, null, 0);
}
}

protected void loadRoot(VectorSchemaRoot root) {
final VectorUnloader unloader = new VectorUnloader(root);
final ArrowRecordBatch recordBatch = unloader.getRecordBatch();
addBytesRead(recordBatch.computeBodyLength());
loadRecordBatch(recordBatch);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,16 @@

import com.github.benmanes.caffeine.cache.LoadingCache;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.arrow.adbc.core.AdbcException;
import org.apache.arrow.adbc.core.AdbcStatusCode;
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightRuntimeException;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Schema;
import org.checkerframework.checker.nullness.qual.Nullable;

/** An ArrowReader that wraps a FlightInfo. */
public class FlightInfoReader extends ArrowReader {
private final Schema schema;
private final FlightSqlClientWithCallOptions client;
private final LoadingCache<Location, FlightSqlClientWithCallOptions> clientCache;
private final List<FlightEndpoint> flightEndpoints;
private int nextEndpointIndex;
private FlightStream currentStream;
private long bytesRead;

public class FlightInfoReader extends BaseFlightReader {
@SuppressWarnings(
"method.invocation") // Checker Framework does not like the ensureInitialized call
FlightInfoReader(
Expand All @@ -54,21 +36,9 @@ public class FlightInfoReader extends ArrowReader {
LoadingCache<Location, FlightSqlClientWithCallOptions> clientCache,
List<FlightEndpoint> flightEndpoints)
throws AdbcException {
super(allocator);
this.client = client;
this.clientCache = clientCache;
this.flightEndpoints = flightEndpoints;
this.nextEndpointIndex = 0;
this.bytesRead = 0;

try {
this.currentStream =
client.getStream(flightEndpoints.get(this.nextEndpointIndex++).getTicket());
this.schema = this.currentStream.getSchema();
} catch (FlightRuntimeException e) {
throw FlightSqlDriverUtil.fromFlightException(e);
}
super(allocator, client, clientCache, () -> flightEndpoints);

populateEndpointData();
try {
this.ensureInitialized();
} catch (IOException e) {
Expand All @@ -82,85 +52,7 @@ public class FlightInfoReader extends ArrowReader {
}

@Override
public boolean loadNextBatch() throws IOException {
if (!currentStream.next()) {
if (nextEndpointIndex >= flightEndpoints.size()) {
return false;
} else {
try {
currentStream.close();
FlightEndpoint endpoint = flightEndpoints.get(nextEndpointIndex++);
currentStream = tryLoadNextStream(endpoint);
if (!schema.equals(currentStream.getSchema())) {
throw new IOException(
"Stream has inconsistent schema. Expected: "
+ schema
+ "\nFound: "
+ currentStream.getSchema());
}
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new IOException(e);
}
}
}
final VectorSchemaRoot root = currentStream.getRoot();
final VectorUnloader unloader = new VectorUnloader(root);
final ArrowRecordBatch recordBatch = unloader.getRecordBatch();
bytesRead += recordBatch.computeBodyLength();
loadRecordBatch(recordBatch);
return true;
}

private FlightStream tryLoadNextStream(FlightEndpoint endpoint) throws IOException {
if (endpoint.getLocations().isEmpty()) {
return client.getStream(endpoint.getTicket());
} else {
List<Location> locations = new ArrayList<>(endpoint.getLocations());
Collections.shuffle(locations);
IOException failure = null;
for (final Location location : locations) {
final @Nullable FlightSqlClientWithCallOptions client = clientCache.get(location);
if (client == null) {
throw new IllegalStateException("Could not connect to " + location);
}
try {
return client.getStream(endpoint.getTicket());
} catch (RuntimeException e) {
// Also handles CompletionException (from clientCache#get), FlightRuntimeException
if (failure == null) {
failure =
new IOException("Failed to get stream from location " + location + ": " + e, e);
} else {
failure.addSuppressed(
new IOException("Failed to get stream from location " + location + ": " + e, e));
}
}
}
if (failure == null) {
throw new IllegalStateException("FlightEndpoint had no locations");
}
throw Objects.requireNonNull(failure);
}
}

@Override
public long bytesRead() {
return bytesRead;
}

@Override
protected void closeReadSource() throws IOException {
try {
currentStream.close();
} catch (Exception e) {
throw new IOException(e);
}
}

@Override
protected Schema readSchema() {
return schema;
protected void processRootFromStream(VectorSchemaRoot root) {
loadRoot(root);
}
}
Loading

0 comments on commit adfb70a

Please sign in to comment.