Skip to content

Commit

Permalink
feat: Barrage Refactor Read/Write Chunk Factories (#6065)
Browse files Browse the repository at this point in the history
This is an overhaul of type mapping from arrow to deephaven columns. We
can now coerce from many kinds of types, enabling widening / truncating
and conversion between types. For example uint64_t is now mapped to
BigInteger by default, but can be explicitly mapped to `long.class` (may
overflow).

To set an explicit mapping, set the schema's column attribute "deephaven:type" to the
canonical name of the class.

Fixes #58 (custom type serialization / deserialization).
Fixes #936 (ColumnConversionModes is being replaced with easy-to-integrate custom serialization).
Fixes #2984 (refactoring has good interface documentation).
Fixes #3403 (by supporting a variety of mappings, these now must match
client wiring).
Fixes #5258 (snapshot/subscribe methods with default w2w options).
Fixes #5453 (support other Timestamp arrow wire encodings).
Fixes #5864 (support for uint64_t).
Fixes #6114 (supports ObjectVector<Boolean> properly).
  • Loading branch information
nbauernfeind authored Feb 28, 2025
1 parent 4d57893 commit b32641f
Show file tree
Hide file tree
Showing 154 changed files with 15,562 additions and 5,555 deletions.
1 change: 1 addition & 0 deletions cpp-client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def testCppClient = Docker.registerDockerTask(project, 'testCppClient') {
environmentVariable 'DH_HOST', deephavenDocker.containerName.get()
environmentVariable 'DH_PORT', '10000'
}
waitTimeMinutes = 1
containerDependencies.dependsOn = [deephavenDocker.healthyTask]
containerDependencies.finalizedBy = deephavenDocker.endTask
network = deephavenDocker.networkName.get()
Expand Down
2 changes: 1 addition & 1 deletion cpp-client/deephaven/tests/src/time_unit_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ TEST_CASE("Uploaded Arrow Timestamp units get normalized to nanos at FillChunk t
}
}

