feat: DH-18778: Add rebase method to TreeTable and RollupTable in support of ACLs. #6666

Open
wants to merge 16 commits into base: main
Changes from 6 commits
@@ -135,4 +135,13 @@ interface NodeOperationsRecorder extends
*/
NodeOperationsRecorder translateAggregatedNodeOperationsForConstituentNodes(
@NotNull NodeOperationsRecorder aggregatedNodeOperationsToTranslate);

/**
* Copy this RollupTable to a new RollupTable, replacing the source with newSource.
*
* @param newSource a new source table that must have the same definition as the source of this rollup
*
* @return The new RollupTable
*/
RollupTable rebase(Table newSource);
}
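
For orientation, a minimal usage sketch of the new method. This is a hedged illustration, not code from this PR: the trades table, its columns, and the ACL predicate below are hypothetical.

import java.util.List;
import io.deephaven.api.agg.Aggregation;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.hierarchical.RollupTable;
import io.deephaven.engine.util.TableTools;

// Hypothetical source table with Sym, Exchange, and Size columns.
final Table trades = TableTools.emptyTable(100).update(
        "Sym = ii % 2 == 0 ? `AAPL` : `MSFT`",
        "Exchange = ii % 3 == 0 ? `NYSE` : `NASDAQ`",
        "Size = ii");
// Rollup built once over the shared source.
final RollupTable shared = trades.rollup(
        List.of(Aggregation.AggSum("Size"), Aggregation.AggCount("Count")),
        "Sym", "Exchange");
// Per-user ACL view: identical definition, fewer rows.
final Table visible = trades.where("Exchange == `NYSE`");
// The rebased rollup reuses the recorded aggregations, node operations, and filters.
final RollupTable restricted = shared.rebase(visible);
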
@@ -107,6 +107,15 @@ interface NodeOperationsRecorder extends
*/
TreeTable withNodeOperations(@NotNull NodeOperationsRecorder nodeOperations);

/**
* Copy this TreeTable to a new TreeTable, replacing the source with newSource.
*
* @param newSource a new source table that must have the same definition as the source of this tree table
*
* @return The new TreeTable
*/
TreeTable rebase(Table newSource);
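
A parallel hedged sketch for trees (same imports as the rollup sketch above, plus the TableTools column builders and QueryConstants.NULL_INT; the org-chart table and its columns are hypothetical):

// Hypothetical org chart; a null parent identifier marks a root node.
final Table staff = TableTools.newTable(
        intCol("Id", 1, 2, 3),
        intCol("Parent", NULL_INT, 1, 1),
        stringCol("Dept", "Exec", "Eng", "Sales"));
final TreeTable fullTree = staff.tree("Id", "Parent");
// ACL view with the same definition; node operations carry over on rebase.
final TreeTable visibleTree = fullTree.rebase(staff.where("Dept != `Sales`"));
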

/**
* Adapt a {@code source} {@link Table} to be used for a {@link Table#tree(String, String) tree} to ensure that the
* result will have no orphaned nodes. Nodes whose parents do not exist will become children of the root node in the
@@ -80,6 +80,14 @@ public class RollupTableImpl extends HierarchicalTableImpl<RollupTable, RollupTa
private final AggregationRowLookup[] levelRowLookups;
private final ColumnSource<Table>[] levelNodeTableSources;

/**
* When {@link #withFilter(Filter)} is called, we apply the filters to the lowest level and re-aggregate. We need
* to keep those filters around so that when we rebase onto a new source we can re-apply them and preserve the
* semantics of our chain of rollup operations.
*/
@Nullable
private final WhereFilter[] lowestRollupKeyFilters;

private final TableDefinition aggregatedNodeDefinition;
private final RollupNodeOperationsRecorder aggregatedNodeOperations;
private final TableDefinition constituentNodeDefinition;
@@ -99,6 +107,7 @@ private RollupTableImpl(
@Nullable final RollupNodeOperationsRecorder aggregatedNodeOperations,
@Nullable final TableDefinition constituentNodeDefinition,
@Nullable final RollupNodeOperationsRecorder constituentNodeOperations,
@Nullable final WhereFilter[] lowestRollupKeyFilters,
@Nullable final List<ColumnDefinition<?>> availableColumnDefinitions) {
super(initialAttributes, source, levelTables[0]);

@@ -111,6 +120,7 @@ private RollupTableImpl(
// Note that we don't count the "root" as a level in this accounting; the root is levelTables[0], and its
// children (depth 1, at level 0), are found in levelNodeTableSources[0].
numLevels = groupByColumns.size() + 1;
this.lowestRollupKeyFilters = lowestRollupKeyFilters;
constituentDepth = numLevels + 1;
Require.eq(numLevels, "level count", levelTables.length, "levelTables.length");
this.levelTables = levelTables;
@@ -254,10 +264,20 @@ public RollupTable withFilter(@NotNull final Filter filter) {
final ColumnSource<Table>[] levelNodeTableSources = makeLevelNodeTableSourcesArray(
numLevels, filteredBaseLevel.getColumnSource(ROLLUP_COLUMN.name(), Table.class));
rollupFromBase(levelTables, levelRowLookups, levelNodeTableSources, aggregations, groupByColumns);

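// Append the newly applied filters after any previously recorded key filters, preserving application order.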
final WhereFilter[] newFilters;
if (lowestRollupKeyFilters == null) {
newFilters = whereFilters;
} else {
newFilters = Arrays.copyOf(lowestRollupKeyFilters, lowestRollupKeyFilters.length + whereFilters.length);
System.arraycopy(whereFilters, 0, newFilters, lowestRollupKeyFilters.length, whereFilters.length);
}

return new RollupTableImpl(getAttributes(), source, aggregations, includesConstituents, groupByColumns,
levelTables, levelRowLookups, levelNodeTableSources,
aggregatedNodeDefinition, aggregatedNodeOperations,
constituentNodeDefinition, constituentNodeOperations,
newFilters,
availableColumnDefinitions);
}

@@ -332,7 +352,9 @@ public RollupTable withNodeOperations(@NotNull final NodeOperationsRecorder... n
}
return new RollupTableImpl(getAttributes(), source, aggregations, includesConstituents, groupByColumns,
levelTables, levelRowLookups, levelNodeTableSources,
null, newAggregatedNodeOperations, null, newConstituentNodeOperations, newAvailableColumnDefinitions);
null, newAggregatedNodeOperations, null, newConstituentNodeOperations,
lowestRollupKeyFilters,
newAvailableColumnDefinitions);
}

@Override
@@ -451,6 +473,51 @@ protected RollupTableImpl copy() {
levelTables, levelRowLookups, levelNodeTableSources,
aggregatedNodeDefinition, aggregatedNodeOperations,
constituentNodeDefinition, constituentNodeOperations,
lowestRollupKeyFilters,
availableColumnDefinitions);
}

@Override
public RollupTableImpl rebase(final Table newSource) {
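// The new source must match the existing definition exactly, including column order, so that the
// recorded aggregations, node operations, and filters remain applicable as-is.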
if (!newSource.getDefinition().equals(source.getDefinition())) {
if (newSource.getDefinition().equalsIgnoreOrder(source.getDefinition())) {
throw new IllegalArgumentException(
"Cannot rebase a RollupTable with a new source definition, column order is not identical.");
}
final String differenceDescription = newSource.getDefinition()
.getDifferenceDescription(source.getDefinition(), "new source", "existing source", ",");
throw new IllegalArgumentException(
"Cannot rebase a RollupTable with a new source definition: " + differenceDescription);
}

final QueryTable newSourceQueryTable;
if (lowestRollupKeyFilters != null) {
// If this rollup has had any key filters applied, we apply them to the new source rather than generating
// buckets for keys that cannot be present in the result. Because these filters are already applied, we pass
// a null set of filters to the new RollupTableImpl we are creating.
final List<WhereFilter> keyFilter =
Arrays.stream(lowestRollupKeyFilters).map(WhereFilter::copy).collect(Collectors.toList());
newSourceQueryTable = (QueryTable) newSource.where(Filter.and(keyFilter));
} else {
newSourceQueryTable = (QueryTable) newSource;
}

final int numLevels = groupByColumns.size() + 1;
final QueryTable baseLevel = newSourceQueryTable.aggNoMemo(
AggregationProcessor.forRollupBase(aggregations, includesConstituents, ROLLUP_COLUMN),
false, null, groupByColumns);
final QueryTable[] levelTables = makeLevelTablesArray(numLevels, baseLevel);
final AggregationRowLookup[] levelRowLookups = makeLevelRowLookupsArray(numLevels, getRowLookup(baseLevel));
final ColumnSource<Table>[] levelNodeTableSources = makeLevelNodeTableSourcesArray(
numLevels, baseLevel.getColumnSource(ROLLUP_COLUMN.name(), Table.class));
rollupFromBase(levelTables, levelRowLookups, levelNodeTableSources, aggregations, groupByColumns);

return new RollupTableImpl(getAttributes(), newSourceQueryTable, aggregations, includesConstituents,
groupByColumns,
levelTables, levelRowLookups, levelNodeTableSources,
aggregatedNodeDefinition, aggregatedNodeOperations,
constituentNodeDefinition, constituentNodeOperations,
null,
availableColumnDefinitions);
}
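
To make the filter-preservation semantics concrete, a hedged sketch (shared and newSource are hypothetical givens, with Sym a group-by column): filters recorded by withFilter are pushed into the new source during rebase, so the returned rollup records no residual key filters of its own.

// Given a RollupTable `shared` grouped on Sym, and a Table `newSource` with the same definition:
final RollupTable filtered = shared.withFilter(Filter.and(Filter.from("Sym == `AAPL`")));
final RollupTable rebased = filtered.rebase(newSource);
// Internally, rebase applies the recorded filter via newSource.where(...) before
// re-aggregating, and passes null for the new RollupTableImpl's key filters.
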

@@ -471,7 +538,7 @@ public static RollupTable makeRollup(
final RollupTableImpl result = new RollupTableImpl(
source.getAttributes(ak -> shouldCopyAttribute(ak, CopyAttributeOperation.Rollup)),
source, aggregations, includeConstituents, groupByColumns,
levelTables, levelRowLookups, levelNodeTableSources, null, null, null, null, null);
levelTables, levelRowLookups, levelNodeTableSources, null, null, null, null, null, null);
source.copySortableColumns(result, baseLevel.getDefinition().getColumnNameSet()::contains);
result.setColumnDescriptions(AggregationDescriptions.of(aggregations));
return result;
@@ -222,6 +222,28 @@ protected TreeTableImpl copy() {
parentIdentifierColumn, nodeFilterColumns, nodeOperations, availableColumnDefinitions);
}

@Override
public TreeTable rebase(@NotNull final Table newSource) {
if (!newSource.getDefinition().equals(source.getDefinition())) {
Review comment (Member): We could make a case that we just need to have the parent and id columns present on newSource. I guess we don't need to do that for the ACL case.
if (newSource.getDefinition().equalsIgnoreOrder(source.getDefinition())) {
throw new IllegalArgumentException(
"Cannot rebase a TreeTable with a new source definition, column order is not identical.");
}
final String differenceDescription = newSource.getDefinition()
.getDifferenceDescription(source.getDefinition(), "new source", "existing source", ",");
throw new IllegalArgumentException(
"Cannot rebase a TreeTable with a new source definition: " + differenceDescription);
}

final QueryTable newSourceQueryTable = (QueryTable) newSource;
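// Recompute the tree structure and the source-row lookup from the new source; the identifier columns,
// node filter columns, node operations, and available column definitions carry over unchanged.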
final QueryTable tree = computeTree(newSourceQueryTable, parentIdentifierColumn);
final QueryTable sourceRowLookupTable = computeSourceRowLookupTable(newSourceQueryTable, identifierColumn);
final TreeSourceRowLookup sourceRowLookup = new TreeSourceRowLookup(newSource, sourceRowLookupTable);

return new TreeTableImpl(getAttributes(), newSourceQueryTable, tree, sourceRowLookup, identifierColumn,
parentIdentifierColumn, nodeFilterColumns, nodeOperations, availableColumnDefinitions);
}

public static TreeTable makeTree(
@NotNull final QueryTable source,
@NotNull final ColumnName identifierColumn,
@@ -7,39 +7,29 @@
import io.deephaven.api.agg.Aggregation;
import io.deephaven.api.agg.spec.AggSpec;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.csv.CsvTools;
import io.deephaven.csv.util.CsvReaderException;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.RowSequenceFactory;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.hierarchical.HierarchicalTable;
import io.deephaven.engine.table.hierarchical.HierarchicalTable.SnapshotState;
import io.deephaven.engine.table.hierarchical.RollupTable;
import io.deephaven.engine.table.hierarchical.TreeTable;
import io.deephaven.engine.table.impl.sources.ByteAsBooleanColumnSource;
import io.deephaven.engine.table.impl.sources.LongAsInstantColumnSource;
import io.deephaven.engine.table.impl.sources.chunkcolumnsource.ChunkColumnSource;
import io.deephaven.engine.testutil.ControlledUpdateGraph;
import io.deephaven.engine.testutil.junit4.EngineCleanup;
import io.deephaven.engine.util.TableTools;
import io.deephaven.test.types.OutOfBandTest;

import junit.framework.TestCase;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.io.ByteArrayInputStream;
import java.time.Instant;
import java.util.Arrays;
import java.util.BitSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -49,9 +39,8 @@
import java.util.concurrent.TimeoutException;

import static io.deephaven.api.agg.Aggregation.AggMax;
import static io.deephaven.engine.table.impl.sources.ReinterpretUtils.byteToBooleanSource;
import static io.deephaven.engine.table.impl.sources.ReinterpretUtils.longToInstantSource;
import static io.deephaven.engine.table.impl.sources.ReinterpretUtils.maybeConvertToPrimitiveChunkType;
import static io.deephaven.engine.testutil.HierarchicalTableTestTools.freeSnapshotTableChunks;
import static io.deephaven.engine.testutil.HierarchicalTableTestTools.snapshotToTable;
import static io.deephaven.engine.testutil.TstUtils.*;
import static io.deephaven.engine.util.TableTools.*;
import static io.deephaven.util.QueryConstants.NULL_INT;
@@ -259,75 +248,4 @@ public void testSortedExpandAll() throws CsvReaderException {
freeSnapshotTableChunks(snapshot);
freeSnapshotTableChunks(snapshotSort);
}

@SuppressWarnings("SameParameterValue")
private static Table snapshotToTable(
@NotNull final HierarchicalTable<?> hierarchicalTable,
@NotNull final SnapshotState snapshotState,
@NotNull final Table keyTable,
@Nullable final ColumnName keyTableActionColumn,
@Nullable final BitSet columns,
@NotNull final RowSequence rows) {
final ColumnDefinition<?>[] availableColumns =
hierarchicalTable.getAvailableColumnDefinitions().toArray(ColumnDefinition[]::new);
final ColumnDefinition<?>[] includedColumns = columns == null
? availableColumns
: columns.stream().mapToObj(ci -> availableColumns[ci]).toArray(ColumnDefinition[]::new);

assertThat(rows.isContiguous()).isTrue();
final int rowsSize = rows.intSize();
// noinspection rawtypes
final WritableChunk[] chunks = Arrays.stream(includedColumns)
.map(cd -> maybeConvertToPrimitiveChunkType(cd.getDataType()))
.map(ct -> ct.makeWritableChunk(rowsSize))
.toArray(WritableChunk[]::new);

// noinspection unchecked
final long expandedSize =
hierarchicalTable.snapshot(snapshotState, keyTable, keyTableActionColumn, columns, rows, chunks);
final int snapshotSize = chunks.length == 0 ? 0 : chunks[0].size();
final long expectedSnapshotSize = rows.isEmpty()
? 0
: Math.min(rows.lastRowKey() + 1, expandedSize) - rows.firstRowKey();
assertThat(snapshotSize).isEqualTo(expectedSnapshotSize);

final LinkedHashMap<String, ColumnSource<?>> sources = new LinkedHashMap<>(includedColumns.length);
for (int ci = 0; ci < includedColumns.length; ++ci) {
final ColumnDefinition<?> columnDefinition = includedColumns[ci];
// noinspection unchecked
final WritableChunk<? extends Values> chunk = chunks[ci];
final ChunkColumnSource<?> chunkColumnSource = ChunkColumnSource.make(
chunk.getChunkType(), columnDefinition.getDataType(), columnDefinition.getComponentType());
chunkColumnSource.addChunk(chunk);
final ColumnSource<?> source;
if (columnDefinition.getDataType() == Boolean.class && chunkColumnSource.getType() == byte.class) {
// noinspection unchecked
source = byteToBooleanSource((ColumnSource<Byte>) chunkColumnSource);
} else if (columnDefinition.getDataType() == Instant.class && chunkColumnSource.getType() == long.class) {
// noinspection unchecked
source = longToInstantSource((ColumnSource<Long>) chunkColumnSource);
} else {
source = chunkColumnSource;
}
sources.put(columnDefinition.getName(), source);
}

// noinspection resource
return new QueryTable(
TableDefinition.of(includedColumns),
RowSetFactory.flat(snapshotSize).toTracking(),
sources);
}

private static void freeSnapshotTableChunks(@NotNull final Table snapshotTable) {
snapshotTable.getColumnSources().forEach(cs -> {
if (cs instanceof ByteAsBooleanColumnSource) {
((ChunkColumnSource<?>) cs.reinterpret(byte.class)).clear();
} else if (cs instanceof LongAsInstantColumnSource) {
((ChunkColumnSource<?>) cs.reinterpret(long.class)).clear();
} else {
((ChunkColumnSource<?>) cs).clear();
}
});
}
}