forked from williampeer/kafka-workshop
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathBarebonesKafkaClients.kt
164 lines (140 loc) · 5.77 KB
/
BarebonesKafkaClients.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package tasks
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import java.util.*
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.toJavaDuration
/**
 * Factory for plain-string and Avro Kafka clients, all wired against a local
 * broker (localhost:9094) and schema registry (http://localhost:8085) over
 * PLAINTEXT.
 */
object BarebonesKafkaClients {
    private const val BOOTSTRAP_SERVER_URL = "localhost:9094"
    private const val SCHEMA_REGISTRY_URL = "http://localhost:8085"

    // Fully-qualified (de)serializer class names shared by the factories below.
    private const val STRING_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"
    private const val STRING_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"
    private const val AVRO_SERIALIZER = "io.confluent.kafka.serializers.KafkaAvroSerializer"
    private const val AVRO_DESERIALIZER = "io.confluent.kafka.serializers.KafkaAvroDeserializer"

    /** Broker address, security protocol, and schema-registry URL common to every client. */
    fun sharedProps(): Map<String, String> = mapOf(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVER_URL,
        CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to "PLAINTEXT",
        AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to SCHEMA_REGISTRY_URL,
    )

    /** Producer with String keys and String values against the shared local broker. */
    fun getBareBonesProducer(): KafkaProducer<String, String> {
        val props = buildMap {
            putAll(sharedProps())
            put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER)
            put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER)
        }
        return KafkaProducer<String, String>(props)
    }

    /**
     * String-keyed/valued consumer with auto-commit disabled.
     * Entries in [config] override [sharedProps], but the deserializers, group id,
     * offset reset, and auto-commit settings fixed here always win.
     */
    fun getBareBonesConsumer(
        offsetConfig: String = "latest",
        groupId: String = "my-consumer-${UUID.randomUUID()}",
        config: Map<String, String> = emptyMap()
    ) =
        KafkaConsumer<String, String>(
            buildMap {
                putAll(sharedProps())
                putAll(config)
                put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, STRING_DESERIALIZER)
                put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, STRING_DESERIALIZER)
                put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
                put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetConfig)
                put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
            }
        )

    /** Producer with String keys and Avro-serialized values of type [V]. */
    fun <V> getAvroProducer(): KafkaProducer<String, V> {
        val props = buildMap {
            putAll(sharedProps())
            put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER)
            put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AVRO_SERIALIZER)
        }
        return KafkaProducer<String, V>(props)
    }

    /** Consumer with String keys and specific-record Avro values of type [V]. */
    fun <V> getAvroConsumer(offsetConfig: String = "earliest", groupId: String = "random-group-${UUID.randomUUID()}"):
            KafkaConsumer<String, V> =
        KafkaConsumer<String, V>(
            buildMap {
                putAll(sharedProps())
                put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetConfig)
                put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
                put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
                put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, STRING_DESERIALIZER)
                put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AVRO_DESERIALIZER)
                put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "6000")
            }
        )
}
/**
 * Fire-and-forget Kafka producer that, once [resume]d, publishes one record
 * every 100 ms to [topicName], drawing each key/value pair from [messageProducer].
 *
 * Control is exchanged over two zero-capacity (rendezvous) channels, so
 * [resume] and [stop] block the caller until the producer loop actually
 * acknowledges the state change.
 */
class ContinuousProducer(private val topicName: String, private val messageProducer: () -> Pair<String, String>) {
    // Rendezvous channels (capacity 0): a send() suspends until the loop receives it.
    private val pauseRendezvous: Channel<Unit> = Channel(0)
    private val startRendezvous: Channel<Unit> = Channel(0)
    val producer = BarebonesKafkaClients.getBareBonesProducer()
    init {
        // NOTE(review): the scope is tied to a free-standing Job, so this loop lives
        // for the whole process — nothing ever cancels it or closes [producer].
        CoroutineScope(Job()).launch(Dispatchers.IO) {
            startProducer()
        }
    }
    /**
     * Outer loop parks until a [resume] signal arrives; the inner loop then sends
     * one record per 100 ms until a [stop] signal is observed via a non-suspending
     * tryReceive between sends.
     */
    private suspend fun startProducer() {
        while (true) {
            startRendezvous.receive() // park here until resume() is called
            while (pauseRendezvous.tryReceive().isFailure) {
                producer.send(
                    messageProducer().let {
                        ProducerRecord(
                            topicName,
                            it.first,
                            it.second
                        )
                    }
                )
                delay(100) // throttle to roughly 10 records per second
            }
        }
    }
    /** Starts (or restarts) publishing. Blocks the caller until the loop accepts the signal. */
    fun resume() {
        runBlocking {
            startRendezvous.send(Unit)
        }
    }
    /**
     * Pauses publishing. Blocks until the loop's tryReceive picks up the signal.
     * NOTE(review): calling stop() before any resume() suspends forever — the loop
     * is parked on startRendezvous and never polls pauseRendezvous in that state.
     */
    fun stop() {
        runBlocking { pauseRendezvous.send(Unit) }
    }
}
/**
 * Continuously polls [topicName] and passes every record to [consumeFunction],
 * which also receives the consumer itself (auto-commit is disabled by
 * [BarebonesKafkaClients.getBareBonesConsumer], so callers can commit manually).
 * Polling starts immediately on construction; [stop] and [resume] pause/restart it.
 */
class BasicContinuousConsumer(
    groupId: String,
    private val topicName: String,
    offsetResetConfig: String = "latest",
    private val consumeFunction: (ConsumerRecord<String, String>, KafkaConsumer<String, String>) -> Unit
) {
    val consumer = BarebonesKafkaClients.getBareBonesConsumer(groupId = groupId, offsetConfig = offsetResetConfig)
    // Rendezvous channels (capacity 0): a send() suspends until the loop receives it.
    private val pauseRendezvous: Channel<Unit> = Channel(0)
    private val resumeRendezvous: Channel<Unit> = Channel(0)
    init {
        // Poll loop starts right away — no resume() is needed for the first run.
        // NOTE(review): the free-standing Job is never cancelled; the loop lives
        // for the whole process.
        CoroutineScope(Job()).launch(Dispatchers.IO) {
            startConsumer()
        }
    }
    /**
     * (Re)subscribes to [topicName], then polls in 1-second slices until a stop
     * signal is seen via a non-suspending tryReceive; it then parks until
     * resume() is called and subscribes again.
     */
    private suspend fun startConsumer() {
        while (true) {
            consumer.subscribe(listOf(topicName))
            while (pauseRendezvous.tryReceive().isFailure) {
                consumer.poll(1000.milliseconds.toJavaDuration()).forEach {
                    consumeFunction(it, consumer)
                }
            }
            resumeRendezvous.receive() // parked here while stopped
        }
    }
    /** Resumes polling after a stop(). Blocks the caller until the loop accepts the signal. */
    fun resume() {
        runBlocking {
            resumeRendezvous.send(Unit)
        }
    }
    /** Pauses polling once the in-flight poll finishes. Blocks until acknowledged. */
    fun stop() {
        runBlocking { pauseRendezvous.send(Unit) }
    }
    /**
     * Stops the poll loop, then closes the consumer.
     * NOTE(review): close() runs on the caller's thread while the poll coroutine may
     * still be finishing its hand-off — KafkaConsumer is single-threaded, so confirm
     * there is no concurrent access window here.
     */
    fun close() {
        stop()
        consumer.close()
    }
}