From 4e1c8fb6a6ab63c2e2afaa7725074a70ba1fb913 Mon Sep 17 00:00:00 2001 From: liuhy Date: Fri, 28 Feb 2025 13:56:55 +0800 Subject: [PATCH] kafka --- .../compose/shenyu-kafka-compose.yml | 2 +- .../logging/kafka/DividePluginCases.java | 19 +++++++++++-------- .../logging/kafka/DividePluginTest.java | 5 ++++- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-kafka-compose.yml b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-kafka-compose.yml index e592777fbb98..84db1bda1682 100644 --- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-kafka-compose.yml +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-kafka-compose.yml @@ -60,7 +60,7 @@ services: timeout: 2s retries: 30 ports: - - "8189:8189" + - "31189:8189" networks: - shenyu diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginCases.java b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginCases.java index 72a5db670882..2919478af51c 100644 --- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginCases.java +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginCases.java @@ -17,6 +17,7 @@ package org.apache.shenyu.e2e.testcase.logging.kafka; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import io.restassured.http.Method; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -34,7 +35,6 @@ import org.junit.jupiter.api.Assertions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Value; import java.time.Duration; import java.time.Instant; @@ -43,6 +43,7 @@ import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; + import static org.apache.shenyu.e2e.engine.scenario.function.HttpCheckers.exists; import static org.apache.shenyu.e2e.template.ResourceDataTemplate.newConditions; import static org.apache.shenyu.e2e.template.ResourceDataTemplate.newRuleBuilder; @@ -53,9 +54,8 @@ public class DividePluginCases implements ShenYuScenarioProvider { private static final String TOPIC = "shenyu-access-logging"; private static final String TEST = "/http/order/findById?id=123"; - - @Value("${HOST_IP}") - private static String kafkaBroker; + + private static final ObjectMapper MAPPER = new ObjectMapper(); private static final Logger LOG = LoggerFactory.getLogger(DividePluginCases.class); @@ -116,19 +116,22 @@ private ShenYuScenarioSpec testKafkaHello() { KafkaConsumer consumer = new KafkaConsumer<>(properties); consumer.subscribe(Arrays.asList(TOPIC)); Thread.sleep(1000 * 30); - AtomicReference keepCosuming = new AtomicReference<>(true); + AtomicReference keepConsuming = new AtomicReference<>(true); Instant start = Instant.now(); - while (keepCosuming.get()) { + while (keepConsuming.get()) { + LOG.info("keepConsuming.get():{}", keepConsuming.get()); if (Duration.between(start, Instant.now()).toMillis() > 300000) { - keepCosuming.set(false); + keepConsuming.set(false); + LOG.info("timeout1"); } ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); + LOG.info("records.count:{}", records.count()); records.forEach(record -> { String message = record.value(); LOG.info("kafka message:{}", message); if (message.contains("/http/order/findById")) { isLog.set(true); - keepCosuming.set(false); + keepConsuming.set(false); } }); } diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginTest.java b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginTest.java index 64895d4fc716..8b2f51b51b2b 100644 --- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginTest.java +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginTest.java @@ -17,6 +17,7 @@ package org.apache.shenyu.e2e.testcase.logging.kafka; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import org.apache.shenyu.e2e.client.WaitDataSync; import org.apache.shenyu.e2e.client.admin.AdminClient; @@ -69,6 +70,8 @@ public class DividePluginTest { private static final Logger LOG = LoggerFactory.getLogger(DividePluginTest.class); private List selectorIds = Lists.newArrayList(); + + private static final ObjectMapper MAPPER = new ObjectMapper(); @BeforeEach void before(final AdminClient client, final GatewayClient gateway, final BeforeEachSpec spec) { @@ -120,7 +123,7 @@ void setup(final AdminClient adminClient, final GatewayClient gatewayClient) thr "{\"topic\":\"shenyu-access-logging\",\"bootstrapServer\":\"shenyu-kafka:9092\",\"sampleRate\":\"1\",\"maxResponseBody\":524288,\"maxRequestBody\":524288,\"compressAlg\":\"none\"}"); adminClient.changePluginStatus("1801816010882822171", reqBody); Map plugins = gatewayClient.getPlugins(); - LOG.info("shenyu e2e plugin list ={}", plugins); + LOG.info("shenyu e2e plugin list: {}", MAPPER.writeValueAsString(plugins)); WaitDataSync.waitGatewayPluginUse(gatewayClient, "org.apache.shenyu.plugin.logging.kafka.LoggingKafkaPlugin"); }