TEST_CASE("Uploaded Arrow Time64 units get normalized to nanos at FillChunk time", "[timeunit][.hidden]") {
TEST_CASE("Uploaded Arrow Time64 units get normalized to nanos at FillChunk time", "[timeunit]") {
auto tm = TableMakerForTests::Create();

std::vector<std::optional<InternalLocalTime<arrow::TimeUnit::MICRO>>> lt_micro;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,15 @@ public <Other> ColumnDefinition<Other> withDataType(@NotNull final Class<Other>
: fromGenericType(name, newDataType, componentType, columnType);
}

/**
 * Produce a column definition with the same name and column type as this one, but with the requested data type and
 * component type. When both types already match, this definition is returned unchanged.
 *
 * @param newDataType the data type for the resulting definition
 * @param newComponentType the component type for the resulting definition (may be {@code null})
 * @return {@code this} if no change is needed, otherwise a new {@link ColumnDefinition}
 */
public <Other> ColumnDefinition<Other> withDataType(
        @NotNull final Class<Other> newDataType,
        @Nullable final Class<?> newComponentType) {
    if (dataType == newDataType && componentType == newComponentType) {
        // Safe: types are identical, so this definition already has the requested parameterization.
        // noinspection unchecked
        return (ColumnDefinition<Other>) this;
    }
    return fromGenericType(name, newDataType, newComponentType, columnType);
}

/**
 * Produce a column definition identical to this one except for its name. When {@code newName} equals the current
 * name, this definition is returned unchanged.
 *
 * @param newName the name for the resulting definition
 * @return {@code this} if the name is unchanged, otherwise a new {@link ColumnDefinition}
 */
public ColumnDefinition<?> withName(@NotNull final String newName) {
    if (newName.equals(name)) {
        return this;
    }
    return new ColumnDefinition<>(newName, dataType, componentType, columnType);
}
Expand Down
6 changes: 6 additions & 0 deletions engine/api/src/main/java/io/deephaven/engine/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,12 @@ public interface Table extends
* Set this attribute to enable collection of barrage performance stats.
*/
String BARRAGE_PERFORMANCE_KEY_ATTRIBUTE = "BarragePerformanceTableKey";
/**
* Set an Apache Arrow POJO Schema to this attribute to control the column encoding used for barrage serialization.
* <p>
* See {@code org.apache.arrow.vector.types.pojo.Schema}.
*/
String BARRAGE_SCHEMA_ATTRIBUTE = "BarrageSchema";

// -----------------------------------------------------------------------------------------------------------------
// ColumnSources for fetching data by row key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -74,6 +75,12 @@ public final boolean get(int index) {
return data[offset + index];
}

// region isNull
/**
 * Whether the value at {@code index} is null. A primitive {@code boolean} has no null sentinel, so this is always
 * {@code false}.
 *
 * @param index the relative offset of the value to check within this chunk
 * @return always {@code false}
 */
public final boolean isNull(int index) {
return false;
}
// endregion isNull

@Override
public BooleanChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
7 changes: 7 additions & 0 deletions engine/chunk/src/main/java/io/deephaven/chunk/ByteChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -78,6 +79,12 @@ public final byte get(int index) {
return data[offset + index];
}

// region isNull
/**
 * Whether the value at {@code index} is the Deephaven null sentinel, {@link QueryConstants#NULL_BYTE}.
 *
 * @param index the relative offset of the value to check within this chunk
 * @return {@code true} if the value at {@code index} is null
 */
public final boolean isNull(int index) {
return data[offset + index] == QueryConstants.NULL_BYTE;
}
// endregion isNull

@Override
public ByteChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
7 changes: 7 additions & 0 deletions engine/chunk/src/main/java/io/deephaven/chunk/CharChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -73,6 +74,12 @@ public final char get(int index) {
return data[offset + index];
}

// region isNull
/**
 * Whether the value at {@code index} is the Deephaven null sentinel, {@link QueryConstants#NULL_CHAR}.
 *
 * @param index the relative offset of the value to check within this chunk
 * @return {@code true} if the value at {@code index} is null
 */
public final boolean isNull(int index) {
return data[offset + index] == QueryConstants.NULL_CHAR;
}
// endregion isNull

@Override
public CharChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -77,6 +78,12 @@ public final double get(int index) {
return data[offset + index];
}

// region isNull
/**
 * Whether the value at {@code index} is the Deephaven null sentinel, {@link QueryConstants#NULL_DOUBLE}.
 * Note this is an exact bit comparison against the sentinel, not a NaN check.
 *
 * @param index the relative offset of the value to check within this chunk
 * @return {@code true} if the value at {@code index} is null
 */
public final boolean isNull(int index) {
return data[offset + index] == QueryConstants.NULL_DOUBLE;
}
// endregion isNull

@Override
public DoubleChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
7 changes: 7 additions & 0 deletions engine/chunk/src/main/java/io/deephaven/chunk/FloatChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -77,6 +78,12 @@ public final float get(int index) {
return data[offset + index];
}

// region isNull
/**
 * Whether the value at {@code index} is the Deephaven null sentinel, {@link QueryConstants#NULL_FLOAT}.
 * Note this is an exact comparison against the sentinel, not a NaN check.
 *
 * @param index the relative offset of the value to check within this chunk
 * @return {@code true} if the value at {@code index} is null
 */
public final boolean isNull(int index) {
return data[offset + index] == QueryConstants.NULL_FLOAT;
}
// endregion isNull

@Override
public FloatChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
7 changes: 7 additions & 0 deletions engine/chunk/src/main/java/io/deephaven/chunk/IntChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -77,6 +78,12 @@ public final int get(int index) {
return data[offset + index];
}

// region isNull
/**
 * Whether the value at {@code index} is the Deephaven null sentinel, {@link QueryConstants#NULL_INT}.
 *
 * @param index the relative offset of the value to check within this chunk
 * @return {@code true} if the value at {@code index} is null
 */
public final boolean isNull(int index) {
return data[offset + index] == QueryConstants.NULL_INT;
}
// endregion isNull

@Override
public IntChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
7 changes: 7 additions & 0 deletions engine/chunk/src/main/java/io/deephaven/chunk/LongChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -77,6 +78,12 @@ public final long get(int index) {
return data[offset + index];
}

// region isNull
/**
 * Whether the value at {@code index} is the Deephaven null sentinel, {@link QueryConstants#NULL_LONG}.
 *
 * @param index the relative offset of the value to check within this chunk
 * @return {@code true} if the value at {@code index} is null
 */
public final boolean isNull(int index) {
return data[offset + index] == QueryConstants.NULL_LONG;
}
// endregion isNull

@Override
public LongChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -77,6 +78,12 @@ public final T get(int index) {
return data[offset + index];
}

// region isNull
/**
 * Whether the value at {@code index} is null; for object chunks, null is represented by a {@code null} reference.
 *
 * @param index the relative offset of the value to check within this chunk
 * @return {@code true} if the value at {@code index} is null
 */
public final boolean isNull(int index) {
return data[offset + index] == null;
}
// endregion isNull

@Override
public ObjectChunk<T, ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
* {@link Chunk} that may have its backing storage reset to a slice of that belonging to another {@link Chunk} or a
* native array.
*/
public interface ResettableReadOnlyChunk<ATTR_BASE extends Any> extends ResettableChunk<ATTR_BASE>, PoolableChunk {
public interface ResettableReadOnlyChunk<ATTR_BASE extends Any>
extends ResettableChunk<ATTR_BASE>, PoolableChunk<ATTR_BASE> {

/**
* Reset the data and bounds of this chunk to a range or sub-range of the specified {@link Chunk}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* {@link WritableChunk} or a native array.
*/
public interface ResettableWritableChunk<ATTR_BASE extends Any>
extends ResettableChunk<ATTR_BASE>, WritableChunk<ATTR_BASE>, PoolableChunk {
extends ResettableChunk<ATTR_BASE>, WritableChunk<ATTR_BASE>, PoolableChunk<ATTR_BASE> {

@Override
<ATTR extends ATTR_BASE> WritableChunk<ATTR> resetFromChunk(WritableChunk<ATTR> other, int offset, int capacity);
Expand Down
7 changes: 7 additions & 0 deletions engine/chunk/src/main/java/io/deephaven/chunk/ShortChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.chunk;

import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.chunk.attributes.Any;

Expand Down Expand Up @@ -77,6 +78,12 @@ public final short get(int index) {
return data[offset + index];
}

// region isNull
/**
 * Whether the value at {@code index} is the Deephaven null sentinel, {@link QueryConstants#NULL_SHORT}.
 *
 * @param index the relative offset of the value to check within this chunk
 * @return {@code true} if the value at {@code index} is null
 */
public final boolean isNull(int index) {
return data[offset + index] == QueryConstants.NULL_SHORT;
}
// endregion isNull

@Override
public ShortChunk<ATTR> slice(int offset, int capacity) {
ChunkHelpers.checkSliceArgs(size, offset, capacity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*
* @param <ATTR> Descriptive attribute that applies to the elements stored within this WritableChunk
*/
public interface WritableChunk<ATTR extends Any> extends Chunk<ATTR>, PoolableChunk {
public interface WritableChunk<ATTR extends Any> extends Chunk<ATTR>, PoolableChunk<ATTR> {
@Override
WritableChunk<ATTR> slice(int offset, int capacity);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public WritableChunk<T> get() {

/**
* Ensure the underlying chunk has a capacity of at least {@code capacity}.
*
* <p>
* The data and size of the returned chunk are undefined.
*
* @param capacity the minimum capacity for the chunk.
Expand All @@ -56,9 +56,9 @@ public WritableChunk<T> ensureCapacity(int capacity) {

/**
* Ensure the underlying chunk has a capacity of at least {@code capacity}.
*
* <p>
* If the chunk has existing data, then it is copied to the new chunk.
*
* <p>
* If the underlying chunk already exists, then the size of the chunk is the original size. If the chunk did not
* exist, then the size of the returned chunk is zero.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
package io.deephaven.chunk.util.pools;

import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.attributes.Any;
import io.deephaven.util.SafeCloseable;

/**
* Marker interface for {@link Chunk} subclasses that can be kept within a {@link ChunkPool}, and whose
* {@link #close()} method will return them to the appropriate pool.
*/
public interface PoolableChunk extends SafeCloseable {
public interface PoolableChunk<ATTR extends Any> extends Chunk<ATTR>, SafeCloseable {
}
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,15 @@ public enum CopyAttributeOperation {
CopyAttributeOperation.Flatten, // add flatten for now because web flattens all views
CopyAttributeOperation.Preview));

tempMap.put(BARRAGE_SCHEMA_ATTRIBUTE, EnumSet.of(
CopyAttributeOperation.Filter,
CopyAttributeOperation.FirstBy,
CopyAttributeOperation.Flatten,
CopyAttributeOperation.LastBy,
CopyAttributeOperation.PartitionBy,
CopyAttributeOperation.Reverse,
CopyAttributeOperation.Sort));

attributeToCopySet = Collections.unmodifiableMap(tempMap);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package io.deephaven.engine.table.impl.sources;

import io.deephaven.chunk.ChunkType;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.WritableColumnSource;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -212,6 +213,27 @@ public static ColumnSource<?>[] maybeConvertToPrimitive(@NotNull final ColumnSou
return result;
}

/**
 * If {@code columnDefinition.getDataType()} or {@code columnDefinition.getComponentType()} are something that we
 * prefer to handle as a primitive, do the appropriate conversion.
 *
 * @param columnDefinition The column definition to convert
 * @return if possible, {@code columnDefinition} converted to a primitive, otherwise {@code columnDefinition}
 */
@NotNull
public static ColumnDefinition<?> maybeConvertToPrimitive(@NotNull final ColumnDefinition<?> columnDefinition) {
    final Class<?> originalDataType = columnDefinition.getDataType();
    final Class<?> originalComponentType = columnDefinition.getComponentType();
    final Class<?> convertedDataType = ReinterpretUtils.maybeConvertToPrimitiveDataType(originalDataType);
    final Class<?> convertedComponentType = originalComponentType == null
            ? null
            : ReinterpretUtils.maybeConvertToPrimitiveDataType(originalComponentType);
    if (originalDataType == convertedDataType && originalComponentType == convertedComponentType) {
        // Nothing to convert; reuse the existing definition.
        return columnDefinition;
    }
    return columnDefinition.withDataType(convertedDataType, convertedComponentType);
}

/**
* If {@code source} is something that we prefer to handle as a primitive, do the appropriate conversion.
*
Expand Down Expand Up @@ -265,6 +287,7 @@ public static ChunkType maybeConvertToWritablePrimitiveChunkType(@NotNull final
}
if (dataType == Instant.class) {
// Note that storing ZonedDateTime as a primitive is lossy on the time zone.
// TODO (https://github.com/deephaven/deephaven-core/issues/5241): Inconsistent handling of ZonedDateTime
return ChunkType.Long;
}
return ChunkType.fromElementType(dataType);
Expand All @@ -283,6 +306,8 @@ public static Class<?> maybeConvertToPrimitiveDataType(@NotNull final Class<?> d
return byte.class;
}
if (dataType == Instant.class || dataType == ZonedDateTime.class) {
// Note: not all ZonedDateTime sources are convertible to long, so this doesn't match column source behavior
// TODO (https://github.com/deephaven/deephaven-core/issues/5241): Inconsistent handling of ZonedDateTime
return long.class;
}
return dataType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ public static Builder newBuilder(final String name) {

public static final String DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP =
"PeriodicUpdateGraph.targetCycleDurationMillis";

/**
 * Read the default target cycle duration, in milliseconds, from configuration.
 *
 * @return the value of {@code PeriodicUpdateGraph.targetCycleDurationMillis} from configuration, or 1000 if unset
 */
public static int getDefaultTargetCycleDurationMillis() {
    final Configuration configuration = Configuration.getInstance();
    return configuration.getIntegerWithDefault(DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP, 1000);
}

private final long defaultTargetCycleDurationMillis;
private volatile long targetCycleDurationMillis;
private final ThreadInitializationFactory threadInitializationFactory;
Expand Down Expand Up @@ -252,7 +257,7 @@ public boolean isCycleOnBudget(long cycleTimeNanos) {
* Resets the run cycle time to the default target configured via the {@link Builder} setting.
*
* @implNote If the {@link Builder#targetCycleDurationMillis(long)} property is not set, this value defaults to
* {@link Builder#DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP} which defaults to 1000ms.
* {@link #DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP} which defaults to 1000ms.
*/
@SuppressWarnings("unused")
public void resetTargetCycleDuration() {
Expand Down Expand Up @@ -1166,8 +1171,7 @@ public static PeriodicUpdateGraph getInstance(final String name) {
public static final class Builder {
private final boolean allowUnitTestMode =
Configuration.getInstance().getBooleanWithDefault(ALLOW_UNIT_TEST_MODE_PROP, false);
private long targetCycleDurationMillis =
Configuration.getInstance().getIntegerWithDefault(DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP, 1000);
private long targetCycleDurationMillis = getDefaultTargetCycleDurationMillis();
private long minimumCycleDurationToLogNanos = DEFAULT_MINIMUM_CYCLE_DURATION_TO_LOG_NANOSECONDS;

private String name;
Expand Down
Empty file.
Loading

0 comments on commit b32641f

Please sign in to comment.