From 96716cd3e511e2b1c461997c1dd4930b402269b2 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 21 Feb 2025 19:06:37 +0000 Subject: [PATCH 1/5] add AtLease --- .../java/org/apache/kafka/raft/QuorumConfig.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java index 1a7fff83ee6b0..6120febdb35a2 100644 --- a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java +++ b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java @@ -32,6 +32,7 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; @@ -102,12 +103,12 @@ public class QuorumConfig { public static final ConfigDef CONFIG_DEF = new ConfigDef() .define(QUORUM_VOTERS_CONFIG, LIST, DEFAULT_QUORUM_VOTERS, new ControllerQuorumVotersValidator(), HIGH, QUORUM_VOTERS_DOC) .define(QUORUM_BOOTSTRAP_SERVERS_CONFIG, LIST, DEFAULT_QUORUM_BOOTSTRAP_SERVERS, new ControllerQuorumBootstrapServersValidator(), HIGH, QUORUM_BOOTSTRAP_SERVERS_DOC) - .define(QUORUM_ELECTION_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_ELECTION_TIMEOUT_MS, null, HIGH, QUORUM_ELECTION_TIMEOUT_MS_DOC) - .define(QUORUM_FETCH_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_FETCH_TIMEOUT_MS, null, HIGH, QUORUM_FETCH_TIMEOUT_MS_DOC) - .define(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, INT, DEFAULT_QUORUM_ELECTION_BACKOFF_MAX_MS, null, HIGH, QUORUM_ELECTION_BACKOFF_MAX_MS_DOC) - .define(QUORUM_LINGER_MS_CONFIG, INT, DEFAULT_QUORUM_LINGER_MS, null, MEDIUM, QUORUM_LINGER_MS_DOC) - .define(QUORUM_REQUEST_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_REQUEST_TIMEOUT_MS, null, MEDIUM, QUORUM_REQUEST_TIMEOUT_MS_DOC) - .define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, DEFAULT_QUORUM_RETRY_BACKOFF_MS, null, LOW, QUORUM_RETRY_BACKOFF_MS_DOC); + .define(QUORUM_ELECTION_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_ELECTION_TIMEOUT_MS, atLeast(1), HIGH, QUORUM_ELECTION_TIMEOUT_MS_DOC) + .define(QUORUM_FETCH_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_FETCH_TIMEOUT_MS, atLeast(1), HIGH, QUORUM_FETCH_TIMEOUT_MS_DOC) + .define(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, INT, DEFAULT_QUORUM_ELECTION_BACKOFF_MAX_MS, atLeast(1), HIGH, QUORUM_ELECTION_BACKOFF_MAX_MS_DOC) + .define(QUORUM_LINGER_MS_CONFIG, INT, DEFAULT_QUORUM_LINGER_MS, atLeast(1), MEDIUM, QUORUM_LINGER_MS_DOC) + .define(QUORUM_REQUEST_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_REQUEST_TIMEOUT_MS, atLeast(1), MEDIUM, QUORUM_REQUEST_TIMEOUT_MS_DOC) + .define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, DEFAULT_QUORUM_RETRY_BACKOFF_MS, atLeast(1), LOW, QUORUM_RETRY_BACKOFF_MS_DOC); private final List voters; private final List bootstrapServers; From c2f5870702383ddf4638fc243788028705ac3f2a Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 21 Feb 2025 19:09:32 +0000 Subject: [PATCH 2/5] fix build --- raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java index 6120febdb35a2..b24345f61dabf 100644 --- a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java +++ b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java @@ -32,10 +32,10 @@ import java.util.Set; import java.util.stream.Collectors; -import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.Type.INT; import static org.apache.kafka.common.config.ConfigDef.Type.LIST; From d5807f69dbcd49742624034935ab2fb513507916 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 21 Feb 2025 21:43:48 +0000 Subject: [PATCH 3/5] add test --- .../apache/kafka/raft/QuorumConfigTest.java | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java b/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java new file mode 100644 index 0000000000000..aed8462a69db9 --- /dev/null +++ b/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigException; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class QuorumConfigTest { + @Test + public void testPositiveConfig() { + verifyPositionConfig(Map.of(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, "0")); + verifyPositionConfig(Map.of(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, "0")); + verifyPositionConfig(Map.of(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, "0")); + verifyPositionConfig(Map.of(QuorumConfig.QUORUM_LINGER_MS_CONFIG, "0")); + verifyPositionConfig(Map.of(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, "0")); + verifyPositionConfig(Map.of(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, "0")); + } + + private void verifyPositionConfig(Map overrideConfig) { + Map props = new HashMap<>(); + props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9092"); + props.put(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, "10"); + props.put(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, "10"); + props.put(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, "10"); + props.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG, "10"); + props.put(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, "10"); + props.put(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, "10"); + + props.putAll(overrideConfig); + + assertThrows(ConfigException.class, () -> new QuorumConfig(new QuorumTestConfig(props))); + } + + private static class QuorumTestConfig extends AbstractConfig { + private final QuorumConfig quorumConfig; + + public QuorumTestConfig(Map originals) { + super(QuorumConfig.CONFIG_DEF, originals, true); + quorumConfig = new QuorumConfig(this); + } + + public QuorumConfig quorumConfig() { + return quorumConfig; + } + } +} From 45a21a1d69635f24e6785163f2d0351830bc738c Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 21 Feb 2025 21:45:36 +0000 Subject: [PATCH 4/5] rename function --- .../org/apache/kafka/raft/QuorumConfigTest.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java b/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java index aed8462a69db9..a04315041064f 100644 --- a/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java @@ -27,15 +27,15 @@ public class QuorumConfigTest { @Test public void testPositiveConfig() { - verifyPositionConfig(Map.of(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, "0")); - verifyPositionConfig(Map.of(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, "0")); - verifyPositionConfig(Map.of(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, "0")); - verifyPositionConfig(Map.of(QuorumConfig.QUORUM_LINGER_MS_CONFIG, "0")); - verifyPositionConfig(Map.of(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, "0")); - verifyPositionConfig(Map.of(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, "0")); + verifyPositiveConfig(Map.of(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, "0")); + verifyPositiveConfig(Map.of(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, "0")); + verifyPositiveConfig(Map.of(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, "0")); + verifyPositiveConfig(Map.of(QuorumConfig.QUORUM_LINGER_MS_CONFIG, "0")); + verifyPositiveConfig(Map.of(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, "0")); + verifyPositiveConfig(Map.of(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, "0")); } - private void verifyPositionConfig(Map overrideConfig) { + private void verifyPositiveConfig(Map overrideConfig) { Map props = new HashMap<>(); props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9092"); props.put(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); From 9abbcc1eca864b749aadfd70a28ed3cc6dc18c70 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 21 Feb 2025 21:48:14 +0000 Subject: [PATCH 5/5] improve doc --- .../main/java/org/apache/kafka/raft/QuorumConfig.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java index b24345f61dabf..482f0a074e121 100644 --- a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java +++ b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java @@ -70,23 +70,24 @@ public class QuorumConfig { public static final String QUORUM_ELECTION_TIMEOUT_MS_CONFIG = QUORUM_PREFIX + "election.timeout.ms"; public static final String QUORUM_ELECTION_TIMEOUT_MS_DOC = "Maximum time in milliseconds to wait " + - "without being able to fetch from the leader before triggering a new election"; + "without being able to fetch from the leader before triggering a new election, this value should large 0."; public static final int DEFAULT_QUORUM_ELECTION_TIMEOUT_MS = 1_000; public static final String QUORUM_FETCH_TIMEOUT_MS_CONFIG = QUORUM_PREFIX + "fetch.timeout.ms"; public static final String QUORUM_FETCH_TIMEOUT_MS_DOC = "Maximum time without a successful fetch from " + "the current leader before becoming a candidate and triggering an election for voters; Maximum time " + - "a leader can go without receiving valid fetch or fetchSnapshot request from a majority of the quorum before resigning."; + "a leader can go without receiving valid fetch or fetchSnapshot request from a majority of the quorum before resigning, " + + "this value should large 0."; public static final int DEFAULT_QUORUM_FETCH_TIMEOUT_MS = 2_000; public static final String QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG = QUORUM_PREFIX + "election.backoff.max.ms"; public static final String QUORUM_ELECTION_BACKOFF_MAX_MS_DOC = "Maximum time in milliseconds before starting new elections. " + - "This is used in the binary exponential backoff mechanism that helps prevent gridlocked elections"; + "This is used in the binary exponential backoff mechanism that helps prevent gridlocked elections, this value should large 0."; public static final int DEFAULT_QUORUM_ELECTION_BACKOFF_MAX_MS = 1_000; public static final String QUORUM_LINGER_MS_CONFIG = QUORUM_PREFIX + "append.linger.ms"; public static final String QUORUM_LINGER_MS_DOC = "The duration in milliseconds that the leader will " + - "wait for writes to accumulate before flushing them to disk."; + "wait for writes to accumulate before flushing them to disk, this value should large 0."; public static final int DEFAULT_QUORUM_LINGER_MS = 25;