I. Pour commencer : qu’est-ce que Kafka ?▲
Apache Kafka est une plate-forme de diffusion d'événements distribuée open source utilisée par des milliers d'entreprises pour des pipelines de données à haute performance, des analyses de streaming, l'intégration de données et des applications critiques (site officiel : https://kafka.apache.org/).
Vous pouvez également le découvrir au travers du tutoriel de SOAT (https://soat.developpez.com/tutoriels/bigdata/apprendre-kafka-concepts-fonctionnement/).
II. Environnement▲
II-A. Technique▲
- Système d’exploitation : Windows 10 (il est possible d’utiliser Linux).
-
Docker pour déployer Kafka.
- Dans ce tutoriel, Docker Desktop de Windows a été utilisé (https://docs.docker.com/docker-for-windows/install/).
- A savoir : il est nécessaire d’activer la virtualisation dans le Bios et d’installer Linux pour Windows 10 (https://docs.microsoft.com/fr-fr/win.../install-win10).
- Environnement de développement : Eclipse (https://www.eclipse.org/).
- Langage : Java avec le Framework Spring et Maven.
II-B. Schéma▲
Le programme que nous allons réaliser permettra à un expéditeur d’envoyer un message à un destinataire au travers de la messagerie Kafka. Ce dernier sera démarré avec Docker.
Une partie du programme va envoyer un message au travers de la messagerie Kafka à un destinataire.
Une autre partie du programme écoutera la messagerie Kafka, si un message lui est destiné, il le réceptionnera.

III. Démarrage de l’image Docker Kafka▲
III-A. Installation et lancement▲
- Démarrer Docker.
- Créer un fichier « docker-compose.yml » dans un répertoire.
-
Copier le contenu ci-dessous dans le fichier.
Pour information : Les images utilisées de Docker pour Kafka sont :
https://hub.docker.com/r/wurstmeister/kafka
https://hub.docker.com/_/zookeeper
https://hub.docker.com/r/kafkamanager/kafka-manager
docker-compose.ymlversion:'3.5'services:zookeeper:image:zookeeperports:-"2181:2181"kafka:image:wurstmeister/kafkaports:-"9092:9092"environment:# HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"KAFKA_ZOOKEEPER_CONNECT:zookeeper:2181KAFKA_ADVERTISED_HOST_NAME:kafkaKAFKA_ADVERTISED_PORT:9092KAFKA_BROKER_ID:1KAFKA_ZOOKEEPER_CONNECT:zookeeper:2181KAFKA_JMX_OPTS:"-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka -Dcom.sun.management.jmxremote.rmi.port=1099"JMX_PORT:1099volumes:-/var/run/docker.sock:/var/run/docker.sockdepends_on:-zookeeper kafka-manager:image:kafkamanager/kafka-managerports:-"9000:9000"links:-zookeeper-kafkaenvironment:ZK_HOSTS:zookeeper:2181 -
En ligne de commande (CMD sous Windows), se positionner dans le répertoire et lancer la ligne de commande : docker-compose upou docker compose up(suivant la version).
-
Les images vont se télécharger et démarrer (la première exécution peut prendre un certain temps).
- Avec un autre invite de commande, il est également possible de vérifier que les conteneurs sont démarrés en tapant docker ps.
C:\TutoKafka>docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
9a077b5079d5 kafkamanager/kafka-manager "cmak-3.0.0.4/bin/cm…" 16 seconds ago Up 15 seconds 0.0.0.0:9000->9000/tcp kafka_kafka-manager_1
ac45367575e5 wurstmeister/kafka "start-kafka.sh" 17 seconds ago Up 16 seconds 0.0.0.0:9092->9092/tcp kafka_kafka_1
7b7eca7f1642 zookeeper "/docker-entrypoint.…" 17 seconds ago Up 16 seconds 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp kafka_zookeeper_1III-B. Administration par kafka-manager (CMAK)▲
- Via un navigateur, lancer http://localhost:9000.
- Ajouter un cluster en renseignant les informations ci-après.
IV. Modélisation▲
IV-A. Diagramme de classe▲
Le programme est structuré en package :
- dto (Data Transfer Object) : objet métier qui sera transmis comme message.
- sender : partie du programme pour envoyer le message.
- receiver : partie du programme pour écouter la réception du message et exécuter un traitement.
IV-B. Diagramme de séquence▲
Dans ce diagramme de séquence, les deux parties du programme sont visibles :
- Au démarrage de l’application, la partie destinataire écoute ou attend la réception d’un message. Si un message est réceptionné, un traitement est exécuté en affichant un message contenant l’objet « Person » dans la console.
- Au démarrage du programme principal, une boucle infinie est lancée pour instancier un objet « Person » et l’envoyer en tant que message.
V. Initialisation du projet▲
- Aller sur le site https://start.spring.io/.
-
Renseigner les champs et ajouter dans les « Dependencies » (Spring for Apache Kafka) et enfin « Generate ».
-
Télécharger le fichier « zip » et l’extraire dans un répertoire.

Dossier projet -
Lancer Eclipse.
-
Importer le projet.
-
Ouvrir l’arborescence du projet.
-
Ouvrir et ajouter dans le fichier « pom.xml ».
Extraction pom.xmlCacher/Afficher le codeSélectionnez<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency> - Si vous lancez le programme en local, il faudra ajouter dans le fichier Host « 127.0.0.1 kafka » (C:\Windows\System32\drivers\etc\hosts).
# Copyright (c) 1993-2009 Microsoft Corp.
#
# This is a sample HOSTS file used by Microsoft TCP/IP for Windows.
#
# This file contains the mappings of IP addresses to host names. Each
# entry should be kept on an individual line. The IP address should
# be placed in the first column followed by the corresponding host name.
# The IP address and the host name should be separated by at least one
# space.
#
# Additionally, comments (such as these) may be inserted on individual
# lines or following the machine name denoted by a '#' symbol.
#
# For example:
#
# 102.54.94.97 rhino.acme.com # source server
# 38.25.63.10 x.acme.com # x client host
# localhost name resolution is handled within DNS itself.
# 127.0.0.1 localhost
# ::1 localhost
# Added by Docker Desktop
192.168.3.138 host.docker.internal
192.168.3.138 gateway.docker.internal
# To allow the same kube context to work on the host and the container:
127.0.0.1 kubernetes.docker.internal
127.0.0.1 kafka
# End of sectionVI. Programme▲
VI-A. Structure du programme▲
Voici la structure finale du programme.

VI-B. Data To Object (dto)▲
Person.java
Utilisation d'un objet ex : "Person" qui sera transmis entre l’expéditeur et le destinataire.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
package fr.tutoriel.kafka.dto;
/**
* Exemple d'un objet
*/
public class Person {
private String firstName;
private String lastName;
private int age;
public Person() {
}
public Person(String firstName, String lastName, int age) {
this.firstName = firstName;
this.lastName = lastName;
this.age = age;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String msg) {
this.firstName = msg;
}
public String getLastName() {
return lastName;
}
public void setLastName(String name) {
this.lastName = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "{firstName:\""+firstName+"\", lastName:\""+lastName+"\", age:"+String.valueOf(age)+"}";
}
}
VI-C. Receiver▲
Développement de la partie Destinataire pour écouter et réceptionner les messages.
IProcess.java
Interface pour l'exécution d'un traitement.
(NB : facultatif : pour anticiper la séparation du programme)
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
package fr.tutoriel.kafka.receiver;
import fr.tutoriel.kafka.dto.Person;
/**
* Interface pour gerer les traitements
*
* */
public interface IProcess {
/**
* Exécuter un traitement
* @param info
* @param person
*/
public void execute(String info, Person person);
}
IReceiver.java
Interface pour écouter la réception d'un message.
Il serait possible d'utiliser la même interface pour écouter un message avec une autre messagerie que Kafka (ex : RabbitMQ).
(NB : facultatif : pour anticiper la séparation du programme)
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
package fr.tutoriel.kafka.receiver;
import fr.tutoriel.kafka.dto.Person;
/**
* Interface pour écouter la réception d'un message
*
* Il serait possible d'utiliser la même interface pour écouter
* un message avec une autre messagerie que Kafka (ex : RabbitMQ)
* */
public interface IReceiver {
public void listen(String topicName, Person person);
}
KafkaReceiverConfig.java
Configuration du destinataire avec l'adresse du serveur, le groupe, type de message en JSON.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
package fr.tutoriel.kafka.receiver;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import fr.tutoriel.kafka.dto.Person;
/**
* Configuration du destinataire
*/
@EnableKafka
@Configuration
public class KafkaReceiverConfig {
@Value(value = "${kafka.bootstrapAddress:kafka:9092}") // Adresse du serveur Kafka pour envoyer les messages (il est possible de le configurer via application.properties)
private String bootstrapAddress;
private static final String groupId = "Tutorial"; // Définition du groupe ex: Tutorial
@Bean
public ConsumerFactory<String, Person> receiverFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapAddress); // Configuration de l'adresse du serveur
props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);// Configuration du groupe
return new DefaultKafkaConsumerFactory<>(props,new StringDeserializer(),new JsonDeserializer<>(Person.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Person> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Person> factory =new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(receiverFactory());
return factory;
}
}
KafkaReceiver.java
Permet d'écouter la réception d'un message et d'exécuter un traitement (au travers de l'interface).
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
package fr.tutoriel.kafka.receiver;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import fr.tutoriel.kafka.dto.Person;
/**
* Receveur Kafka
*
*/
@Service
public class KafkaReceiver implements IReceiver {
@Autowired
private IProcess process; // Affectation de l'instance pour la gestion des traitements
/**
* Ecoute sur le topic "Tuto1", s'il existe un message
*
* @param person
* Objet reçu
*/
@KafkaListener(topics = "Tuto1",containerFactory="kafkaListenerContainerFactory")
public void listenTuto1(Person person) {
listen("Tuto1",person);
}
/**
* Ecoute sur le topic "Tuto2", s'il existe un message
* @param person
* Objet reçu
*/
@KafkaListener(topics = "Tuto2",containerFactory="kafkaListenerContainerFactory")
public void listenTuto2(Person person) {
listen("Tuto2",person);
}
/**
*
* Exécution d'un traitement
* @param topicName
* Nom du topic
* @param topicName
* Objet reçu
*
*/
@Override
public void listen(String topicName, Person person) {
process.execute(topicName+"Info", person);
}
}
ProcessExample.java
Permet d'exécuter un traitement.
Dans l'exemple, affichage d'un texte dans la console.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
package fr.tutoriel.kafka.receiver;
import org.springframework.stereotype.Service;
import fr.tutoriel.kafka.dto.Person;
/**
* Traitement exemple
*/
@Service
public class ProcessExample implements IProcess {
/**
* Exécuter le traitement : Dans notre exemple affichage du message dans la console
* @param topicName
* Nom du Topic
* @param person
* objet person
*/
@Override
public void execute(String info, Person person) {
System.out.println("Message reçu: info="+info +", person=" + person);
}
}
VI-D. Sender▲
Développement de la partie Expéditeur pour envoyer les messages.
ISender.java
Interface pour envoyer un message.
Il serait possible d'utiliser la même interface pour envoyer un message avec une autre messagerie que Kafka (ex : RabbitMQ).
(NB : facultatif : pour anticiper la séparation du programme)
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
package fr.tutoriel.kafka.sender;
import fr.tutoriel.kafka.dto.Person;
/**
* Interface pour envoyer un message
*
* Il serait possible d'utiliser la même interface pour envoyer
* un message avec une autre messagerie que Kafka (ex : RabbitMQ)
* */
public interface ISender {
void send(String topicName, Person person);
}
KafkaSenderConfig.java
Configuration de l'expéditeur avec l'adresse du serveur, type de message en JSON.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
package fr.tutoriel.kafka.sender;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import fr.tutoriel.kafka.dto.Person;
/**
* Configuration de l'expéditeur
*/
@Configuration
public class KafkaSenderConfig {
@Value(value = "${kafka.bootstrapAddress:kafka:9092}") // Adresse du serveur Kafka (il est possible de le configurer via application.properties)
private String bootstrapAddress;
@Bean
public ProducerFactory<String, Person> senderFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapAddress); // Adresse du serveur
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Person> kafkaTemplate() {
return new KafkaTemplate<>(senderFactory());
}
}
KafkaTopicConfig.java
Configuration Topic avec l'adresse du serveur.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
package fr.tutoriel.kafka.sender;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClientConfig;
//import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;
/**
* Configuration Topic
*/
@Configuration
public class KafkaTopicConfig {
@Value(value = "${kafka.bootstrapAddress:kafka:9092}") // Adresse du serveur Kafka (il est possible de le configurer via application.properties)
private String bootstrapAddress;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
}
KafkaSender.java
Permet d'envoyer un message (Person) dans un topic.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
package fr.tutoriel.kafka.sender;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import fr.tutoriel.kafka.dto.Person;
/**
*
* Expediteur Kafka
*/
@Service
public class KafkaSender implements ISender {
@Autowired
private KafkaTemplate<String, Person> kafkaTemplate;
/**
* Permet d'envoyer un message (Person) dans un topic
*
* @param topicName
* Nom du topic
* @param person
* Objet à envoyer
*/
@Override
public void send(String topicName, Person person) {
System.out.println("Message à envoyer : topicName="+topicName +", person=" + person);
new NewTopic(topicName, 1, (short) 1);
kafkaTemplate.send(topicName, person);
}
}
VI-E. Main▲
Programme principal
KafkaApplication.java
Spring boot permet de démarrer et configurer les parties "réception" et "destinataire".
La méthode "run" permet d'envoyer en boucle des messages vers une messagerie (Kafka ou autre).
Au travers de la méthode "send", un texte est affiché dans la console.
En asynchrone, à la réception du message, celui-ci est affiché dans la console.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
package fr.tutoriel.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import fr.tutoriel.kafka.dto.Person;
import fr.tutoriel.kafka.sender.ISender;
/**
* KafkaApplication
* Programme principal
*/
@SpringBootApplication
public class KafkaApplication implements CommandLineRunner {
@Autowired
private ISender sender; // Affectation de l'instance sender
public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class, args);
}
/**
* Exécuter le programme
* Permet d'envoyer des messages vers une messagerie (Kafka ou autre)
*
* La réception du message est asynchrone (en attente)
*/
@Override
public void run(String... args) throws Exception {
int ageTuto1 = 10; //
int ageTuto2 = 20; //
while (true) { // Boucle infinie
Person personTuto1 = new Person("Jean", "DUPOND", ageTuto1); // Créer un objet pour Tuto1
sender.send("Tuto1", personTuto1); // Envoyer l'objet sur le topic "Tuto1"
Thread.sleep(3000); // Attendre 3s
Person personTuto2 = new Person("Pierre","DURAND",ageTuto2); // Créer un objet pour Tuto2
sender.send("Tuto2", personTuto2); // Envoyer l'objet sur le topic "Tuto2"
Thread.sleep(3000); // Attendre 3s
ageTuto1++; // Incrémentation du texte
ageTuto2++; // Incrémentation du texte
}
}
}
VII. Exécution▲
VII-A. Exécuter▲
- Sélectionner le fichier KafkaApplication.java
- Faire Clic droit > Run As > Java Application
VII-B. Résultat▲
Après exécution, il est possible de visualiser dans la console les messages (envoi et réception).
2.
3.
4.
5.
6.
Message à envoyer : topicName=Tuto2, person={firstName:"Pierre", lastName:"DURAND", age:20}
Message reçu: info=Tuto2Info, person={firstName:"Pierre", lastName:"DURAND", age:20}
Message à envoyer : topicName=Tuto1, person={firstName:"Jean", lastName:"DUPOND", age:11}
Message reçu: info=Tuto1Info, person={firstName:"Jean", lastName:"DUPOND", age:11}
Message à envoyer : topicName=Tuto2, person={firstName:"Pierre", lastName:"DURAND", age:21}
Message reçu: info=Tuto2Info, person={firstName:"Pierre", lastName:"DURAND", age:21}
VIII. Conclusion▲
J’espère qu’à travers ce tutoriel vous réussirez à installer Kafka et exécuter votre premier programme en Java.
Pour aller plus loin :
-
Il serait judicieux de séparer le programme au moins en 3 parties :
- Une bibliothèque partagée (pour mettre la classe « Person » et les interfaces)
- Une partie « Sender »
- Une partie « Receiver »
- Il est possible en gardant cette structure de faire fonctionner le programme avec une autre messagerie comme RabbitMQ (https://www.rabbitmq.com/)
IX. Liens▲
- Kafka : https://kafka.apache.org/
- Docker: https://www.docker.com/
- Docker Desktop : https://docs.docker.com/docker-for-windows/install/
- Code source inspiré du site : https://www.baeldung.com/spring-kafka
X. Remerciements▲
Je tiens à remercier Mickael Baron pour son accompagnement à la réalisation de ce tutoriel et ClaudeLELOUP pour sa relecture orthographique.














