
Commit b08a3ab

Merge branch 'master' into limit_pushdown
2 parents 35282ee + 4da7c3d commit b08a3ab


1,402 files changed (+54,120, -26,450 lines)


.github/PULL_REQUEST_TEMPLATE (+1, -1)

@@ -9,7 +9,7 @@ Thanks for sending a pull request! Here are some tips for you:
   7. If you want to add a new configuration, please read the guideline first for naming configurations in
      'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
   8. If you want to add or modify an error type or message, please read the guideline first in
-     'core/src/main/resources/error/README.md'.
+     'common/utils/src/main/resources/error/README.md'.
 -->

 ### What changes were proposed in this pull request?

.github/workflows/benchmark.yml (+6, -6)

@@ -70,7 +70,7 @@ jobs:
       with:
         fetch-depth: 0
     - name: Cache Scala, SBT and Maven
-      uses: actions/cache@v3
+      uses: actions/cache@v4
       with:
         path: |
           build/apache-maven-*
@@ -81,15 +81,15 @@ jobs:
         restore-keys: |
           build-
     - name: Cache Coursier local repository
-      uses: actions/cache@v3
+      uses: actions/cache@v4
       with:
         path: ~/.cache/coursier
         key: benchmark-coursier-${{ github.event.inputs.jdk }}-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }}
         restore-keys: |
           benchmark-coursier-${{ github.event.inputs.jdk }}
     - name: Cache TPC-DS generated data
       id: cache-tpcds-sf-1
-      uses: actions/cache@v3
+      uses: actions/cache@v4
       with:
         path: ./tpcds-sf-1
         key: tpcds-${{ hashFiles('.github/workflows/benchmark.yml', 'sql/core/src/test/scala/org/apache/spark/sql/TPCDSSchema.scala') }}
@@ -139,7 +139,7 @@ jobs:
       with:
         fetch-depth: 0
     - name: Cache Scala, SBT and Maven
-      uses: actions/cache@v3
+      uses: actions/cache@v4
       with:
         path: |
           build/apache-maven-*
@@ -150,7 +150,7 @@ jobs:
         restore-keys: |
           build-
     - name: Cache Coursier local repository
-      uses: actions/cache@v3
+      uses: actions/cache@v4
       with:
         path: ~/.cache/coursier
         key: benchmark-coursier-${{ github.event.inputs.jdk }}-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }}
@@ -164,7 +164,7 @@ jobs:
     - name: Cache TPC-DS generated data
       if: contains(github.event.inputs.class, 'TPCDSQueryBenchmark') || contains(github.event.inputs.class, '*')
       id: cache-tpcds-sf-1
-      uses: actions/cache@v3
+      uses: actions/cache@v4
       with:
         path: ./tpcds-sf-1
         key: tpcds-${{ hashFiles('.github/workflows/benchmark.yml', 'sql/core/src/test/scala/org/apache/spark/sql/TPCDSSchema.scala') }}

.github/workflows/build_and_test.yml (+55, -43): large diff, not rendered by default.

.github/workflows/build_python.yml (+2, -1)

@@ -26,8 +26,9 @@ on:
 jobs:
   run-build:
     strategy:
+      fail-fast: false
       matrix:
-        pyversion: ["pypy3,python3.10", "python3.11,python3.12"]
+        pyversion: ["pypy3", "python3.10", "python3.11", "python3.12"]
     permissions:
       packages: write
     name: Run

.github/workflows/maven_test.yml (+2, -2)

@@ -132,7 +132,7 @@ jobs:
           git -c user.name='Apache Spark Test Account' -c user.email='sparktestacc@gmail.com' commit -m "Merged commit" --allow-empty
     # Cache local repositories. Note that GitHub Actions cache has a 2G limit.
     - name: Cache Scala, SBT and Maven
-      uses: actions/cache@v3
+      uses: actions/cache@v4
       with:
         path: |
           build/apache-maven-*
@@ -143,7 +143,7 @@ jobs:
         restore-keys: |
           build-
     - name: Cache Maven local repository
-      uses: actions/cache@v3
+      uses: actions/cache@v4
       with:
         path: ~/.m2/repository
         key: java${{ matrix.java }}-maven-${{ hashFiles('**/pom.xml') }}

.github/workflows/notify_test_workflow.yml (+19, -3)

@@ -41,7 +41,7 @@ jobs:
         github-token: ${{ secrets.GITHUB_TOKEN }}
         script: |
           const endpoint = 'GET /repos/:owner/:repo/actions/workflows/:id/runs?&branch=:branch'
-          const check_run_endpoint = 'GET /repos/:owner/:repo/commits/:ref/check-runs'
+          const check_run_endpoint = 'GET /repos/:owner/:repo/commits/:ref/check-runs?per_page=100'

           // TODO: Should use pull_request.user and pull_request.user.repos_url?
           // If a different person creates a commit to another forked repo,
@@ -115,8 +115,22 @@
           }

           // Here we get check run ID to provide Check run view instead of Actions view, see also SPARK-37879.
-          const check_runs = await github.request(check_run_endpoint, check_run_params)
-          const check_run_head = check_runs.data.check_runs.filter(r => r.name === "Run / Check changes")[0]
+          let retryCount = 0;
+          let check_run_head;
+          while (retryCount < 3) {
+            const check_runs = await github.request(check_run_endpoint, check_run_params);
+            check_run_head = check_runs.data.check_runs.find(r => r.name === "Run / Check changes");
+            if (check_run_head) {
+              break;
+            }
+            retryCount++;
+            if (retryCount < 3) {
+              await new Promise(resolve => setTimeout(resolve, 3000));
+            }
+          }
+          if (!check_run_head) {
+            throw new Error('Failed to retrieve check_run_head after 3 attempts');
+          }

           if (check_run_head.head_sha != context.payload.pull_request.head.sha) {
             throw new Error('There was a new unsynced commit pushed. Please retrigger the workflow.');
@@ -126,11 +140,13 @@
             + context.payload.pull_request.head.repo.full_name
             + '/runs/'
             + check_run_head.id
+          console.log('Check run URL: ' + check_run_url)

           const actions_url = 'https://github.com/'
             + context.payload.pull_request.head.repo.full_name
             + '/actions/runs/'
             + run_id
+          console.log('Actions URL: ' + actions_url)

           github.rest.checks.create({
             owner: context.repo.owner,
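The substantive change above replaces a single check-run lookup with a bounded retry: up to three attempts, three seconds apart, and a hard failure if the "Run / Check changes" run never appears. The same pattern in Scala, purely as an illustrative sketch (the object and helper names are hypothetical, not part of the workflow):

    import scala.annotation.tailrec

    object BoundedRetry {
      // Try `lookup` up to `maxAttempts` times, sleeping `delayMs` between attempts,
      // and fail loudly if no attempt ever produced a value.
      @tailrec
      def retry[A](maxAttempts: Int, delayMs: Long)(lookup: () => Option[A]): A =
        lookup() match {
          case Some(value) => value
          case None if maxAttempts > 1 =>
            Thread.sleep(delayMs)
            retry(maxAttempts - 1, delayMs)(lookup)
          case None =>
            throw new IllegalStateException("lookup did not succeed within the allowed attempts")
        }

      // Usage mirroring the workflow (findCheckRun is a hypothetical lookup function):
      // val head = retry(3, 3000L)(() => findCheckRun("Run / Check changes"))
    }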

.github/workflows/publish_snapshot.yml (+1, -1)

@@ -45,7 +45,7 @@ jobs:
       with:
         ref: ${{ matrix.branch }}
     - name: Cache Maven local repository
-      uses: actions/cache@v3
+      uses: actions/cache@v4
       with:
         path: ~/.m2/repository
         key: snapshot-maven-${{ hashFiles('**/pom.xml') }}

R/pkg/R/functions.R (+28)

@@ -1091,6 +1091,34 @@ setMethod("dayofyear",
             column(jc)
           })
 
+#' @details
+#' \code{monthname}: Extracts the three-letter abbreviated month name from a
+#' given date/timestamp/string.
+#'
+#' @rdname column_datetime_functions
+#' @aliases monthname monthname,Column-method
+#' @note monthname since 4.0.0
+setMethod("monthname",
+          signature(x = "Column"),
+          function(x) {
+            jc <- callJStatic("org.apache.spark.sql.functions", "monthname", x@jc)
+            column(jc)
+          })
+
+#' @details
+#' \code{dayname}: Extracts the three-letter abbreviated day name from a
+#' given date/timestamp/string.
+#'
+#' @rdname column_datetime_functions
+#' @aliases dayname dayname,Column-method
+#' @note dayname since 4.0.0
+setMethod("dayname",
+          signature(x = "Column"),
+          function(x) {
+            jc <- callJStatic("org.apache.spark.sql.functions", "dayname", x@jc)
+            column(jc)
+          })
+
 #' @details
 #' \code{decode}: Computes the first argument into a string from a binary using the provided
 #' character set.
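These R wrappers delegate to the JVM column functions through callJStatic. A minimal Scala sketch of the same calls, assuming a Spark 4.0 build whose org.apache.spark.sql.functions exposes monthname and dayname (as the wrappers above imply); the object and app names are made up for illustration:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, dayname, monthname, to_date}

    object DateNameExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("date-name-example").getOrCreate()
        import spark.implicits._

        // Same sample dates the SparkR test in this commit asserts on:
        // a Thursday, a Saturday and a Monday, all in December.
        val df = Seq("2012-12-13", "2013-12-14", "2014-12-15").toDF("d")
          .select(to_date(col("d")).as("d"))

        // monthname/dayname return three-letter abbreviations, e.g. "Dec" and "Thu".
        df.select(monthname($"d"), dayname($"d")).show()
        spark.stop()
      }
    }
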

R/pkg/R/generics.R (+8)

@@ -1020,6 +1020,14 @@ setGeneric("dayofweek", function(x) { standardGeneric("dayofweek") })
 #' @name NULL
 setGeneric("dayofyear", function(x) { standardGeneric("dayofyear") })
 
+#' @rdname column_datetime_functions
+#' @name NULL
+setGeneric("monthname", function(x) { standardGeneric("monthname") })
+
+#' @rdname column_datetime_functions
+#' @name NULL
+setGeneric("dayname", function(x) { standardGeneric("dayname") })
+
 #' @rdname column_string_functions
 #' @name NULL
 setGeneric("decode", function(x, charset) { standardGeneric("decode") })

R/pkg/tests/fulltests/test_sparkSQL.R (+3, -1)

@@ -1414,7 +1414,7 @@ test_that("test HiveContext", {
 
     # Invalid mode
     expect_error(saveAsTable(df, "parquetest", "parquet", mode = "abc", path = parquetDataPath),
-                 "illegal argument - Unknown save mode: abc")
+                 "Error in mode : analysis error - \\[INVALID_SAVE_MODE\\].*")
     unsetHiveContext()
   }
 })
@@ -2062,6 +2062,8 @@ test_that("date functions on a DataFrame", {
   expect_equal(collect(select(df, weekofyear(df$b)))[, 1], c(50, 50, 51))
   expect_equal(collect(select(df, year(df$b)))[, 1], c(2012, 2013, 2014))
   expect_equal(collect(select(df, month(df$b)))[, 1], c(12, 12, 12))
+  expect_equal(collect(select(df, monthname(df$b)))[, 1], c("Dec", "Dec", "Dec"))
+  expect_equal(collect(select(df, dayname(df$b)))[, 1], c("Thu", "Sat", "Mon"))
   expect_equal(collect(select(df, last_day(df$b)))[, 1],
                c(as.Date("2012-12-31"), as.Date("2013-12-31"), as.Date("2014-12-31")))
   expect_equal(collect(select(df, next_day(df$b, "MONDAY")))[, 1],

assembly/pom.xml (+16)

@@ -149,6 +149,12 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-connect_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-connect-common_${scala.binary.version}</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
@@ -204,6 +210,16 @@
         </dependency>
       </dependencies>
     </profile>
+    <profile>
+      <id>jvm-profiler</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-profiler_${scala.binary.version}</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
     <profile>
       <id>bigtop-dist</id>
       <!-- This profile uses the assembly plugin to create a special "dist" package for BigTop
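Both additions are standard Maven mechanics: the <exclusions> block keeps spark-connect-common from being pulled into the assembly transitively through the spark-connect dependency, and the new jvm-profiler profile adds spark-profiler to the distribution only when the build is run with that profile enabled (Maven profiles are selected with -P, for example -Pjvm-profiler).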

bin/pyspark (+27)

@@ -48,6 +48,33 @@ export PYSPARK_PYTHON
 export PYSPARK_DRIVER_PYTHON
 export PYSPARK_DRIVER_PYTHON_OPTS
 
+# Attempt to find JAVA_HOME.
+# If JAVA_HOME not set, install JDK 17 and set JAVA_HOME using a temp dir, and adding the
+# temp dir to the PYTHONPATH.
+if [ -n "${JAVA_HOME}" ]; then
+  RUNNER="${JAVA_HOME}/bin/java"
+else
+  if [ "$(command -v java)" ]; then
+    RUNNER="java"
+  else
+    echo -n "JAVA_HOME is not set. Would you like to install JDK 17 and set JAVA_HOME? (Y/N) " >&2
+
+    read -r input
+
+    if [[ "${input,,}" == "y" ]]; then
+      TEMP_DIR=$(mktemp -d)
+      $PYSPARK_DRIVER_PYTHON -m pip install --target="$TEMP_DIR" install-jdk
+      export JAVA_HOME=$(PYTHONPATH="$TEMP_DIR" $PYSPARK_DRIVER_PYTHON -c 'import jdk; print(jdk.install("17"))')
+      RUNNER="${JAVA_HOME}/bin/java"
+      echo "JDK was installed to the path \"$JAVA_HOME\""
+      echo "You can avoid needing to re-install JDK by setting your JAVA_HOME environment variable to \"$JAVA_HOME\""
+    else
+      echo "JDK installation skipped. You can manually install JDK (17 or later) and set JAVA_HOME in your environment."
+      exit 1
+    fi
+  fi
+fi
+
 # Add the PySpark classes to the Python path:
 export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
 export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH"

common/network-common/src/main/java/org/apache/spark/network/shuffledb/StoreVersion.java (+5)

@@ -54,4 +54,9 @@ public int hashCode() {
     result = 31 * result + minor;
     return result;
   }
+
+  @Override
+  public String toString() {
+    return "StoreVersion[" + major + "." + minor + ']';
+  }
 }

common/network-common/src/main/java/org/apache/spark/network/ssl/ReloadingX509TrustManager.java (+2, -4)

@@ -61,7 +61,6 @@ public final class ReloadingX509TrustManager
   protected volatile int needsReloadCheckCounts;
   private final AtomicReference<X509TrustManager> trustManagerRef;
 
-  private volatile boolean running;
   private Thread reloader;
 
   /**
@@ -98,15 +97,13 @@ public ReloadingX509TrustManager(
   public void init() {
     reloader = new Thread(this, "Truststore reloader thread");
     reloader.setDaemon(true);
-    running = true;
     reloader.start();
   }
 
   /**
    * Stops the reloader thread.
    */
   public void destroy() throws InterruptedException {
-    running = false;
     reloader.interrupt();
     reloader.join();
   }
@@ -200,11 +197,12 @@ X509TrustManager loadTrustManager()
 
   @Override
   public void run() {
+    boolean running = true;
     while (running) {
       try {
         Thread.sleep(reloadInterval);
       } catch (InterruptedException e) {
-        //NOP
+        running = false;
       }
       try {
         if (running && needsReload()) {
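The net effect: the reloader thread no longer coordinates through a shared volatile running flag. destroy() simply interrupts the daemon thread, the InterruptedException wakes the sleep, and a local flag ends the loop. A compact Scala sketch of the same interrupt-driven loop, for illustration only (class and parameter names are invented):

    // Stop-by-interrupt: the owner calls thread.interrupt(); the InterruptedException
    // thrown from Thread.sleep flips a local flag, so no shared volatile state is needed.
    class ReloadLoop(intervalMs: Long)(reloadIfNeeded: () => Unit) extends Runnable {
      override def run(): Unit = {
        var running = true
        while (running) {
          try Thread.sleep(intervalMs)
          catch { case _: InterruptedException => running = false }
          if (running) reloadIfNeeded()
        }
      }
    }

    // Typical lifecycle: start it on a daemon thread, then interrupt() and join() to stop it.
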
DBProviderSuite.java (new file, +61; package org.apache.spark.network.util)

@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.SystemUtils;
+import org.apache.spark.network.shuffledb.DBBackend;
+import org.apache.spark.network.shuffledb.StoreVersion;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assumptions.assumeFalse;
+
+public class DBProviderSuite {
+
+  @Test
+  public void testRockDBCheckVersionFailed() throws IOException {
+    testCheckVersionFailed(DBBackend.ROCKSDB, "rocksdb");
+  }
+
+  @Test
+  public void testLevelDBCheckVersionFailed() throws IOException {
+    assumeFalse(SystemUtils.IS_OS_MAC_OSX && SystemUtils.OS_ARCH.equals("aarch64"));
+    testCheckVersionFailed(DBBackend.LEVELDB, "leveldb");
+  }
+
+  private void testCheckVersionFailed(DBBackend dbBackend, String namePrefix) throws IOException {
+    String root = System.getProperty("java.io.tmpdir");
+    File dbFile = JavaUtils.createDirectory(root, namePrefix);
+    try {
+      StoreVersion v1 = new StoreVersion(1, 0);
+      ObjectMapper mapper = new ObjectMapper();
+      DBProvider.initDB(dbBackend, dbFile, v1, mapper).close();
+      StoreVersion v2 = new StoreVersion(2, 0);
+      IOException ioe = Assertions.assertThrows(IOException.class, () ->
+        DBProvider.initDB(dbBackend, dbFile, v2, mapper));
+      Assertions.assertTrue(
+        ioe.getMessage().contains("incompatible with current version StoreVersion[2.0]"));
+    } finally {
+      JavaUtils.deleteRecursively(dbFile);
+    }
+  }
+}
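Note that the message assertion here ("incompatible with current version StoreVersion[2.0]") relies on the StoreVersion.toString override added earlier in this commit, which renders a version as StoreVersion[major.minor].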
