Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Simplify Barrage Viewport Table Updates #6347

Merged
merged 38 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
1852b0a
Restore initial commit from grpc-java, plus a few local changes
niloc132 Oct 21, 2024
68b925a
Guard writing payload as hex if FINEST is enabled
niloc132 Oct 21, 2024
e88c47e
Apply upstream "Fix AsyncServletOutputStreamWriterConcurrencyTest
niloc132 Oct 21, 2024
f9a19fc
Apply upstream "Avoid flushing headers when the server returns a single
niloc132 Oct 21, 2024
4733524
Apply upstream "servlet: introduce ServletServerBuilder.buildServlet()"
niloc132 Oct 21, 2024
06e63ec
Bump grpc vers, add inprocess dep for tests
niloc132 Oct 21, 2024
09ade64
Merge branch 'main' into grpc-history-replay
niloc132 Oct 28, 2024
c8af47c
Apply https://github.com/deephaven/deephaven-core/pull/6301
niloc132 Oct 28, 2024
57c8008
Bump to 1.65.1 to better match arrow 18
niloc132 Nov 1, 2024
cbf8ab2
Merge remote-tracking branch 'colin/grpc-history-replay' into vp_simp…
nbauernfeind Nov 6, 2024
85f604f
Version Upgrades; MavenLocal
nbauernfeind Nov 6, 2024
70a0207
Implement Simplified Viewport Table Updates in BMP/BT
nbauernfeind Nov 8, 2024
0089d62
Ryan's Synchronous Review
nbauernfeind Nov 9, 2024
485746d
Merge remote-tracking branch 'upstream/main' into vp_simplification
nbauernfeind Nov 9, 2024
ad8de73
Remove SNAPSHOT version and mavenLocal references
nbauernfeind Nov 11, 2024
02ce2ad
Fixes removed/added rows in most VP cases
nbauernfeind Nov 12, 2024
da23e2b
Bug fixes around viewport snapshot rowsRemoved and rowsAdded
nbauernfeind Nov 12, 2024
299f56e
Bugfix for correct growing VP logic
nbauernfeind Nov 12, 2024
9d6f389
remaining java side fixes
nbauernfeind Nov 13, 2024
fd5aced
Ryan's feedback on javaserver/client impls
nbauernfeind Nov 14, 2024
53b1eed
Inline Feedback from VC w/Ryan
nbauernfeind Nov 14, 2024
6e7fe94
Do not propagate modifies for any repainted rows
nbauernfeind Nov 14, 2024
d568eb7
Minor cleanup from personal review
nbauernfeind Nov 14, 2024
6653ca6
Ryan's feedback latest round.
nbauernfeind Nov 14, 2024
44cdf93
jsAPI mostly complete; looking for tree table issue
nbauernfeind Nov 15, 2024
d549d79
Fixes for jsapi and HierarchicalTable
nbauernfeind Nov 15, 2024
b4d5b69
Lazily compute rowset encoding
nbauernfeind Nov 15, 2024
6c12314
Fixup jsapi tests
nbauernfeind Nov 15, 2024
f9be6e5
Quick round feedback
nbauernfeind Nov 15, 2024
4252622
spotless
nbauernfeind Nov 15, 2024
2767def
Double checked locking fixes
nbauernfeind Nov 15, 2024
78c4cb7
Ryan's final review
nbauernfeind Nov 15, 2024
ea6f898
Clarify strategy on who owns RowSets passed into getSubView
nbauernfeind Nov 15, 2024
3eeb628
npe fix
nbauernfeind Nov 15, 2024
84a6100
Bugfix if HT is empty or viewport past end of table
nbauernfeind Nov 16, 2024
476ae65
Colin's feedback
nbauernfeind Nov 16, 2024
738cb11
Limit jsapi data change event to prev and curr table sizes
nbauernfeind Nov 16, 2024
44fadff
Colin's Final Feedback
nbauernfeind Nov 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ public static BarrageMessage constructBackplaneSnapshotInPositionSpace(
final long clockStep =
callDataSnapshotFunction(System.identityHashCode(logIdentityObject), control, doSnapshot);
final BarrageMessage snapshot = snapshotMsg.getValue();
snapshot.step = snapshot.firstSeq = snapshot.lastSeq = clockStep;
snapshot.firstSeq = snapshot.lastSeq = clockStep;
return snapshot;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public static class AddColumnData {

public long firstSeq = -1;
public long lastSeq = -1;
public long step = -1;
/** The size of the table after this update. -1 if unknown. */
public long tableSize = -1;

public boolean isSnapshot;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1119,9 +1119,12 @@ private int appendModColumns(final RecordBatchMessageView view, final long start

public static abstract class ByteArrayGenerator {
protected int len;
protected byte[] raw;
protected volatile byte[] raw;

protected int addToFlatBuffer(final FlatBufferBuilder builder) {
protected abstract void ensureComputed() throws IOException;

protected int addToFlatBuffer(final FlatBufferBuilder builder) throws IOException {
ensureComputed();
return builder.createByteVector(raw, 0, len);
}
}
Expand All @@ -1131,22 +1134,31 @@ public static class RowSetGenerator extends ByteArrayGenerator implements SafeCl

public RowSetGenerator(final RowSet rowSet) throws IOException {
this.original = rowSet.copy();
try (final ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream();
final LittleEndianDataOutputStream oos = new LittleEndianDataOutputStream(baos)) {
ExternalizableRowSetUtils.writeExternalCompressedDeltas(oos, rowSet);
oos.flush();
raw = baos.peekBuffer();
len = baos.size();
}
}

@Override
public void close() {
original.close();
}

public DrainableByteArrayInputStream getInputStream() {
return new DrainableByteArrayInputStream(raw, 0, len);
protected void ensureComputed() throws IOException {
if (raw != null) {
return;
}

synchronized (this) {
if (raw != null) {
return;
}

try (final ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream();
final LittleEndianDataOutputStream oos = new LittleEndianDataOutputStream(baos)) {
ExternalizableRowSetUtils.writeExternalCompressedDeltas(oos, original);
oos.flush();
len = baos.size();
raw = baos.peekBuffer();
}
}
}

/**
Expand All @@ -1158,6 +1170,7 @@ public DrainableByteArrayInputStream getInputStream() {
*/
protected int addToFlatBuffer(final RowSet viewport, final FlatBufferBuilder builder) throws IOException {
if (original.subsetOf(viewport)) {
ensureComputed();
return addToFlatBuffer(builder);
}

Expand All @@ -1177,11 +1190,27 @@ protected int addToFlatBuffer(final RowSet viewport, final FlatBufferBuilder bui
}

public static class BitSetGenerator extends ByteArrayGenerator {
private final BitSet original;

public BitSetGenerator(final BitSet bitset) {
BitSet original = bitset == null ? new BitSet() : bitset;
this.raw = original.toByteArray();
final int nBits = original.previousSetBit(Integer.MAX_VALUE - 1) + 1;
this.len = (int) ((long) nBits + 7) / 8;
original = bitset == null ? new BitSet() : (BitSet) bitset.clone();
}

@Override
protected void ensureComputed() {
if (raw != null) {
return;
}

synchronized (this) {
if (raw != null) {
return;
}

final int nBits = original.previousSetBit(Integer.MAX_VALUE - 1) + 1;
len = (int) ((long) nBits + 7) / 8;
raw = original.toByteArray();
}
}
}

Expand All @@ -1190,37 +1219,49 @@ public static class RowSetShiftDataGenerator extends ByteArrayGenerator {

public RowSetShiftDataGenerator(final RowSetShiftData shifted) throws IOException {
original = shifted;
}

final RowSetBuilderSequential sRangeBuilder = RowSetFactory.builderSequential();
final RowSetBuilderSequential eRangeBuilder = RowSetFactory.builderSequential();
final RowSetBuilderSequential destBuilder = RowSetFactory.builderSequential();
protected void ensureComputed() throws IOException {
if (raw != null) {
return;
}

if (shifted != null) {
for (int i = 0; i < shifted.size(); ++i) {
long s = shifted.getBeginRange(i);
final long dt = shifted.getShiftDelta(i);
synchronized (this) {
if (raw != null) {
return;
}

if (dt < 0 && s < -dt) {
s = -dt;
}
final RowSetBuilderSequential sRangeBuilder = RowSetFactory.builderSequential();
final RowSetBuilderSequential eRangeBuilder = RowSetFactory.builderSequential();
final RowSetBuilderSequential destBuilder = RowSetFactory.builderSequential();

if (original != null) {
for (int i = 0; i < original.size(); ++i) {
long s = original.getBeginRange(i);
final long dt = original.getShiftDelta(i);

if (dt < 0 && s < -dt) {
s = -dt;
}

sRangeBuilder.appendKey(s);
eRangeBuilder.appendKey(shifted.getEndRange(i));
destBuilder.appendKey(s + dt);
sRangeBuilder.appendKey(s);
eRangeBuilder.appendKey(original.getEndRange(i));
destBuilder.appendKey(s + dt);
}
}
}

try (final RowSet sRange = sRangeBuilder.build();
final RowSet eRange = eRangeBuilder.build();
final RowSet dest = destBuilder.build();
final ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream();
final LittleEndianDataOutputStream oos = new LittleEndianDataOutputStream(baos)) {
ExternalizableRowSetUtils.writeExternalCompressedDeltas(oos, sRange);
ExternalizableRowSetUtils.writeExternalCompressedDeltas(oos, eRange);
ExternalizableRowSetUtils.writeExternalCompressedDeltas(oos, dest);
oos.flush();
raw = baos.peekBuffer();
len = baos.size();
try (final RowSet sRange = sRangeBuilder.build();
final RowSet eRange = eRangeBuilder.build();
final RowSet dest = destBuilder.build();
final ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream();
final LittleEndianDataOutputStream oos = new LittleEndianDataOutputStream(baos)) {
ExternalizableRowSetUtils.writeExternalCompressedDeltas(oos, sRange);
ExternalizableRowSetUtils.writeExternalCompressedDeltas(oos, eRange);
ExternalizableRowSetUtils.writeExternalCompressedDeltas(oos, dest);
oos.flush();
len = baos.size();
raw = baos.peekBuffer();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1413,7 +1413,7 @@ private void updateSubscriptionsSnapshotAndPropagate() {
}

// prepare updates to propagate
final long maxStep = snapshot != null ? snapshot.step : Long.MAX_VALUE;
final long maxStep = snapshot != null ? snapshot.firstSeq : Long.MAX_VALUE;

int deltaSplitIdx = pendingDeltas.size();
for (; deltaSplitIdx > 0; --deltaSplitIdx) {
Expand Down Expand Up @@ -2037,6 +2037,7 @@ final class ColumnInfo {
propagationRowSet.remove(downstream.rowsRemoved);
downstream.shifted.apply(propagationRowSet);
propagationRowSet.insert(downstream.rowsAdded);
downstream.tableSize = propagationRowSet.size();

return downstream;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.TableUpdateListener;
Expand Down Expand Up @@ -92,7 +93,7 @@ HierarchicalTableViewSubscription create(
// region Guarded by snapshot lock
private BitSet columns;
private RowSet rows;
private long lastExpandedSize;
private final WritableRowSet prevKeyspaceViewportRows = RowSetFactory.empty();
// endregion Guarded by snapshot lock

private enum State {
Expand Down Expand Up @@ -282,16 +283,16 @@ private void process() {
return;
}
try {
lastExpandedSize = buildAndSendSnapshot(streamGeneratorFactory, listener, subscriptionOptions, view,
this::recordSnapshotNanos, this::recordWriteMetrics, columns, rows, lastExpandedSize);
buildAndSendSnapshot(streamGeneratorFactory, listener, subscriptionOptions, view,
this::recordSnapshotNanos, this::recordWriteMetrics, columns, rows, prevKeyspaceViewportRows);
} catch (Exception e) {
GrpcUtil.safelyError(listener, errorTransformer.transform(e));
state = State.Done;
}
}
}

private static long buildAndSendSnapshot(
private static void buildAndSendSnapshot(
@NotNull final BarrageStreamGenerator.Factory streamGeneratorFactory,
@NotNull final StreamObserver<BarrageStreamGenerator.MessageView> listener,
@NotNull final BarrageSubscriptionOptions subscriptionOptions,
Expand All @@ -300,7 +301,7 @@ private static long buildAndSendSnapshot(
@NotNull final BarragePerformanceLog.WriteMetricsConsumer writeMetricsConsumer,
@NotNull final BitSet columns,
@NotNull final RowSet rows,
final long lastExpandedSize) {
@NotNull final WritableRowSet prevKeyspaceViewportRows) {
// 1. Grab some schema and snapshot information
final List<ColumnDefinition<?>> columnDefinitions =
view.getHierarchicalTable().getAvailableColumnDefinitions();
Expand All @@ -322,18 +323,20 @@ private static long buildAndSendSnapshot(
columns, rows, destinations);
snapshotNanosConsumer.accept(System.nanoTime() - snapshotStartNanos);

// note that keyspace is identical to position space for HierarchicalTableView snapshots
final RowSet snapshotRows = RowSetFactory.flat(expandedSize);
final RowSet keyspaceViewportRows = rows.intersect(snapshotRows);

// 4. Make and populate a BarrageMessage
final BarrageMessage barrageMessage = new BarrageMessage();
// We don't populate firstSeq, or lastSeq debugging information; they are not relevant to this use case.

barrageMessage.isSnapshot = true;
// We don't populate length, snapshotRowSet, snapshotRowSetIsReversed, or snapshotColumns; they are only set by
// the client.
// We don't populate step, firstSeq, or lastSeq debugging information; they are not relevant to this use case.

barrageMessage.rowsAdded = RowSetFactory.flat(expandedSize);
barrageMessage.rowsIncluded = RowSetFactory.fromRange(rows.firstRowKey(),
Math.min(barrageMessage.rowsAdded.lastRowKey(), rows.lastRowKey()));
barrageMessage.rowsRemoved = RowSetFactory.flat(lastExpandedSize);
barrageMessage.rowsAdded = snapshotRows;
barrageMessage.rowsIncluded = keyspaceViewportRows;
barrageMessage.rowsRemoved = RowSetFactory.empty();
barrageMessage.shifted = RowSetShiftData.EMPTY;
barrageMessage.tableSize = expandedSize;

barrageMessage.addColumnData = new BarrageMessage.AddColumnData[numAvailableColumns];
for (int ci = 0, di = 0; ci < numAvailableColumns; ++ci) {
Expand All @@ -357,15 +360,16 @@ private static long buildAndSendSnapshot(
// 5. Send the BarrageMessage
try (final BarrageStreamGenerator streamGenerator =
streamGeneratorFactory.newGenerator(barrageMessage, writeMetricsConsumer)) {
// Note that we're always specifying "isInitialSnapshot=true". This is to provoke the subscription view to
// send the added rows on every snapshot, since (1) our added rows are flat, and thus cheap to send, and
// (2) we're relying on added rows to signal the full expanded size to the client.
GrpcUtil.safelyOnNext(listener,
streamGenerator.getSubView(subscriptionOptions, true, true, rows, false, rows, rows, columns));
// initialSnapshot flag is ignored for non-growing viewports
final boolean initialSnapshot = false;
final boolean isFullSubscription = false;
GrpcUtil.safelyOnNext(listener, streamGenerator.getSubView(
subscriptionOptions, initialSnapshot, isFullSubscription, rows, false,
prevKeyspaceViewportRows, keyspaceViewportRows, columns));
}

// 6. Let the caller know what the expanded size was
return expandedSize;
prevKeyspaceViewportRows.clear();
prevKeyspaceViewportRows.insert(keyspaceViewportRows);
}

public void setViewport(
Expand Down
Loading
Loading