From e073dea175c7c36b23a3e36db938d71a8b78dd1a Mon Sep 17 00:00:00 2001 From: "alan578.zhao" <956322745@qq.com> Date: Wed, 19 Feb 2025 14:55:50 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=B0=83=E6=95=B4sofa-bo?= =?UTF-8?q?lt=E7=9B=B8=E5=85=B3filter?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../apache/hugegraph/pd/raft/KVOperation.java | 3 + .../apache/hugegraph/pd/raft/RaftEngine.java | 33 +++- .../hugegraph/pd/raft/auth/IpAuthHandler.java | 87 ++++++++++ .../HugegraphHessianSerializerFactory.java | 160 ++++++++++++++++++ 4 files changed, 280 insertions(+), 3 deletions(-) create mode 100644 hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/auth/IpAuthHandler.java create mode 100644 hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/serializer/HugegraphHessianSerializerFactory.java diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/KVOperation.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/KVOperation.java index b27252fa10..6afc6d6e94 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/KVOperation.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/KVOperation.java @@ -29,6 +29,8 @@ import lombok.Data; +import org.apache.hugegraph.pd.raft.serializer.HugegraphHessianSerializerFactory; + @Data public class KVOperation { @@ -84,6 +86,7 @@ public static KVOperation fromByteArray(byte[] value) throws IOException { try (ByteArrayInputStream bis = new ByteArrayInputStream(value, 1, value.length - 1)) { Hessian2Input input = new Hessian2Input(bis); + input.setSerializerFactory(HugegraphHessianSerializerFactory.getInstance()); KVOperation op = new KVOperation(); op.op = value[0]; op.key = input.readBytes(); diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java index 67734d1456..3b5fd9f575 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java @@ -19,6 +19,7 @@ import java.io.File; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Objects; @@ -26,6 +27,11 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import com.alipay.remoting.ExtendedNettyChannelHandler; +import com.alipay.remoting.config.BoltServerOption; +import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer; +import io.netty.channel.ChannelHandler; import org.apache.hugegraph.pd.common.PDException; import org.apache.hugegraph.pd.config.PDConfig; @@ -50,8 +56,8 @@ import com.alipay.sofa.jraft.util.Endpoint; import com.alipay.sofa.jraft.util.ThreadId; import com.alipay.sofa.jraft.util.internal.ThrowUtil; - import lombok.extern.slf4j.Slf4j; +import org.apache.hugegraph.pd.raft.auth.IpAuthHandler; @Slf4j public class RaftEngine { @@ -117,7 +123,7 @@ public boolean init(PDConfig.Raft config) { final PeerId serverId = JRaftUtils.getPeerId(config.getAddress()); - rpcServer = createRaftRpcServer(config.getAddress()); + rpcServer = createRaftRpcServer(config.getAddress(), initConf.getPeers()); // construct raft group and start raft this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer, true); @@ -130,14 +136,35 @@ public boolean init(PDConfig.Raft config) { /** * Create a Raft RPC Server for communication between PDs */ - private RpcServer createRaftRpcServer(String raftAddr) { + private RpcServer createRaftRpcServer(String raftAddr, List peers) { Endpoint endpoint = JRaftUtils.getEndPoint(raftAddr); RpcServer rpcServer = RaftRpcServerFactory.createRaftRpcServer(endpoint); + configureRaftServerIpWhitelist(peers, rpcServer); RaftRpcProcessor.registerProcessor(rpcServer, this); rpcServer.init(null); return rpcServer; } + private static void configureRaftServerIpWhitelist(List peers, RpcServer rpcServer) { + if(rpcServer instanceof BoltRpcServer){ + ((BoltRpcServer) rpcServer).getServer().option(BoltServerOption.EXTENDED_NETTY_CHANNEL_HANDLER, + new ExtendedNettyChannelHandler() { + @Override + public List frontChannelHandlers() { + return Collections.singletonList( + IpAuthHandler.getInstance( + peers.stream() + .map(PeerId::getIp) + .collect(Collectors.toSet()))); + } + @Override + public List backChannelHandlers() { + return Collections.emptyList(); + } + }); + } + } + public void shutDown() { if (this.raftGroupService != null) { this.raftGroupService.shutdown(); diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/auth/IpAuthHandler.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/auth/IpAuthHandler.java new file mode 100644 index 0000000000..9e3e5770c0 --- /dev/null +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/auth/IpAuthHandler.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.pd.raft.auth; + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.Set; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@ChannelHandler.Sharable +public class IpAuthHandler extends ChannelDuplexHandler { + + private final Set allowedIps; + private static volatile IpAuthHandler instance; + + private IpAuthHandler(Set allowedIps) { + this.allowedIps = Collections.unmodifiableSet(allowedIps); + } + + public static IpAuthHandler getInstance(Set allowedIps) { + if (instance == null) { + synchronized (IpAuthHandler.class) { + if (instance == null) { + instance = new IpAuthHandler(allowedIps); + } + } + } + return instance; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + String clientIp = getClientIp(ctx); + if (!isIpAllowed(clientIp)) { + log.warn("Blocked connection from {}", clientIp); + ctx.close(); + return; + } + super.channelActive(ctx); + } + + private static String getClientIp(ChannelHandlerContext ctx) { + InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress(); + return remoteAddress.getAddress().getHostAddress(); + } + + private boolean isIpAllowed(String ip) { + if (allowedIps.isEmpty() || allowedIps.contains(ip)) { + return true; + } + return false; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + String clientIp=getClientIp(ctx); + log.warn("clien : {} connection exception : {}",clientIp,cause); + if (ctx.channel().isActive()) { + ctx.close().addListener(future -> { + if (!future.isSuccess()) { + log.warn("clien : {} connection closed failed : {}",clientIp,future.cause().getMessage()); + } + }); + } + } + +} diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/serializer/HugegraphHessianSerializerFactory.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/serializer/HugegraphHessianSerializerFactory.java new file mode 100644 index 0000000000..df8ef1d99a --- /dev/null +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/serializer/HugegraphHessianSerializerFactory.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hugegraph.pd.raft.serializer; + +import com.caucho.hessian.io.Deserializer; +import com.caucho.hessian.io.HessianProtocolException; +import com.caucho.hessian.io.Serializer; +import com.caucho.hessian.io.SerializerFactory; + + +import lombok.extern.slf4j.Slf4j; + +import java.text.SimpleDateFormat; +import java.time.format.DateTimeFormatter; + +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +@Slf4j +public class HugegraphHessianSerializerFactory extends SerializerFactory { + + private static final HugegraphHessianSerializerFactory INSTANCE = new HugegraphHessianSerializerFactory(); + + private HugegraphHessianSerializerFactory() { + super(); + initWhitelist(); + } + + public static HugegraphHessianSerializerFactory getInstance() { + return INSTANCE; + } + + private final Set whitelist = new HashSet<>(); + + private void initWhitelist() { + allowBasicType(); + allowCollections(); + allowConcurrent(); + allowTime(); + allowBusinessClasses(); + } + + private void allowBasicType() { + addToWhitelist( + boolean.class, byte.class, char.class, double.class, + float.class, int.class, long.class, short.class, + Boolean.class, Byte.class, Character.class, Double.class, + Float.class, Integer.class, Long.class, Short.class, + String.class, Class.class, Number.class + ); + } + + private void allowCollections() { + addToWhitelist( + List.class, ArrayList.class, LinkedList.class, + Set.class, HashSet.class, LinkedHashSet.class, TreeSet.class, + Map.class, HashMap.class, LinkedHashMap.class, TreeMap.class + ); + } + + private void allowConcurrent() { + addToWhitelist( + AtomicBoolean.class, AtomicInteger.class, AtomicLong.class, AtomicReference.class, + ConcurrentMap.class, ConcurrentHashMap.class, ConcurrentSkipListMap.class, CopyOnWriteArrayList.class + ); + } + + private void allowTime() { + addToWhitelist( + Date.class, Calendar.class, TimeUnit.class, + SimpleDateFormat.class, DateTimeFormatter.class + ); + tryAddClass("java.time.LocalDate"); + tryAddClass("java.time.LocalDateTime"); + tryAddClass("java.time.Instant"); + } + + private void allowBusinessClasses() { + addToWhitelist( + org.apache.hugegraph.pd.raft.KVOperation.class, + byte[].class + ); + } + + private void addToWhitelist(Class... classes) { + for (Class clazz : classes) { + whitelist.add(clazz.getName()); + } + } + + private void addInnerClasses(String... classNames) { + Collections.addAll(whitelist, classNames); + } + + private void tryAddClass(String className) { + try { + Class.forName(className); + whitelist.add(className); + } catch (ClassNotFoundException e) { + } + } + + @Override + public Serializer getSerializer(Class cl) throws HessianProtocolException { + checkWhitelist(cl); + return super.getSerializer(cl); + } + + @Override + public Deserializer getDeserializer(Class cl) throws HessianProtocolException { + checkWhitelist(cl); + return super.getDeserializer(cl); + } + + private void checkWhitelist(Class cl) { + String className = cl.getName(); + if (!whitelist.contains(className)) { + log.warn("Security alert: Blocked unauthorized class [{}] at {}", + className, new Date()); + throw new SecurityException("hessian serialize unauthorized class: " + className); + } + } +} From 4b22861064e221af66915c25b2f9d7ea3097fec9 Mon Sep 17 00:00:00 2001 From: "alan578.zhao" <956322745@qq.com> Date: Wed, 19 Feb 2025 14:57:49 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=B0=83=E6=95=B4sofa-bo?= =?UTF-8?q?lt=E7=9B=B8=E5=85=B3filter?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/apache/hugegraph/pd/raft/auth/IpAuthHandler.java | 5 +---- .../raft/serializer/HugegraphHessianSerializerFactory.java | 5 +---- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/auth/IpAuthHandler.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/auth/IpAuthHandler.java index 9e3e5770c0..b9b5d839ac 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/auth/IpAuthHandler.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/auth/IpAuthHandler.java @@ -65,10 +65,7 @@ private static String getClientIp(ChannelHandlerContext ctx) { } private boolean isIpAllowed(String ip) { - if (allowedIps.isEmpty() || allowedIps.contains(ip)) { - return true; - } - return false; + return allowedIps.isEmpty() || allowedIps.contains(ip); } @Override diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/serializer/HugegraphHessianSerializerFactory.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/serializer/HugegraphHessianSerializerFactory.java index df8ef1d99a..b7fe3884fd 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/serializer/HugegraphHessianSerializerFactory.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/serializer/HugegraphHessianSerializerFactory.java @@ -125,15 +125,12 @@ private void addToWhitelist(Class... classes) { } } - private void addInnerClasses(String... classNames) { - Collections.addAll(whitelist, classNames); - } - private void tryAddClass(String className) { try { Class.forName(className); whitelist.add(className); } catch (ClassNotFoundException e) { + log.warn("Failed to load class {}", className); } } From fd1c4443f0bcb47a4f6be040e33ce94a480e632f Mon Sep 17 00:00:00 2001 From: "alan578.zhao" <956322745@qq.com> Date: Wed, 19 Feb 2025 14:58:07 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=B0=83=E6=95=B4sofa-bo?= =?UTF-8?q?lt=E7=9B=B8=E5=85=B3filter?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pd/raft/serializer/HugegraphHessianSerializerFactory.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/serializer/HugegraphHessianSerializerFactory.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/serializer/HugegraphHessianSerializerFactory.java index b7fe3884fd..275159a50e 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/serializer/HugegraphHessianSerializerFactory.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/serializer/HugegraphHessianSerializerFactory.java @@ -31,7 +31,6 @@ import java.util.ArrayList; import java.util.Calendar; -import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet;