Skip to content

Commit ed2dc91

Browse files
committedMay 23, 2024
[HUDI-7784] Fix serde of HoodieHadoopConfiguration in Spark (apache#11270)

File tree

2 files changed

+89
-3
lines changed

2 files changed

+89
-3
lines changed
 

‎hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import org.apache.hudi.client.model.HoodieInternalRow
2222
import org.apache.hudi.common.model.{HoodieKey, HoodieSparkRecord}
2323
import org.apache.hudi.common.util.HoodieCommonKryoRegistrar
2424
import org.apache.hudi.config.HoodieWriteConfig
25-
import org.apache.hudi.storage.StorageConfiguration
25+
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
2626

2727
import com.esotericsoftware.kryo.io.{Input, Output}
2828
import com.esotericsoftware.kryo.serializers.JavaSerializer
@@ -64,8 +64,8 @@ class HoodieSparkKryoRegistrar extends HoodieCommonKryoRegistrar with KryoRegist
6464
// Hadoop's configuration is not a serializable object by itself, and hence
6565
// we're relying on [[SerializableConfiguration]] wrapper to work it around.
6666
// We cannot remove this entry; otherwise the ordering is changed.
67-
// So we replace it with [[StorageConfiguration]].
68-
kryo.register(classOf[StorageConfiguration[_]], new JavaSerializer())
67+
// So we replace it with [[HadoopStorageConfiguration]] for Spark.
68+
kryo.register(classOf[HadoopStorageConfiguration], new JavaSerializer())
6969
}
7070

7171
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark;
21+
22+
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
23+
24+
import com.esotericsoftware.kryo.Kryo;
25+
import com.esotericsoftware.kryo.io.Input;
26+
import com.esotericsoftware.kryo.io.Output;
27+
import org.apache.hadoop.conf.Configuration;
28+
import org.junit.jupiter.api.Test;
29+
import org.objenesis.strategy.StdInstantiatorStrategy;
30+
31+
import java.io.ByteArrayInputStream;
32+
import java.io.ByteArrayOutputStream;
33+
import java.util.HashMap;
34+
import java.util.Map;
35+
36+
import static org.junit.jupiter.api.Assertions.assertEquals;
37+
38+
/**
39+
* Tests {@link HoodieSparkKryoRegistrar}
40+
*/
41+
public class TestHoodieSparkKryoRegistrar {
42+
@Test
43+
public void testSerdeHoodieHadoopConfiguration() {
44+
Kryo kryo = newKryo();
45+
46+
HadoopStorageConfiguration conf = new HadoopStorageConfiguration(new Configuration());
47+
48+
// Serialize
49+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
50+
Output output = new Output(baos);
51+
kryo.writeObject(output, conf);
52+
output.close();
53+
54+
// Deserialize
55+
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
56+
Input input = new Input(bais);
57+
HadoopStorageConfiguration deserialized = kryo.readObject(input, HadoopStorageConfiguration.class);
58+
input.close();
59+
60+
// Verify
61+
assertEquals(getPropsInMap(conf), getPropsInMap(deserialized));
62+
}
63+
64+
private Kryo newKryo() {
65+
Kryo kryo = new Kryo();
66+
67+
// This instance of Kryo should not require prior registration of classes
68+
kryo.setRegistrationRequired(false);
69+
kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
70+
// Handle cases where we may have an odd classloader setup like with libjars
71+
// for hadoop
72+
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
73+
74+
// Register Hudi's classes
75+
new HoodieSparkKryoRegistrar().registerClasses(kryo);
76+
77+
return kryo;
78+
}
79+
80+
private Map<String, String> getPropsInMap(HadoopStorageConfiguration conf) {
81+
Map<String, String> configMap = new HashMap<>();
82+
conf.unwrap().iterator().forEachRemaining(
83+
e -> configMap.put(e.getKey(), e.getValue()));
84+
return configMap;
85+
}
86+
}

0 commit comments

Comments
 (0)
Please sign in to comment.