Skip to content

Commit

Permalink
Fix #6201 by converting primitives to Objects inside of WebChunkReade…
Browse files Browse the repository at this point in the history
…rFactory
  • Loading branch information
nbauernfeind committed Feb 28, 2025
1 parent b32641f commit cdf55dd
Show file tree
Hide file tree
Showing 17 changed files with 212 additions and 864 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ public static void main(final String[] args) throws IOException {
fixupVectorExpansionKernel(CHUNK_PACKAGE + "/vector/IntVectorExpansionKernel.java", "Int");
fixupVectorExpansionKernel(CHUNK_PACKAGE + "/vector/LongVectorExpansionKernel.java", "Long");
fixupVectorExpansionKernel(CHUNK_PACKAGE + "/vector/DoubleVectorExpansionKernel.java", "Double");

ReplicatePrimitiveCode.charToAllButBoolean("replicateBarrageUtils",
"web/client-api/src/main/java/io/deephaven/web/client/api/barrage/data/WebCharColumnData.java");
}

private static void fixupDoubleChunkReader(final @NotNull String path) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import io.deephaven.barrage.flatbuf.BarrageModColumnMetadata;
import io.deephaven.barrage.flatbuf.BarrageUpdateMetadata;
import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.extensions.barrage.BarrageOptions;
import io.deephaven.extensions.barrage.BarrageTypeInfo;
Expand Down Expand Up @@ -59,7 +61,6 @@ public class WebBarrageMessageReader {

public WebBarrageMessage parseFrom(
final BarrageOptions options,
ChunkType[] columnChunkTypes,
Class<?>[] columnTypes,
Class<?>[] componentTypes,
FlightData flightData) throws IOException {
Expand Down Expand Up @@ -121,7 +122,7 @@ public WebBarrageMessage parseFrom(

// create an initial chunk of the correct size
final int chunkSize = (int) (Math.min(msg.rowsIncluded.size(), MAX_CHUNK_SIZE));
final WritableChunk<Values> chunk = columnChunkTypes[ci].makeWritableChunk(chunkSize);
final WritableChunk<Values> chunk = WritableObjectChunk.makeWritableChunk(chunkSize);
chunk.setSize(0);
msg.addColumnData[ci].data.add(chunk);
}
Expand All @@ -142,7 +143,7 @@ public WebBarrageMessage parseFrom(
// create an initial chunk of the correct size
final int chunkSize = (int) (Math.min(msg.modColumnData[ci].rowsModified.size(),
MAX_CHUNK_SIZE));
final WritableChunk<Values> chunk = columnChunkTypes[ci].makeWritableChunk(chunkSize);
final WritableChunk<Values> chunk = WritableObjectChunk.makeWritableChunk(chunkSize);
chunk.setSize(0);
msg.modColumnData[ci].data.add(chunk);

Expand Down Expand Up @@ -217,7 +218,7 @@ public WebBarrageMessage parseFrom(
if (batch.length() > chunk.capacity() - chunk.size()) {
// reading the rows from this batch will overflow the existing chunk; create a new one
final int chunkSize = (int) (Math.min(remaining, MAX_CHUNK_SIZE));
chunk = columnChunkTypes[ci].makeWritableChunk(chunkSize);
chunk = WritableObjectChunk.makeWritableChunk(chunkSize);
acd.data.add(chunk);

chunk.setSize(0);
Expand Down Expand Up @@ -248,7 +249,7 @@ public WebBarrageMessage parseFrom(
if (numRowsToRead > chunk.capacity() - chunk.size()) {
// reading the rows from this batch will overflow the existing chunk; create a new one
final int chunkSize = (int) (Math.min(remaining, MAX_CHUNK_SIZE));
chunk = columnChunkTypes[ci].makeWritableChunk(chunkSize);
chunk = WritableObjectChunk.makeWritableChunk(chunkSize);
mcd.data.add(chunk);

chunk.setSize(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@

import elemental2.core.JsDate;
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.ByteChunk;
import io.deephaven.chunk.CharChunk;
import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.DoubleChunk;
import io.deephaven.chunk.FloatChunk;
import io.deephaven.chunk.IntChunk;
import io.deephaven.chunk.ShortChunk;
import io.deephaven.chunk.WritableByteChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableIntChunk;
Expand Down Expand Up @@ -38,6 +44,7 @@
import io.deephaven.web.client.api.LocalDateWrapper;
import io.deephaven.web.client.api.LocalTimeWrapper;
import io.deephaven.web.client.api.LongWrapper;
import jsinterop.base.Js;
import org.apache.arrow.flatbuf.Date;
import org.apache.arrow.flatbuf.DateUnit;
import org.apache.arrow.flatbuf.Field;
Expand Down Expand Up @@ -75,50 +82,10 @@ public <T extends WritableChunk<Values>> ChunkReader<T> newReader(
@NotNull final BarrageOptions options) {
switch (typeInfo.arrowField().typeType()) {
case Type.Int: {
Int t = new Int();
typeInfo.arrowField().type(t);
switch (t.bitWidth()) {
case 8: {
return (ChunkReader<T>) new ByteChunkReader(options);
}
case 16: {
if (t.isSigned()) {
return (ChunkReader<T>) new ShortChunkReader(options);
}
return (ChunkReader<T>) new CharChunkReader(options);
}
case 32: {
return (ChunkReader<T>) new IntChunkReader(options);
}
case 64: {
if (t.isSigned()) {
return (ChunkReader<T>) transformToObject(new LongChunkReader(options),
(src, dst, dstOffset) -> {
for (int ii = 0; ii < src.size(); ++ii) {
dst.set(dstOffset + ii, LongWrapper.of(src.get(ii)));
}
});
}
throw new IllegalArgumentException("Unsigned 64bit integers not supported");
}
default:
throw new IllegalArgumentException("Unsupported Int bitwidth: " + t.bitWidth());
}
return newIntReader(typeInfo, options);
}
case Type.FloatingPoint: {
FloatingPoint t = new FloatingPoint();
typeInfo.arrowField().type(t);
switch (t.precision()) {
case Precision.SINGLE: {
return (ChunkReader<T>) new FloatChunkReader(options);
}
case Precision.DOUBLE: {
return (ChunkReader<T>) new DoubleChunkReader(options);
}
default:
throw new IllegalArgumentException(
"Unsupported FloatingPoint precision " + Precision.name(t.precision()));
}
return newFloatReader(typeInfo, options);
}
case Type.Binary: {
if (typeInfo.type() == BigIntegerWrapper.class) {
Expand Down Expand Up @@ -160,7 +127,7 @@ public <T extends WritableChunk<Values>> ChunkReader<T> newReader(
case Type.Bool: {
BooleanChunkReader subReader = new BooleanChunkReader();
return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> {
try (final WritableByteChunk<Values> inner = (WritableByteChunk<Values>) subReader.readChunk(
try (final WritableByteChunk<Values> inner = subReader.readChunk(
fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {

final WritableObjectChunk<Boolean, Values> chunk;
Expand All @@ -184,7 +151,6 @@ public <T extends WritableChunk<Values>> ChunkReader<T> newReader(

return (T) chunk;
}

};
}
case Type.Date: {
Expand Down Expand Up @@ -331,6 +297,102 @@ public <T extends WritableChunk<Values>> ChunkReader<T> newReader(
}
}

@SuppressWarnings("unchecked")
private static <T extends WritableChunk<Values>> @NotNull ChunkReader<T> newFloatReader(
@NotNull BarrageTypeInfo<Field> typeInfo, @NotNull BarrageOptions options) {
FloatingPoint t = new FloatingPoint();
typeInfo.arrowField().type(t);
switch (t.precision()) {
case Precision.SINGLE: {
return (ChunkReader<T>) transformToObject(new FloatChunkReader(options),
(src, dst, dstOffset) -> {
final FloatChunk<?> floatChunk = src.asFloatChunk();
for (int ii = 0; ii < src.size(); ++ii) {
float value = floatChunk.get(ii);
dst.set(dstOffset + ii, value == QueryConstants.NULL_FLOAT ? null : Js.asAny(value));
}
});
}
case Precision.DOUBLE: {
return (ChunkReader<T>) transformToObject(new DoubleChunkReader(options),
(src, dst, dstOffset) -> {
final DoubleChunk<?> floatChunk = src.asDoubleChunk();
for (int ii = 0; ii < src.size(); ++ii) {
double value = floatChunk.get(ii);
dst.set(dstOffset + ii, value == QueryConstants.NULL_DOUBLE ? null : Js.asAny(value));
}
});
}
default:
throw new IllegalArgumentException(
"Unsupported FloatingPoint precision " + Precision.name(t.precision()));
}
}

@SuppressWarnings("unchecked")
private static <T extends WritableChunk<Values>> ChunkReader<T> newIntReader(
@NotNull final BarrageTypeInfo<Field> typeInfo,
@NotNull final BarrageOptions options) {
final Int t = new Int();
typeInfo.arrowField().type(t);
switch (t.bitWidth()) {
case 8: {
return (ChunkReader<T>) transformToObject(new ByteChunkReader(options),
(src, dst, dstOffset) -> {
final ByteChunk<?> byteChunk = src.asByteChunk();
for (int ii = 0; ii < src.size(); ++ii) {
byte value = byteChunk.get(ii);
dst.set(dstOffset + ii, value == QueryConstants.NULL_BYTE ? null : Js.asAny(value));
}
});
}
case 16: {
if (t.isSigned()) {
return (ChunkReader<T>) transformToObject(new ShortChunkReader(options),
(src, dst, dstOffset) -> {
final ShortChunk<?> shortChunk = src.asShortChunk();
for (int ii = 0; ii < src.size(); ++ii) {
short value = shortChunk.get(ii);
dst.set(dstOffset + ii,
value == QueryConstants.NULL_SHORT ? null : Js.asAny(value));
}
});
}
return (ChunkReader<T>) transformToObject(new CharChunkReader(options),
(src, dst, dstOffset) -> {
final CharChunk<?> charChunk = src.asCharChunk();
for (int ii = 0; ii < src.size(); ++ii) {
char value = charChunk.get(ii);
dst.set(dstOffset + ii, value == QueryConstants.NULL_CHAR ? null : Js.asAny(value));
}
});
}
case 32: {
return (ChunkReader<T>) transformToObject(new IntChunkReader(options),
(src, dst, dstOffset) -> {
final IntChunk<?> intChunk = src.asIntChunk();
for (int ii = 0; ii < src.size(); ++ii) {
int value = intChunk.get(ii);
dst.set(dstOffset + ii, value == QueryConstants.NULL_INT ? null : Js.asAny(value));
}
});
}
case 64: {
if (t.isSigned()) {
return (ChunkReader<T>) transformToObject(new LongChunkReader(options),
(src, dst, dstOffset) -> {
for (int ii = 0; ii < src.size(); ++ii) {
dst.set(dstOffset + ii, LongWrapper.of(src.get(ii)));
}
});
}
throw new IllegalArgumentException("Unsigned 64bit integers not supported");
}
default:
throw new IllegalArgumentException("Unsupported Int bitwidth: " + t.bitWidth());
}
}

public interface Mapper<T> {
T constructFrom(byte[] buf, int offset, int length) throws IOException;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package io.deephaven.web.client.api.barrage.data;

import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.web.client.api.barrage.WebBarrageMessage;
import io.deephaven.web.client.api.barrage.def.InitialTableDefinition;
Expand Down Expand Up @@ -45,36 +44,8 @@ public static WebBarrageSubscription subscribe(
final DataChangedHandler dataChangedHandler) {

WebColumnData[] dataSinks = new WebColumnData[cts.columnTypes().length];
ChunkType[] chunkTypes = cts.chunkTypes();
for (int i = 0; i < dataSinks.length; i++) {
switch (chunkTypes[i]) {
case Boolean:
throw new IllegalStateException("Boolean unsupported here");
case Char:
dataSinks[i] = new WebCharColumnData();
break;
case Byte:
dataSinks[i] = new WebByteColumnData();
break;
case Short:
dataSinks[i] = new WebShortColumnData();
break;
case Int:
dataSinks[i] = new WebIntColumnData();
break;
case Long:
dataSinks[i] = new WebLongColumnData();
break;
case Float:
dataSinks[i] = new WebFloatColumnData();
break;
case Double:
dataSinks[i] = new WebDoubleColumnData();
break;
case Object:
dataSinks[i] = new WebObjectColumnData();
break;
}
dataSinks[i] = new WebColumnData();
}

if (cts.getTableDef().getAttributes().isBlinkTable()) {
Expand Down Expand Up @@ -207,7 +178,7 @@ public void applyUpdates(WebBarrageMessage message) {
PrimitiveIterator.OfLong destIterator = destinationRowSet.indexIterator();
for (int j = 0; j < column.data.size(); j++) {
Chunk<Values> chunk = column.data.get(j);
destSources[ii].fillChunk(chunk, destIterator);
destSources[ii].fillFromChunk(chunk, destIterator);
}
assert !destIterator.hasNext();
}
Expand Down Expand Up @@ -356,7 +327,7 @@ public void applyUpdates(WebBarrageMessage message) {

for (int j = 0; j < column.data.size(); j++) {
Chunk<Values> chunk = column.data.get(j);
destSources[ii].fillChunk(chunk, destIterator);
destSources[ii].fillFromChunk(chunk, destIterator);
}
assert !destIterator.hasNext();
}
Expand Down Expand Up @@ -395,7 +366,7 @@ public boolean hasNext() {
};
for (int j = 0; j < column.data.size(); j++) {
Chunk<Values> chunk = column.data.get(j);
destSources[ii].fillChunk(chunk, destIterator);
destSources[ii].fillFromChunk(chunk, destIterator);
}
assert !destIterator.hasNext();
}
Expand Down Expand Up @@ -538,7 +509,7 @@ public void applyUpdates(WebBarrageMessage message) {

for (int j = 0; j < column.data.size(); j++) {
Chunk<Values> chunk = column.data.get(j);
destSources[ii].fillChunk(chunk, column.rowsModified.indexIterator());
destSources[ii].fillFromChunk(chunk, column.rowsModified.indexIterator());
}
}

Expand Down
Loading

0 comments on commit cdf55dd

Please sign in to comment.