I. Pour commencer : qu’est-ce que Kafka ?▲
Apache Kafka est une plate-forme de diffusion d'événements distribuée open source utilisée par des milliers d'entreprises pour des pipelines de données à haute performance, des analyses de streaming, l'intégration de données et des applications critiques (site officiel : https://kafka.apache.org/).
Vous pouvez également le découvrir au travers du tutoriel de SOAT (https://soat.developpez.com/tutoriels/bigdata/apprendre-kafka-concepts-fonctionnement/).
II. Environnement▲
II-A. Technique▲
- Système d’exploitation : Windows 10 (il est possible d’utiliser Linux).
-
Docker pour déployer Kafka.
- Dans ce tutoriel, Docker Desktop de Windows a été utilisé (https://docs.docker.com/docker-for-windows/install/).
- A savoir : il est nécessaire d’activer la virtualisation dans le Bios et d’installer Linux pour Windows 10 (https://docs.microsoft.com/fr-fr/win.../install-win10).
- Environnement de développement : Eclipse (https://www.eclipse.org/).
- Langage : Java avec le Framework Spring et Maven.
II-B. Schéma▲
Le programme que nous allons réaliser permettra à un expéditeur d’envoyer un message à un destinataire au travers de la messagerie Kafka. Ce dernier sera démarré avec Docker.
Une partie du programme va envoyer un message au travers de la messagerie Kafka à un destinataire.
Une autre partie du programme écoutera la messagerie Kafka, si un message lui est destiné, il le réceptionnera.
III. Démarrage de l’image Docker Kafka▲
III-A. Installation et lancement▲
- Démarrer Docker.
- Créer un fichier « docker-compose.yml » dans un répertoire.
-
Copier le contenu ci-dessous dans le fichier.
Pour information : Les images utilisées de Docker pour Kafka sont :
https://hub.docker.com/r/wurstmeister/kafka
https://hub.docker.com/_/zookeeper
https://hub.docker.com/r/kafkamanager/kafka-manager
docker-compose.ymlversion
:
'3.5'
services
:
zookeeper
:
image
:
zookeeperports
:
-
"2181:2181"
kafka
:
image
:
wurstmeister/kafkaports
:
-
"9092:9092"
environment
:
# HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
KAFKA_ZOOKEEPER_CONNECT
:
zookeeper
:
2181
KAFKA_ADVERTISED_HOST_NAME
:
kafkaKAFKA_ADVERTISED_PORT
:
9092
KAFKA_BROKER_ID
:
1
KAFKA_ZOOKEEPER_CONNECT
:
zookeeper
:
2181
KAFKA_JMX_OPTS
:
"-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka -Dcom.sun.management.jmxremote.rmi.port=1099"
JMX_PORT
:
1099
volumes
:
-
/var/run/docker.sock
:
/var/run/docker.sockdepends_on
:
-
zookeeper kafka-
manager
:
image
:
kafkamanager/kafka-
managerports
:
-
"9000:9000"
links
:
-
zookeeper-
kafkaenvironment
:
ZK_HOSTS
:
zookeeper
:
2181
-
En ligne de commande (CMD sous Windows), se positionner dans le répertoire et lancer la ligne de commande : docker-compose upou docker compose up(suivant la version).
-
Les images vont se télécharger et démarrer (la première exécution peut prendre un certain temps).
- Avec un autre invite de commande, il est également possible de vérifier que les conteneurs sont démarrés en tapant docker ps.
C:\TutoKafka>
docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
9a077b5079d5 kafkamanager/kafka-manager "cmak-3.0.0.4/bin/cm…"
16
seconds ago Up 15
seconds 0
.0
.0
.0
:9000
->
9000
/tcp kafka_kafka-manager_1
ac45367575e5 wurstmeister/kafka "start-kafka.sh"
17
seconds ago Up 16
seconds 0
.0
.0
.0
:9092
->
9092
/tcp kafka_kafka_1
7b7eca7f1642 zookeeper "/docker-entrypoint.…"
17
seconds ago Up 16
seconds 2888
/tcp, 3888
/tcp, 0
.0
.0
.0
:2181
->
2181
/tcp, 8080
/tcp kafka_zookeeper_1
III-B. Administration par kafka-manager (CMAK)▲
- Via un navigateur, lancer http://localhost:9000.
- Ajouter un cluster en renseignant les informations ci-après.
IV. Modélisation▲
IV-A. Diagramme de classe▲
Le programme est structuré en package :
- dto (Data Transfer Object) : objet métier qui sera transmis comme message.
- sender : partie du programme pour envoyer le message.
- receiver : partie du programme pour écouter la réception du message et exécuter un traitement.
IV-B. Diagramme de séquence▲
Dans ce diagramme de séquence, les deux parties du programme sont visibles :
- Au démarrage de l’application, la partie destinataire écoute ou attend la réception d’un message. Si un message est réceptionné, un traitement est exécuté en affichant un message contenant l’objet « Person » dans la console.
- Au démarrage du programme principal, une boucle infinie est lancée pour instancier un objet « Person » et l’envoyer en tant que message.
V. Initialisation du projet▲
- Aller sur le site https://start.spring.io/.
-
Renseigner les champs et ajouter dans les « Dependencies » (Spring for Apache Kafka) et enfin « Generate ».
-
Télécharger le fichier « zip » et l’extraire dans un répertoire.
-
Lancer Eclipse.
-
Importer le projet.
-
Ouvrir l’arborescence du projet.
-
Ouvrir et ajouter dans le fichier « pom.xml ».
Extraction pom.xmlCacher/Afficher le codeSélectionnez<dependency>
<groupId>
com.fasterxml.jackson.core</groupId>
<artifactId>
jackson-databind</artifactId>
</dependency>
- Si vous lancez le programme en local, il faudra ajouter dans le fichier Host « 127.0.0.1 kafka » (C:\Windows\System32\drivers\etc\hosts).
# Copyright (c) 1993-2009 Microsoft Corp.
#
# This is a sample HOSTS file used by Microsoft TCP/IP for Windows.
#
# This file contains the mappings of IP addresses to host names. Each
# entry should be kept on an individual line. The IP address should
# be placed in the first column followed by the corresponding host name.
# The IP address and the host name should be separated by at least one
# space.
#
# Additionally, comments (such as these) may be inserted on individual
# lines or following the machine name denoted by a '#' symbol.
#
# For example:
#
# 102.54.94.97 rhino.acme.com # source server
# 38.25.63.10 x.acme.com # x client host
# localhost name resolution is handled within DNS itself.
# 127.0.0.1 localhost
# ::1 localhost
# Added by Docker Desktop
192.168.3.138 host.docker.internal
192.168.3.138 gateway.docker.internal
# To allow the same kube context to work on the host and the container:
127.0.0.1 kubernetes.docker.internal
127.0.0.1 kafka
# End of section
VI. Programme▲
VI-A. Structure du programme▲
Voici la structure finale du programme.
VI-B. Data To Object (dto)▲
Person.java
Utilisation d'un objet ex : "Person" qui sera transmis entre l’expéditeur et le destinataire.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
package
fr.tutoriel.kafka.dto;
/**
* Exemple d'un objet
*/
public
class
Person {
private
String firstName;
private
String lastName;
private
int
age;
public
Person
(
) {
}
public
Person
(
String firstName, String lastName, int
age) {
this
.firstName =
firstName;
this
.lastName =
lastName;
this
.age =
age;
}
public
String getFirstName
(
) {
return
firstName;
}
public
void
setFirstName
(
String msg) {
this
.firstName =
msg;
}
public
String getLastName
(
) {
return
lastName;
}
public
void
setLastName
(
String name) {
this
.lastName =
name;
}
public
int
getAge
(
) {
return
age;
}
public
void
setAge
(
int
age) {
this
.age =
age;
}
@Override
public
String toString
(
) {
return
"{firstName:
\"
"
+
firstName+
"
\"
, lastName:
\"
"
+
lastName+
"
\"
, age:"
+
String.valueOf
(
age)+
"}"
;
}
}
VI-C. Receiver▲
Développement de la partie Destinataire pour écouter et réceptionner les messages.
IProcess.java
Interface pour l'exécution d'un traitement.
(NB : facultatif : pour anticiper la séparation du programme)
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
package
fr.tutoriel.kafka.receiver;
import
fr.tutoriel.kafka.dto.Person;
/**
* Interface pour gerer les traitements
*
* */
public
interface
IProcess {
/**
* Exécuter un traitement
*
@param
info
*
@param
person
*/
public
void
execute
(
String info, Person person);
}
IReceiver.java
Interface pour écouter la réception d'un message.
Il serait possible d'utiliser la même interface pour écouter un message avec une autre messagerie que Kafka (ex : RabbitMQ).
(NB : facultatif : pour anticiper la séparation du programme)
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
package
fr.tutoriel.kafka.receiver;
import
fr.tutoriel.kafka.dto.Person;
/**
* Interface pour écouter la réception d'un message
*
* Il serait possible d'utiliser la même interface pour écouter
* un message avec une autre messagerie que Kafka (ex : RabbitMQ)
* */
public
interface
IReceiver {
public
void
listen
(
String topicName, Person person);
}
KafkaReceiverConfig.java
Configuration du destinataire avec l'adresse du serveur, le groupe, type de message en JSON.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
package
fr.tutoriel.kafka.receiver;
import
java.util.HashMap;
import
java.util.Map;
import
org.apache.kafka.clients.consumer.ConsumerConfig;
import
org.apache.kafka.common.serialization.StringDeserializer;
import
org.springframework.beans.factory.annotation.Value;
import
org.springframework.context.annotation.Bean;
import
org.springframework.context.annotation.Configuration;
import
org.springframework.kafka.annotation.EnableKafka;
import
org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import
org.springframework.kafka.core.ConsumerFactory;
import
org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import
org.springframework.kafka.support.serializer.JsonDeserializer;
import
fr.tutoriel.kafka.dto.Person;
/**
* Configuration du destinataire
*/
@EnableKafka
@Configuration
public
class
KafkaReceiverConfig {
@Value
(
value =
"${kafka.bootstrapAddress:kafka:9092}"
) // Adresse du serveur Kafka pour envoyer les messages (il est possible de le configurer via application.properties)
private
String bootstrapAddress;
private
static
final
String groupId =
"Tutorial"
; // Définition du groupe ex: Tutorial
@Bean
public
ConsumerFactory<
String, Person>
receiverFactory
(
) {
Map<
String, Object>
props =
new
HashMap<>(
);
props.put
(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapAddress); // Configuration de l'adresse du serveur
props.put
(
ConsumerConfig.GROUP_ID_CONFIG,groupId);// Configuration du groupe
return
new
DefaultKafkaConsumerFactory<>(
props,new
StringDeserializer
(
),new
JsonDeserializer<>(
Person.class
));
}
@Bean
public
ConcurrentKafkaListenerContainerFactory<
String, Person>
kafkaListenerContainerFactory
(
) {
ConcurrentKafkaListenerContainerFactory<
String, Person>
factory =
new
ConcurrentKafkaListenerContainerFactory<>(
);
factory.setConsumerFactory
(
receiverFactory
(
));
return
factory;
}
}
KafkaReceiver.java
Permet d'écouter la réception d'un message et d'exécuter un traitement (au travers de l'interface).
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
package
fr.tutoriel.kafka.receiver;
import
org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.kafka.annotation.KafkaListener;
import
org.springframework.stereotype.Service;
import
fr.tutoriel.kafka.dto.Person;
/**
* Receveur Kafka
*
*/
@Service
public
class
KafkaReceiver implements
IReceiver {
@Autowired
private
IProcess process; // Affectation de l'instance pour la gestion des traitements
/**
* Ecoute sur le topic "Tuto1", s'il existe un message
*
*
@param
person
* Objet reçu
*/
@KafkaListener
(
topics =
"Tuto1"
,containerFactory=
"kafkaListenerContainerFactory"
)
public
void
listenTuto1
(
Person person) {
listen
(
"Tuto1"
,person);
}
/**
* Ecoute sur le topic "Tuto2", s'il existe un message
*
@param
person
* Objet reçu
*/
@KafkaListener
(
topics =
"Tuto2"
,containerFactory=
"kafkaListenerContainerFactory"
)
public
void
listenTuto2
(
Person person) {
listen
(
"Tuto2"
,person);
}
/**
*
* Exécution d'un traitement
*
@param
topicName
* Nom du topic
*
@param
topicName
* Objet reçu
*
*/
@Override
public
void
listen
(
String topicName, Person person) {
process.execute
(
topicName+
"Info"
, person);
}
}
ProcessExample.java
Permet d'exécuter un traitement.
Dans l'exemple, affichage d'un texte dans la console.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
package
fr.tutoriel.kafka.receiver;
import
org.springframework.stereotype.Service;
import
fr.tutoriel.kafka.dto.Person;
/**
* Traitement exemple
*/
@Service
public
class
ProcessExample implements
IProcess {
/**
* Exécuter le traitement : Dans notre exemple affichage du message dans la console
*
@param
topicName
* Nom du Topic
*
@param
person
* objet person
*/
@Override
public
void
execute
(
String info, Person person) {
System.out.println
(
"Message reçu: info="
+
info +
", person="
+
person);
}
}
VI-D. Sender▲
Développement de la partie Expéditeur pour envoyer les messages.
ISender.java
Interface pour envoyer un message.
Il serait possible d'utiliser la même interface pour envoyer un message avec une autre messagerie que Kafka (ex : RabbitMQ).
(NB : facultatif : pour anticiper la séparation du programme)
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
package
fr.tutoriel.kafka.sender;
import
fr.tutoriel.kafka.dto.Person;
/**
* Interface pour envoyer un message
*
* Il serait possible d'utiliser la même interface pour envoyer
* un message avec une autre messagerie que Kafka (ex : RabbitMQ)
* */
public
interface
ISender {
void
send
(
String topicName, Person person);
}
KafkaSenderConfig.java
Configuration de l'expéditeur avec l'adresse du serveur, type de message en JSON.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
package
fr.tutoriel.kafka.sender;
import
java.util.HashMap;
import
java.util.Map;
import
org.apache.kafka.clients.producer.ProducerConfig;
import
org.apache.kafka.common.serialization.StringSerializer;
import
org.springframework.beans.factory.annotation.Value;
import
org.springframework.context.annotation.Bean;
import
org.springframework.context.annotation.Configuration;
import
org.springframework.kafka.core.DefaultKafkaProducerFactory;
import
org.springframework.kafka.core.KafkaTemplate;
import
org.springframework.kafka.core.ProducerFactory;
import
org.springframework.kafka.support.serializer.JsonSerializer;
import
fr.tutoriel.kafka.dto.Person;
/**
* Configuration de l'expéditeur
*/
@Configuration
public
class
KafkaSenderConfig {
@Value
(
value =
"${kafka.bootstrapAddress:kafka:9092}"
) // Adresse du serveur Kafka (il est possible de le configurer via application.properties)
private
String bootstrapAddress;
@Bean
public
ProducerFactory<
String, Person>
senderFactory
(
) {
Map<
String, Object>
configProps =
new
HashMap<>(
);
configProps.put
(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapAddress); // Adresse du serveur
configProps.put
(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class
);
configProps.put
(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class
);
return
new
DefaultKafkaProducerFactory<>(
configProps);
}
@Bean
public
KafkaTemplate<
String, Person>
kafkaTemplate
(
) {
return
new
KafkaTemplate<>(
senderFactory
(
));
}
}
KafkaTopicConfig.java
Configuration Topic avec l'adresse du serveur.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
package
fr.tutoriel.kafka.sender;
import
java.util.HashMap;
import
java.util.Map;
import
org.apache.kafka.clients.admin.AdminClientConfig;
//import org.apache.kafka.clients.admin.NewTopic;
import
org.springframework.beans.factory.annotation.Value;
import
org.springframework.context.annotation.Bean;
import
org.springframework.context.annotation.Configuration;
import
org.springframework.kafka.core.KafkaAdmin;
/**
* Configuration Topic
*/
@Configuration
public
class
KafkaTopicConfig {
@Value
(
value =
"${kafka.bootstrapAddress:kafka:9092}"
) // Adresse du serveur Kafka (il est possible de le configurer via application.properties)
private
String bootstrapAddress;
@Bean
public
KafkaAdmin kafkaAdmin
(
) {
Map<
String, Object>
configs =
new
HashMap<>(
);
configs.put
(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return
new
KafkaAdmin
(
configs);
}
}
KafkaSender.java
Permet d'envoyer un message (Person) dans un topic.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
package
fr.tutoriel.kafka.sender;
import
org.apache.kafka.clients.admin.NewTopic;
import
org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.kafka.core.KafkaTemplate;
import
org.springframework.stereotype.Service;
import
fr.tutoriel.kafka.dto.Person;
/**
*
* Expediteur Kafka
*/
@Service
public
class
KafkaSender implements
ISender {
@Autowired
private
KafkaTemplate<
String, Person>
kafkaTemplate;
/**
* Permet d'envoyer un message (Person) dans un topic
*
*
@param
topicName
* Nom du topic
*
@param
person
* Objet à envoyer
*/
@Override
public
void
send
(
String topicName, Person person) {
System.out.println
(
"Message à envoyer : topicName="
+
topicName +
", person="
+
person);
new
NewTopic
(
topicName, 1
, (
short
) 1
);
kafkaTemplate.send
(
topicName, person);
}
}
VI-E. Main▲
Programme principal
KafkaApplication.java
Spring boot permet de démarrer et configurer les parties "réception" et "destinataire".
La méthode "run" permet d'envoyer en boucle des messages vers une messagerie (Kafka ou autre).
Au travers de la méthode "send", un texte est affiché dans la console.
En asynchrone, à la réception du message, celui-ci est affiché dans la console.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
package
fr.tutoriel.kafka;
import
org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.boot.CommandLineRunner;
import
org.springframework.boot.SpringApplication;
import
org.springframework.boot.autoconfigure.SpringBootApplication;
import
fr.tutoriel.kafka.dto.Person;
import
fr.tutoriel.kafka.sender.ISender;
/**
* KafkaApplication
* Programme principal
*/
@SpringBootApplication
public
class
KafkaApplication implements
CommandLineRunner {
@Autowired
private
ISender sender; // Affectation de l'instance sender
public
static
void
main
(
String[] args) {
SpringApplication.run
(
KafkaApplication.class
, args);
}
/**
* Exécuter le programme
* Permet d'envoyer des messages vers une messagerie (Kafka ou autre)
*
* La réception du message est asynchrone (en attente)
*/
@Override
public
void
run
(
String... args) throws
Exception {
int
ageTuto1 =
10
; //
int
ageTuto2 =
20
; //
while
(
true
) {
// Boucle infinie
Person personTuto1 =
new
Person
(
"Jean"
, "DUPOND"
, ageTuto1); // Créer un objet pour Tuto1
sender.send
(
"Tuto1"
, personTuto1); // Envoyer l'objet sur le topic "Tuto1"
Thread.sleep
(
3000
); // Attendre 3s
Person personTuto2 =
new
Person
(
"Pierre"
,"DURAND"
,ageTuto2); // Créer un objet pour Tuto2
sender.send
(
"Tuto2"
, personTuto2); // Envoyer l'objet sur le topic "Tuto2"
Thread.sleep
(
3000
); // Attendre 3s
ageTuto1++
; // Incrémentation du texte
ageTuto2++
; // Incrémentation du texte
}
}
}
VII. Exécution▲
VII-A. Exécuter▲
- Sélectionner le fichier KafkaApplication.java
- Faire Clic droit > Run As > Java Application
VII-B. Résultat▲
Après exécution, il est possible de visualiser dans la console les messages (envoi et réception).
2.
3.
4.
5.
6.
Message à envoyer : topicName=Tuto2, person={firstName:"Pierre", lastName:"DURAND", age:20}
Message reçu: info=Tuto2Info, person={firstName:"Pierre", lastName:"DURAND", age:20}
Message à envoyer : topicName=Tuto1, person={firstName:"Jean", lastName:"DUPOND", age:11}
Message reçu: info=Tuto1Info, person={firstName:"Jean", lastName:"DUPOND", age:11}
Message à envoyer : topicName=Tuto2, person={firstName:"Pierre", lastName:"DURAND", age:21}
Message reçu: info=Tuto2Info, person={firstName:"Pierre", lastName:"DURAND", age:21}
VIII. Conclusion▲
J’espère qu’à travers ce tutoriel vous réussirez à installer Kafka et exécuter votre premier programme en Java.
Pour aller plus loin :
-
Il serait judicieux de séparer le programme au moins en 3 parties :
- Une bibliothèque partagée (pour mettre la classe « Person » et les interfaces)
- Une partie « Sender »
- Une partie « Receiver »
- Il est possible en gardant cette structure de faire fonctionner le programme avec une autre messagerie comme RabbitMQ (https://www.rabbitmq.com/)
IX. Liens▲
- Kafka : https://kafka.apache.org/
- Docker: https://www.docker.com/
- Docker Desktop : https://docs.docker.com/docker-for-windows/install/
- Code source inspiré du site : https://www.baeldung.com/spring-kafka
X. Remerciements▲
Je tiens à remercier Mickael Baron pour son accompagnement à la réalisation de ce tutoriel et ClaudeLELOUP pour sa relecture orthographique.