Spring下实现kafka
原理
kafka下载
地址:http://kafka.apache.org/downloads
JDK前提
Window
下载地址:https://www.java.com/zh-CN/download/
配置环境变量:https://www.runoob.com/java/java-environment-setup.html
必须通过JAVA_HOME方式配置
Linux
命令行安装openJDK即可
启动kafka
Spring项目依赖于zookeeper和kafka的开启
Window
下载成功后解压到电脑指定目录
里面自带有zookeeper、配置文件 server.properties(地址、ip等配置信息),本地简单版本默认就行。
cmd进入到kafka解压后目录,执行以下命令。
注意启动zookeeper后不要关闭窗口,再新开一个cmd窗口启动kafka。
1 | // 启动zookeeper |
Linux
1 | # 启动zookeeper |
kafka服务与spring服务在不同机子
kafka服务所在的机子需要配置config/server.properties
1 | listeners=PLAINTEXT://IP地址:9092 |
如果不配置具体IP地址,则默认使用localhost,不在kafka所在机子的spring建立连接时就会报错误(Connection to node 1 (localhost/127.0.0.1:9092) could not be established),因为kafka在zookeeper中注册时使用的是localhost。
Gradle
1 | // https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka |
resources下application.yml
1 | # kafka配置 |
kafka包下KafkaProducer
1 | import org.slf4j.Logger; |
kafka包下KafkaConsumer
1 | import org.slf4j.Logger; |
KafkaController
1 | import org.springframework.beans.factory.annotation.Autowired; |
Entity
1 | import lombok.AllArgsConstructor; |
公共类JsonUtils
1 | import com.fasterxml.jackson.core.JsonProcessingException; |
应用
带参请求controller,发布主题topic的消息。
KafkaConsumer组件监听主题topic消息,获取数据执行相应的业务操作。KafkaConsumer监听所有前缀为 “topic.” 的主题消息。