|
3 years ago | |
---|---|---|
.. | ||
src | 3 years ago | |
pom.xml | 3 years ago | |
readme.md | 3 years ago |
打开kafka/config下的server.properties配置文件,跟本机情况修改kafka日志路径、zookeeper连接地址: 1) log.dirs=/tmp/kafka-logs 2) zookeeper.connect=localhost:2181
先启用zookeeper服务,通过cmd定位到kafka\bin\windows目录下,通过命令行启用kafka服务: kafka-server-start.bat ....\config\server.properties
在config.properties中增加若干kafka参数配置,目前根据本机情况配置kafka连接地址,其余保持默认即可。 kafka.servers=127.0.0.1:9092
默认kafka的端口9092可能会和kapacitor冲突,更改kafka端口的方法如下
config/server.properties
# 服务端口号,默认9092
port=9093
config/connect-standalone.properties
bootstrap.servers=localhost:9093
config/connect-distributed.properties
bootstrap.servers=localhost:9093
config/producer.properties
bootstrap.servers=localhost:9093
新的Kafka用于替代原有的ActiveMQ,新的kafka模块为qhiot-kafka,其余模块使用kafka队列需要引入该模块。
kafka服务启用 打开kafka/config下的server.properties配置文件,修改zookeeper连接地址:
zookeeper.connect=localhost:2181
之后先启用zookeeper服务,再启用kafka服务,cmd定位到kafka\bin\windows,通过命令行启用kafka:
kafka-server-start.bat ....\config\server.properties
配置文件参数配置
在config.properties中,根据需求修改相关参数:
# 连接kafka,集群地址用“,”隔开
kafka.servers=127.0.0.1:9092
#发送失败重试次数
kafka.producer.retries=3
#批处理内存大小,kb
kafka.producer.batch.size=4096
#批处理时间,毫秒
kafka.producer.linger=5000
#批处理缓冲区, kb
kafka.producer.buffer.memory=40960
#如果为true,消费者的偏移量将在后台定期提交
kafka.consumer.enable.auto.commit=true
#在使用Kafka的组管理时,用于检测消费者故障的超时, 毫秒
kafka.consumer.session.timeout=6000
#设置自动提交周期,毫秒
kafka.consumer.auto.commit.interval=100
#如果分区没有初始偏移量,或者当前偏移量服务器上不存在时,将使用的偏移量设置,earliest从头开始消费,latest从最近的开始消费,none抛出异常
kafka.consumer.auto.offset.reset=latest
#消费监听器容器并发数
kafka.consumer.concurrency=10
#是否启用批量消费
kafka.consumer.batchListener=true
#一次拉取请求的最大消息数
kafka.consumer.maxPollRecords=500
消息发送
示例代码
//注入对象
@Autowired
private KafkaTemplate kafkaTemplate;
//发送的通道和数据对象
ProducerRecord<String, String> pr = new ProducerRecord<>(KafkaDestinations.TOPIC_TEST_DATA, JsonUtil.writeValueAsString(transportWarning));
kafkaTemplate.send(pr);
/
消息接收
拥有相同的groupId的消费者,只有一个会消费(类似queue);相同的消息会被不同的groupId的消费者消费(类似topic)。示例代码:
@Slf4j
@Component
public class InsightListener {
@KafkaListener(topics = KafkaDestinations.TOPIC_TEST_DATA, groupId = ConsumerGroups.TOPIC_TEST_CONSUMER_GROUP1)
public void listen1(List<ConsumerRecord> consumerRecordList) throws Exception{
for(ConsumerRecord consumerRecord:consumerRecordList){
log.info("kafka1的message: " + consumerRecord.value());
TransportWarning msg = JsonUtil.fromJson(consumerRecord.value().toString(), TransportWarning.class);
log.info("kafka1 消费 message: "+msg.getWarning());
}
}
}
通道分组定义 在qhiot-kafka模块中定义通道和分组常量
本包的发送权限控制功能,依赖transport-server包