How to implement message communication in Java using Apache Kafka
Apache Kafka is a distributed, highly reliable, high-performance message queue system that can be used to build real-time data streams and data pipelines. It consists of three core components: Producer, Broker, and Consumer.

The main advantages of Kafka are:

1. High throughput: Kafka handles highly concurrent, large-scale message flows and can process millions of messages per second.
2. Scalability: Kafka stores data in partitions, which can be spread horizontally across multiple nodes to support larger data volumes.
3. Persistence: Kafka persists all published messages to disk, and the message retention time is configurable.
4. Reliability: Kafka supports message replication and fault recovery, ensuring reliable delivery of messages.
5. Multiple messaging modes: through consumer groups, Kafka supports both point-to-point and publish-subscribe semantics, and producers can send messages in batches.
6. Efficient data compression: Kafka can compress messages, reducing the amount of data transmitted over the network.
7. Distributed: Kafka supports distributed deployment and provides a multi-replica backup mechanism to ensure high availability of data.

The following is complete sample code for sending and receiving Kafka messages from Java. First, add Kafka's Maven dependency to the pom.xml file:

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>
</dependencies>
```

Then messages can be sent with the following code:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        String bootstrapServers = "localhost:9092";
        String topic = "test-topic";

        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Send 10 messages, each with a key and a value
        for (int i = 0; i < 10; i++) {
            String key = "key" + i;
            String value = "value" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
            producer.send(record);
        }

        // Flush buffered records and release resources
        producer.close();
    }
}
```

The code above creates a KafkaProducer configured with the Kafka address (bootstrap.servers) and the key/value serializers, sends 10 messages in a loop, each containing a key and a value, and finally closes the producer.
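Note that `producer.send()` is asynchronous, and the example above does not check whether each message actually reached the broker. If delivery confirmation matters, one option is to set `acks=all` and pass a callback to `send()`. The sketch below is a minimal, non-authoritative illustration of that pattern; the class name `ReliableProducerSketch` is hypothetical, and the broker address and topic are simply assumed to match the example above:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

// Hypothetical sketch: sends one record and reports whether the broker acknowledged it
public class ReliableProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker address and topic assumed from the example above
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Wait for all in-sync replicas to acknowledge, and retry transient failures
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("test-topic", "key", "value");
            // The callback runs once the broker has acknowledged (or rejected) the record
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("Send failed: " + exception.getMessage());
                } else {
                    System.out.println("Sent to partition " + metadata.partition()
                            + " at offset " + metadata.offset());
                }
            });
        } // close() flushes any buffered records
    }
}
```

If a blocking, synchronous send is preferred, `producer.send(record).get()` waits for the broker's acknowledgement, since `send()` returns a `Future<RecordMetadata>`.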
Next, messages can be received with the following code:

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        String bootstrapServers = "localhost:9092";
        String topic = "test-topic";
        String groupId = "test-group";

        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", groupId);

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic));

        // Poll for new messages until the process is terminated
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: key = " + record.key() + ", value = " + record.value());
            }
        }
    }
}
```

The code above creates a KafkaConsumer configured with the Kafka address (bootstrap.servers), the key/value deserializers, and the consumer group (group.id). It subscribes to the specified topic by calling `consumer.subscribe()`, then consumes messages in a loop and prints each message's key and value. (A sketch of a more robust consumer loop with manual offset commits and a clean shutdown follows at the end of this section.)

Configuration used in the examples:
- Kafka service address: localhost:9092
- Topic name: test-topic
- Consumer group name: test-group

Official website link: https://kafka.apache.org/
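As noted above, the consumer example relies on the default automatic offset commit and loops forever without ever closing the consumer. Where at-least-once processing and a clean shutdown matter, a common approach is to disable auto-commit, call `commitSync()` after processing each batch, and break out of `poll()` via `wakeup()`. The sketch below is one possible illustration of that pattern, not the only correct structure; the class name `ManualCommitConsumerSketch` is hypothetical, and the broker address, topic, and group name are assumed to match the earlier examples:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

// Hypothetical sketch: manual offset commits plus a shutdown hook for a clean exit
public class ManualCommitConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker address, topic, and group assumed from the earlier examples
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Commit offsets only after the records have been processed
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // On Ctrl+C, wakeup() makes the blocked poll() throw WakeupException;
        // joining the main thread gives it time to close the consumer cleanly
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join();
            } catch (InterruptedException ignored) {
            }
        }));

        try {
            consumer.subscribe(Collections.singletonList("test-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Processed: key = " + record.key()
                            + ", value = " + record.value());
                }
                if (!records.isEmpty()) {
                    consumer.commitSync(); // commit only after successful processing
                }
            }
        } catch (WakeupException e) {
            // Expected on shutdown; fall through to close the consumer
        } finally {
            consumer.close();
        }
    }
}
```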