Skip to content

Commit

Permalink
kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
Aias00 committed Feb 28, 2025
1 parent 40f2993 commit 4e1c8fb
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ services:
timeout: 2s
retries: 30
ports:
- "8189:8189"
- "31189:8189"
networks:
- shenyu

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.shenyu.e2e.testcase.logging.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import io.restassured.http.Method;
import org.apache.kafka.clients.consumer.ConsumerConfig;
Expand All @@ -34,7 +35,6 @@
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;

import java.time.Duration;
import java.time.Instant;
Expand All @@ -43,6 +43,7 @@
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.shenyu.e2e.engine.scenario.function.HttpCheckers.exists;
import static org.apache.shenyu.e2e.template.ResourceDataTemplate.newConditions;
import static org.apache.shenyu.e2e.template.ResourceDataTemplate.newRuleBuilder;
Expand All @@ -53,9 +54,8 @@ public class DividePluginCases implements ShenYuScenarioProvider {
private static final String TOPIC = "shenyu-access-logging";

private static final String TEST = "/http/order/findById?id=123";

@Value("${HOST_IP}")
private static String kafkaBroker;

private static final ObjectMapper MAPPER = new ObjectMapper();

private static final Logger LOG = LoggerFactory.getLogger(DividePluginCases.class);

Expand Down Expand Up @@ -116,19 +116,22 @@ private ShenYuScenarioSpec testKafkaHello() {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList(TOPIC));
Thread.sleep(1000 * 30);
AtomicReference<Boolean> keepCosuming = new AtomicReference<>(true);
AtomicReference<Boolean> keepConsuming = new AtomicReference<>(true);
Instant start = Instant.now();
while (keepCosuming.get()) {
while (keepConsuming.get()) {
LOG.info("keepConsuming.get():{}", keepConsuming.get());
if (Duration.between(start, Instant.now()).toMillis() > 300000) {
keepCosuming.set(false);
keepConsuming.set(false);
LOG.info("timeout1");
}
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
LOG.info("records.count:{}", records.count());
records.forEach(record -> {
String message = record.value();
LOG.info("kafka message:{}", message);
if (message.contains("/http/order/findById")) {
isLog.set(true);
keepCosuming.set(false);
keepConsuming.set(false);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.shenyu.e2e.testcase.logging.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import org.apache.shenyu.e2e.client.WaitDataSync;
import org.apache.shenyu.e2e.client.admin.AdminClient;
Expand Down Expand Up @@ -69,6 +70,8 @@ public class DividePluginTest {
private static final Logger LOG = LoggerFactory.getLogger(DividePluginTest.class);

private List<String> selectorIds = Lists.newArrayList();

private static final ObjectMapper MAPPER = new ObjectMapper();

@BeforeEach
void before(final AdminClient client, final GatewayClient gateway, final BeforeEachSpec spec) {
Expand Down Expand Up @@ -120,7 +123,7 @@ void setup(final AdminClient adminClient, final GatewayClient gatewayClient) thr
"{\"topic\":\"shenyu-access-logging\",\"bootstrapServer\":\"shenyu-kafka:9092\",\"sampleRate\":\"1\",\"maxResponseBody\":524288,\"maxRequestBody\":524288,\"compressAlg\":\"none\"}");
adminClient.changePluginStatus("1801816010882822171", reqBody);
Map<String, Integer> plugins = gatewayClient.getPlugins();
LOG.info("shenyu e2e plugin list ={}", plugins);
LOG.info("shenyu e2e plugin list: {}", MAPPER.writeValueAsString(plugins));
WaitDataSync.waitGatewayPluginUse(gatewayClient, "org.apache.shenyu.plugin.logging.kafka.LoggingKafkaPlugin");
}

Expand Down

0 comments on commit 4e1c8fb

Please sign in to comment.