Tìm hiểu Apache Kafka cùng Spring boot Phần 1

Mở đầu

Loạt bài này mình muốn giới thiệu về Apache Kafka từ cơ bản tới chi tiết ứng dụng trong thực tiễn. Hiện nay việc sử dụng Kafka để giao tiếp giữa các hệ thống tương đối phổ biến, bản thân mình là 1 Junior vì thế mình hiểu rằng đa phần các bạn mới vào nghề hoặc đã đi làm một vài năm còn khá mơ hồ về khái niệm Kafka cũng như đang sử dụng trên một bộ khung có sẵn do các anh senior hoặc devops để lại. Vì thế mình viết loạt bài này mục đích để cho các bạn mới tìm hiểu về kafka có một cái nhìn tổng quan, cũng như có thể tự dựng một hệ thống đơn giản biết đâu sau này bạn và tôi có thể tự tin dựng một hệ thống cho một dự án mà cấp trên ở công ty giao phó, hoặc đơn giản để xử lý những issue trong quá trình developer của mình trong dự án một cách nhanh gọn và hiệu quả. Bản thân mình cũng là người vừa học vừa viết ra loạt bài này chính vì thế mình sẽ không liệt kê hết kiến thức lý thuyết 1 lúc mà mình sẽ vừa notes lý thuyết vừa đi vào thực hành luôn để các bạn dễ đọc dễ hiểu,  lan man quá vào việc thôi.

 

Đối tượng nên đọc

  1. Mới tìm hiểu về kafka
  2. Đã có nền tảng về java

Kafka là gì.?

Về định nghĩa Kafka trên mạng có khá nhiều bài viết định nghĩa về Kafka vì thế ở đây mình xin tóm tắt 1 cách dễ hiểu thì 

Kafka đơn giản chỉ là một hệ thống gửi nhận message mục đích chính hay sử dụng là để giao tiếp giữa các hệ thống phân tán với nhau.

Các thành phần cơ bản của Kafka

Kafka topics: Là một kênh hay hiểu đơn giản key  định danh cho việc gửi và nhận message. Vd gửi message với topic thangnotes thì bên nhận phải consumer với topic thangnotes thì mới nhận được message. Tên topic do người quản trị kafka đặt và các thông điệp gửi và nhận sẽ thông qua tên của topic đó.

Kafka Partitions: Đơn giản đây chỉ là nơi lưu trữ dữ liệu của topic trong 1 topic thì việc lưu dữ liệu message sẽ được chia ra thành nhiều partitions con, trên các partition con này sẽ lưu message và mỗi message sẽ được gắn cho 1 offsetOffset ở đây có thể hiểu là id của message. Mục đích tuân theo nguyên tắc chia để trị việc chia nhỏ dữ liệu sẽ dễ quản lý và tăng performance, hơn nữa còn nhằm mục đích tránh mất dữ liệu, tạm thời sẽ hiểu thế đã chúng ta sẽ đi chi tiết lý do ở các bài sau. 

=> Tóm lại lý thuyết sơ khai kafka đơn giản chỉ là nơi chưa message để giao tiếp giữa các hệ thống, và việc lưu trữ message sẽ được lưu ở partition và được gắn với 1 ID offset duy nhất => message được gắn liền với topic -> partition -> offset.

Thực hành

Cài đặt Kafka với docker

Đầu tiên mình sẽ sử dụng docker-compose để cài đặt -> việc sử dụng và cài đặt docker các bạn chịu khó tham khảo google nhé có rất nhiều bài viết hd cài đặt rồi vì thế mình sẽ không hướng dẫn lại nữa.

Tạo file docker-compose.yml 

version: "3"

networks:
  kafka-net:
    name: kafka-net
    driver: bridge

