@@ -1059,24 +1059,35 @@ RowVectorPtr HashProbe::getOutputInternal(bool toSpillOutput) {
1059
1059
// Left semi and anti joins are always cardinality reducing, e.g. for a
1060
1060
// given row of input they produce zero or 1 row of output. Therefore, if
1061
1061
// there is no extra filter we can process each batch of input in one go.
1062
- auto outputBatchSize = (isLeftSemiOrAntiJoinNoFilter || emptyBuildSide)
1062
+ auto maxOutputBatchRows = (isLeftSemiOrAntiJoinNoFilter || emptyBuildSide)
1063
1063
? inputSize
1064
1064
: outputBatchSize_;
1065
- outputTableRowsCapacity_ = outputBatchSize;
1066
- if (filter_ &&
1067
- (isLeftJoin (joinType_) || isFullJoin (joinType_) ||
1068
- isAntiJoin (joinType_) || isLeftSemiFilterJoin (joinType_) ||
1069
- isLeftSemiProjectJoin (joinType_))) {
1070
- // If we need non-matching probe side row, there is a possibility that such
1071
- // row exists at end of an input batch and being carried over in the next
1072
- // output batch, so we need to make extra room of one row in output.
1073
- ++outputTableRowsCapacity_;
1065
+ outputTableRowsCapacity_ = maxOutputBatchRows;
1066
+ if (filter_) {
1067
+ if (isLeftJoin (joinType_) || isFullJoin (joinType_) ||
1068
+ isAntiJoin (joinType_) || isLeftSemiFilterJoin (joinType_) ||
1069
+ isLeftSemiProjectJoin (joinType_)) {
1070
+ // If we need to emit non-matching probe side rows, such a row may sit at
1071
+ // the end of an input batch and be carried over into the next output
1072
+ // batch, so we make room for one extra row in the output.
1073
+ ++outputTableRowsCapacity_;
1074
+ }
1075
+
1076
+ // Initialize 'leftSemiProjectIsNull_' for a null-aware left semi join.
1077
+ if (isLeftSemiProjectJoin (joinType_) && nullAware_) {
1078
+ leftSemiProjectIsNull_.resize (outputTableRowsCapacity_);
1079
+ leftSemiProjectIsNull_.clearAll ();
1080
+ }
1074
1081
}
1082
+
1075
1083
auto mapping = initializeRowNumberMapping (
1076
1084
outputRowMapping_, outputTableRowsCapacity_, pool ());
1077
1085
auto * outputTableRows =
1078
1086
initBuffer<char *>(outputTableRows_, outputTableRowsCapacity_, pool ());
1079
1087
1088
+ int numOutputRows = 0 ;
1089
+ uint64_t maxOutputBatchBytes =
1090
+ operatorCtx_->driverCtx ()->queryConfig ().preferredOutputBatchBytes ();
1080
1091
for (;;) {
1081
1092
// If the task owning this operator has been cancelled, there is no point
1082
1093
// to continue executing this procedure, which may be long in degenerate
@@ -1085,14 +1096,14 @@ RowVectorPtr HashProbe::getOutputInternal(bool toSpillOutput) {
1085
1096
if (operatorCtx_->task ()->isCancelled ()) {
1086
1097
return nullptr ;
1087
1098
}
1088
- int numOut = 0 ;
1099
+ int numJoinedRows = 0 ;
1089
1100
1090
1101
if (emptyBuildSide) {
1091
1102
// When build side is empty, anti and left joins return all probe side
1092
1103
// rows, including ones with null join keys.
1093
1104
std::iota (mapping.begin (), mapping.begin () + inputSize, 0 );
1094
1105
std::fill (outputTableRows, outputTableRows + inputSize, nullptr );
1095
- numOut = inputSize;
1106
+ numJoinedRows = inputSize;
1096
1107
} else if (isAntiJoin (joinType_) && !filter_) {
1097
1108
if (nullAware_) {
1098
1109
// When build side is not empty, anti join without a filter returns
@@ -1101,46 +1112,48 @@ RowVectorPtr HashProbe::getOutputInternal(bool toSpillOutput) {
1101
1112
for (auto i = 0 ; i < inputSize; ++i) {
1102
1113
if (nonNullInputRows_.isValid (i) &&
1103
1114
(!activeRows_.isValid (i) || !lookup_->hits [i])) {
1104
- mapping[numOut] = i;
1105
- ++numOut;
1115
+ mapping[numJoinedRows++] = i;
1106
1116
}
1107
1117
}
1108
1118
} else {
1109
1119
for (auto i = 0 ; i < inputSize; ++i) {
1110
1120
if (!nonNullInputRows_.isValid (i) ||
1111
1121
(!activeRows_.isValid (i) || !lookup_->hits [i])) {
1112
- mapping[numOut] = i;
1113
- ++numOut;
1122
+ mapping[numJoinedRows++] = i;
1114
1123
}
1115
1124
}
1116
1125
}
1117
1126
} else {
1118
- numOut = table_->listJoinResults (
1127
+ numJoinedRows = table_->listJoinResults (
1119
1128
*resultIter_,
1120
1129
joinIncludesMissesFromLeft (joinType_),
1121
- folly::Range (mapping.data (), outputBatchSize ),
1122
- folly::Range (outputTableRows, outputBatchSize ),
1123
- operatorCtx_-> driverCtx ()-> queryConfig (). preferredOutputBatchBytes () );
1130
+ folly::Range (mapping.data (), maxOutputBatchRows ),
1131
+ folly::Range (outputTableRows, maxOutputBatchRows ),
1132
+ maxOutputBatchBytes );
1124
1133
}
1125
1134
1126
1135
// We are done processing the input batch if there are no more joined rows
1127
1136
// to process and the NoMatchDetector isn't carrying forward a row that
1128
1137
// still needs to be written to the output.
1129
- if (!numOut && !noMatchDetector_.hasLastMissedRow ()) {
1138
+ if (!numJoinedRows && !noMatchDetector_.hasLastMissedRow ()) {
1139
+ if (numOutputRows > 0 ) {
1140
+ fillOutput (numOutputRows);
1141
+ input_ = nullptr ;
1142
+ return output_;
1143
+ }
1130
1144
input_ = nullptr ;
1131
1145
return nullptr ;
1132
1146
}
1133
- VELOX_CHECK_LE (numOut, outputBatchSize);
1147
+ VELOX_CHECK_LE (numJoinedRows, maxOutputBatchRows);
1148
+ auto numJoinedRowsAfterFilter = evalFilter (numOutputRows, numJoinedRows);
1134
1149
1135
- numOut = evalFilter (numOut);
1136
-
1137
- if (numOut == 0 ) {
1150
+ if (numJoinedRowsAfterFilter == 0 ) {
1138
1151
continue ;
1139
1152
}
1140
1153
1141
1154
if (needLastProbe ()) {
1142
1155
// Mark build-side rows that have a match on the join condition.
1143
- table_->rows ()->setProbedFlag (outputTableRows, numOut );
1156
+ table_->rows ()->setProbedFlag (outputTableRows, numJoinedRowsAfterFilter );
1144
1157
}
1145
1158
1146
1159
// Right semi join only returns the build side output when the probe side
@@ -1152,7 +1165,35 @@ RowVectorPtr HashProbe::getOutputInternal(bool toSpillOutput) {
1152
1165
return nullptr ;
1153
1166
}
1154
1167
1155
- fillOutput (numOut);
1168
+ if (numJoinedRowsAfterFilter < numJoinedRows || numOutputRows > 0 ) {
1169
+ numOutputRows += numJoinedRowsAfterFilter;
1170
+ // Estimates the size in bytes of the output batch after applying the
1171
+ // filter. The estimate scales 'resultIter_->outputBatchBytes' by the
1172
+ // fraction of joined rows that passed the filter, i.e.
1173
+ // 'numJoinedRowsAfterFilter' / 'numJoinedRows'.
1174
+ const auto estimatedOutputBatchBytes =
1175
+ (1.0 * numJoinedRowsAfterFilter / numJoinedRows) *
1176
+ resultIter_->outputBatchBytes ;
1177
+ // Continue the loop to populate 'outputRowMapping_' and
1178
+ // 'outputTableRows_' until either all input rows are processed or the
1179
+ // desired row count / max bytes is reached, avoiding low-selectivity
1180
+ // vectors.
1181
+ if (!resultIter_->atEnd () && numOutputRows < maxOutputBatchRows &&
1182
+ estimatedOutputBatchBytes < maxOutputBatchBytes) {
1183
+ mapping = folly::Range (
1184
+ outputRowMapping_->asMutable <vector_size_t >() + numOutputRows,
1185
+ outputTableRowsCapacity_ - numOutputRows);
1186
+ outputTableRows = outputTableRows_->asMutable <char *>() + numOutputRows;
1187
+ maxOutputBatchRows -= numJoinedRowsAfterFilter;
1188
+ maxOutputBatchBytes -= estimatedOutputBatchBytes;
1189
+ continue ;
1190
+ }
1191
+ }
1192
+ if (numOutputRows > 0 ) {
1193
+ numJoinedRowsAfterFilter = numOutputRows;
1194
+ }
1195
+
1196
+ fillOutput (numJoinedRowsAfterFilter);
1156
1197
1157
1198
if (isLeftSemiOrAntiJoinNoFilter || emptyBuildSide) {
1158
1199
input_ = nullptr ;
@@ -1177,7 +1218,15 @@ bool HashProbe::maybeReadSpillOutput() {
1177
1218
return true ;
1178
1219
}
1179
1220
1180
- RowVectorPtr HashProbe::createFilterInput (vector_size_t size) {
1221
+ RowVectorPtr HashProbe::createFilterInput (
1222
+ vector_size_t offset,
1223
+ vector_size_t size) {
1224
+ BufferPtr outputRowMapping = outputRowMapping_;
1225
+ if (offset > 0 ) {
1226
+ VELOX_CHECK_LE (size, outputTableRowsCapacity_ - offset);
1227
+ outputRowMapping = Buffer::slice<vector_size_t >(
1228
+ outputRowMapping_, offset, outputTableRowsCapacity_ - offset, pool ());
1229
+ }
1181
1230
std::vector<VectorPtr> filterColumns (filterInputType_->size ());
1182
1231
for (auto projection : filterInputProjections_) {
1183
1232
if (projectedInputColumns_.find (projection.inputChannel ) !=
@@ -1194,12 +1243,12 @@ RowVectorPtr HashProbe::createFilterInput(vector_size_t size) {
1194
1243
}
1195
1244
1196
1245
filterColumns[projection.outputChannel ] = wrapChild (
1197
- size, outputRowMapping_ , input_->childAt (projection.inputChannel ));
1246
+ size, outputRowMapping , input_->childAt (projection.inputChannel ));
1198
1247
}
1199
1248
1200
1249
extractColumns (
1201
1250
table_.get (),
1202
- folly::Range<char * const *>(outputTableRows_->as <char *>(), size),
1251
+ folly::Range<char * const *>(outputTableRows_->as <char *>() + offset , size),
1203
1252
filterTableProjections_,
1204
1253
pool (),
1205
1254
filterInputType_->children (),
@@ -1212,7 +1261,8 @@ RowVectorPtr HashProbe::createFilterInput(vector_size_t size) {
1212
1261
void HashProbe::prepareFilterRowsForNullAwareJoin (
1213
1262
RowVectorPtr& filterInput,
1214
1263
vector_size_t numRows,
1215
- bool filterPropagateNulls) {
1264
+ bool filterPropagateNulls,
1265
+ vector_size_t * rawOutputProbeRowMapping) {
1216
1266
VELOX_CHECK_LE (numRows, kBatchSize );
1217
1267
if (filterTableInput_ == nullptr ) {
1218
1268
filterTableInput_ =
@@ -1255,10 +1305,9 @@ void HashProbe::prepareFilterRowsForNullAwareJoin(
1255
1305
// with null join key columns(s) as we can apply filtering after they cross
1256
1306
// join with the table rows later.
1257
1307
if (!nonNullInputRows_.isAllSelected ()) {
1258
- auto * rawMapping = outputRowMapping_->asMutable <vector_size_t >();
1259
1308
for (int i = 0 ; i < numRows; ++i) {
1260
1309
if (filterInputRows_.isValid (i) &&
1261
- !nonNullInputRows_.isValid (rawMapping [i])) {
1310
+ !nonNullInputRows_.isValid (rawOutputProbeRowMapping [i])) {
1262
1311
filterInputRows_.setValid (i, false );
1263
1312
}
1264
1313
}
@@ -1345,10 +1394,8 @@ void HashProbe::applyFilterOnTableRowsForNullAwareJoin(
1345
1394
1346
1395
SelectivityVector HashProbe::evalFilterForNullAwareJoin (
1347
1396
vector_size_t numRows,
1348
- bool filterPropagateNulls) {
1349
- auto * rawOutputProbeRowMapping =
1350
- outputRowMapping_->asMutable <vector_size_t >();
1351
-
1397
+ bool filterPropagateNulls,
1398
+ vector_size_t * rawOutputProbeRowMapping) {
1352
1399
// Subset of probe-side rows with a match that passed the filter.
1353
1400
SelectivityVector filterPassedRows (input_->size (), false );
1354
1401
@@ -1417,15 +1464,15 @@ void HashProbe::prepareNullKeyProbeHashers() {
1417
1464
}
1418
1465
}
1419
1466
1420
- int32_t HashProbe::evalFilter (int32_t numRows) {
1467
+ int32_t HashProbe::evalFilter (int32_t offset, int32_t numRows) {
1421
1468
if (!filter_) {
1422
1469
return numRows;
1423
1470
}
1424
1471
1425
1472
const bool filterPropagateNulls = filter_->expr (0 )->propagatesNulls ();
1426
1473
auto * rawOutputProbeRowMapping =
1427
- outputRowMapping_->asMutable <vector_size_t >();
1428
- auto * outputTableRows = outputTableRows_->asMutable <char *>();
1474
+ outputRowMapping_->asMutable <vector_size_t >() + offset ;
1475
+ auto * outputTableRows = outputTableRows_->asMutable <char *>() + offset ;
1429
1476
1430
1477
filterInputRows_.resizeFill (numRows);
1431
1478
@@ -1443,11 +1490,11 @@ int32_t HashProbe::evalFilter(int32_t numRows) {
1443
1490
filterInputRows_.updateBounds ();
1444
1491
}
1445
1492
1446
- RowVectorPtr filterInput = createFilterInput (numRows);
1493
+ RowVectorPtr filterInput = createFilterInput (offset, numRows);
1447
1494
1448
1495
if (nullAware_) {
1449
1496
prepareFilterRowsForNullAwareJoin (
1450
- filterInput, numRows, filterPropagateNulls);
1497
+ filterInput, numRows, filterPropagateNulls, rawOutputProbeRowMapping );
1451
1498
}
1452
1499
1453
1500
EvalCtx evalCtx (operatorCtx_->execCtx (), filter_.get (), filterInput.get ());
@@ -1525,21 +1572,18 @@ int32_t HashProbe::evalFilter(int32_t numRows) {
1525
1572
static const char * kPassed = " passed" ;
1526
1573
1527
1574
if (nullAware_) {
1528
- leftSemiProjectIsNull_.resize (numRows);
1529
- leftSemiProjectIsNull_.clearAll ();
1530
-
1531
1575
auto addLast = [&](auto row, std::optional<bool > passed) {
1532
1576
if (passed.has_value ()) {
1533
1577
outputTableRows[numPassed] =
1534
1578
passed.value () ? const_cast <char *>(kPassed ) : nullptr ;
1535
1579
} else {
1536
- leftSemiProjectIsNull_.setValid (numPassed, true );
1580
+ leftSemiProjectIsNull_.setValid (numPassed + offset , true );
1537
1581
}
1538
1582
rawOutputProbeRowMapping[numPassed++] = row;
1539
1583
};
1540
1584
1541
- auto passedRows =
1542
- evalFilterForNullAwareJoin ( numRows, filterPropagateNulls);
1585
+ auto passedRows = evalFilterForNullAwareJoin (
1586
+ numRows, filterPropagateNulls, rawOutputProbeRowMapping );
1543
1587
for (auto i = 0 ; i < numRows; ++i) {
1544
1588
// filterPassed(i) -> TRUE
1545
1589
// else passed -> NULL
@@ -1575,8 +1619,8 @@ int32_t HashProbe::evalFilter(int32_t numRows) {
1575
1619
rawOutputProbeRowMapping[numPassed++] = row;
1576
1620
};
1577
1621
if (nullAware_) {
1578
- auto passedRows =
1579
- evalFilterForNullAwareJoin ( numRows, filterPropagateNulls);
1622
+ auto passedRows = evalFilterForNullAwareJoin (
1623
+ numRows, filterPropagateNulls, rawOutputProbeRowMapping );
1580
1624
for (auto i = 0 ; i < numRows; ++i) {
1581
1625
auto probeRow = rawOutputProbeRowMapping[i];
1582
1626
bool passed = passedRows.isValid (probeRow);
0 commit comments