luojun123 e18674a050 kafka发送受到配置的控制 3 years ago
..
src e18674a050 kafka发送受到配置的控制 3 years ago
pom.xml 2c62a01c14 1.测试Groovy的代码删除 3 years ago
readme.md e18674a050 kafka发送受到配置的控制 3 years ago

readme.md

kafka配置

打开kafka/config下的server.properties配置文件,跟本机情况修改kafka日志路径、zookeeper连接地址: 1) log.dirs=/tmp/kafka-logs 2) zookeeper.connect=localhost:2181

kafka服务启动

先启用zookeeper服务,通过cmd定位到kafka\bin\windows目录下,通过命令行启用kafka服务: kafka-server-start.bat ....\config\server.properties

开发平台kafka参数配置

在config.properties中增加若干kafka参数配置,目前根据本机情况配置kafka连接地址,其余保持默认即可。 kafka.servers=127.0.0.1:9092

默认kafka的端口9092可能会和kapacitor冲突,更改kafka端口的方法如下

config/server.properties

# 服务端口号,默认9092
port=9093

config/connect-standalone.properties

bootstrap.servers=localhost:9093

config/connect-distributed.properties

bootstrap.servers=localhost:9093

config/producer.properties

bootstrap.servers=localhost:9093

Kafka 使用

新的Kafka用于替代原有的ActiveMQ,新的kafka模块为qhiot-kafka,其余模块使用kafka队列需要引入该模块。

  1. kafka服务启用 打开kafka/config下的server.properties配置文件,修改zookeeper连接地址:

    zookeeper.connect=localhost:2181

    之后先启用zookeeper服务,再启用kafka服务,cmd定位到kafka\bin\windows,通过命令行启用kafka:

    kafka-server-start.bat ....\config\server.properties

  2. 配置文件参数配置

    在config.properties中,根据需求修改相关参数:

    # 连接kafka,集群地址用“,”隔开
    kafka.servers=127.0.0.1:9092
    #发送失败重试次数
    kafka.producer.retries=3
    #批处理内存大小,kb
    kafka.producer.batch.size=4096
    #批处理时间,毫秒
    kafka.producer.linger=5000
    #批处理缓冲区, kb
    kafka.producer.buffer.memory=40960
    #如果为true,消费者的偏移量将在后台定期提交
    kafka.consumer.enable.auto.commit=true
    #在使用Kafka的组管理时,用于检测消费者故障的超时, 毫秒
    kafka.consumer.session.timeout=6000
    #设置自动提交周期,毫秒
    kafka.consumer.auto.commit.interval=100
    #如果分区没有初始偏移量,或者当前偏移量服务器上不存在时,将使用的偏移量设置,earliest从头开始消费,latest从最近的开始消费,none抛出异常
    kafka.consumer.auto.offset.reset=latest
    #消费监听器容器并发数
    kafka.consumer.concurrency=10
    #是否启用批量消费
    kafka.consumer.batchListener=true
    #一次拉取请求的最大消息数
    kafka.consumer.maxPollRecords=500
    
  3. 消息发送

    示例代码

    //注入对象
    @Autowired
    private KafkaTemplate kafkaTemplate;
    
    //发送的通道和数据对象
    ProducerRecord<String, String> pr = new ProducerRecord<>(KafkaDestinations.TOPIC_TEST_DATA, JsonUtil.writeValueAsString(transportWarning));
    kafkaTemplate.send(pr);
    

    /

  4. 消息接收

    拥有相同的groupId的消费者,只有一个会消费(类似queue);相同的消息会被不同的groupId的消费者消费(类似topic)。示例代码:

    @Slf4j
    @Component
    public class InsightListener {
    
        @KafkaListener(topics = KafkaDestinations.TOPIC_TEST_DATA, groupId = ConsumerGroups.TOPIC_TEST_CONSUMER_GROUP1)
        public void listen1(List<ConsumerRecord> consumerRecordList) throws Exception{
            for(ConsumerRecord consumerRecord:consumerRecordList){
                log.info("kafka1的message: " + consumerRecord.value());
                TransportWarning msg = JsonUtil.fromJson(consumerRecord.value().toString(), TransportWarning.class);
                log.info("kafka1 消费 message: "+msg.getWarning());
            }
        }
    
    }
    
  5. 通道分组定义 在qhiot-kafka模块中定义通道和分组常量

依赖

本包的发送权限控制功能,依赖transport-server包