services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.8
    container_name: zookeeper
    restart: unless-stopped
    networks:
      - kafka-net
    ports:
      - "2181:2181"
    environment:
      ALLOW_ANONYMOUS_LOGIN: "yes"

  kafka:
    image: docker.io/bitnami/kafka:3
    container_name: kafka
    restart: unless-stopped
    networks:
      - kafka-net
    ports:
      - "9092:9092"
    environment:
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER_INTERNAL:PLAINTEXT,DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_CFG_LISTENERS: DOCKER_INTERNAL://:29092,DOCKER_EXTERNAL://:9092
      KAFKA_CFG_ADVERTISED_LISTENERS: DOCKER_INTERNAL://kafka:29092,DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: DOCKER_INTERNAL
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CFG_BROKER_ID: 1
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
      ALLOW_PLAINTEXT_LISTENER: "yes"
      KAFKA_CREATE_TOPICS: "thangnotes:1:1"
    depends_on:
      - zookeeper

mở cmd/terminal gõ lệnh docker-compose up -d  để cài đặt Kafka sau khi chạy xong gõ docker ps để kiểm tra hiện được như hình là đã cài đặt thành công 

Ok giờ sang spring boot tạo 1 project kafka-consumer để nhận message từ kafka các bạn có thể tạo tại  https://start.spring.io chúng ta add các Dependencies như hình

Tương tự tạo thêm 1 project kafka-producer dùng để gửi message lên kafka

import các project vào IDE (eclipse, intellij..) ở đây mình dùng intellij. trong maven dùng lệnh mvn clean install để cài đặt và tải các thư viện cần thiết 

Trong project kafka-producer

mở application.properties

#setting cho spring biết url kafka server
spring.kafka.bootstrap-servers=localhost:9092
# setting cho spring biết group-id của kafka
spring.kafka.consumer.group-id=thangnotes
# setting port khi run app
server.port=8082

 

Tạo service KafkaProducerService.java

package thangnotes.name.vn.kafkaproducer.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

@Service
public class KafkaProducerService {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    public Boolean sendMessage(String message) throws ExecutionException, InterruptedException, TimeoutException {
        kafkaTemplate.send("thangnotes", message).get(10000, TimeUnit.MILLISECONDS);
        return true;
    }
}

 

tạo Controller KafkaProducerController.java

package thangnotes.name.vn.kafkaproducer.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import thangnotes.name.vn.kafkaproducer.service.KafkaProducerService;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

@RestController
@RequestMapping("kafka")
public class KafkaProducerController {
    @Autowired
    KafkaProducerService kafkaProducerService;

    @GetMapping("send")
    public Boolean sendMessage(@RequestParam String message) throws ExecutionException, InterruptedException, TimeoutException {
        return kafkaProducerService.sendMessage(message);
    }
}

 

Chạy project mở postman call thử hiện ra status = 200 nghĩa là đã send message thành công

Bây giờ chúng ta sẽ config project kafka-consumer để nhận message từ kafka-producer

Trong application.properties

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=thangnotes
server.port=8081 

 

tạo KafkaConsumerService.java trong này ta sẽ viết xử lý nhận message từ kafka

package thangnotes.name.vn.kafkaconsumer.service;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {
    @KafkaListener(topics = "thangnotes")
    public void consumnerMessage(String message) {
        System.out.println("message="+message);

    }
}

Vậy là xong giờ ta cùng start 2 project lên ở kafka-producer call lại postman như trên lúc này ở kafka-consumer sẽ ra log sẽ được như hình

Vậy là đã thành công gửi và nhận message giữa các microservice trong spring. Vậy bạn có tự hỏi vậy ưu điểm của Kafka có gì so với các hệ thống khác (JMS, RabbitMQ..) và trong dự án thực thế người ta sẽ implement Kafka như thế nào ở các viết sau trong Series Kafka này nhé.

link source code: Download Source

Cảm ơn các bạn đã đọc bài.!

 

 

Related Posts

Phần 2: cài đặt zookeeper, etcd và cách triển khai Leadership Election (End)

Ở Phần 1: Tìm hiểu về các hệ thống phân tán với zookeeper và etcd chúng ta cũng đã hiểu zookeeper và etcd là gì rồi ở phần…

Phần 1: Tìm hiểu về các hệ thống phân tán với zookeeper và etcd

Khái niệm Hệ thống phân tán là một tập hợp các máy tính hoạt động cùng nhau để thực hiện cùng một tác vụ hay cung cấp…

0 0 votes
Article Rating
Subscribe
Notify of
guest
0 Comments
Inline Feedbacks
View all comments
0
Would love your thoughts, please comment.x
()
x