1
0

只负责连接-解析-入库-发消息的基础服务

luojun123 42438fcff3 Merge branch 'master' of http://116.62.67.134:3000/qh/qh-link-basic 3 лет назад
.mvn 6e1bb254c2 测试 3 лет назад
config c4341a250a 废除底层存储文件的做法 3 лет назад
deploy 5b0f916899 运行文件设置内存 3 лет назад
qhiot-cache fe13364067 kafka发送受到配置的控制 3 лет назад
qhiot-core e18674a050 kafka发送受到配置的控制 3 лет назад
qhiot-influxdb-core fe13364067 kafka发送受到配置的控制 3 лет назад
qhiot-kafka e18674a050 kafka发送受到配置的控制 3 лет назад
qhiot-log ed63645585 kafkaSender对象创建方式更改,原有jms MQ代码删除 3 лет назад
qhiot-main 68b6cf0ca1 add: 增加消息发送时kafka服务状态检测 3 лет назад
qhiot-package ed63645585 kafkaSender对象创建方式更改,原有jms MQ代码删除 3 лет назад
qhiot-transport-db 42438fcff3 Merge branch 'master' of http://116.62.67.134:3000/qh/qh-link-basic 3 лет назад
qhiot-transport-message 8c6cddd227 Merge branch 'master' of http://116.62.67.134:3000/qh/qh-link-basic 3 лет назад
qhiot-transport-server e18674a050 kafka发送受到配置的控制 3 лет назад
qhiot-websocket 6e1bb254c2 测试 3 лет назад
.gitignore 6e1bb254c2 测试 3 лет назад
Readme.md 872b3f1371 log4j2.xml模板初始化,配置文件外置 3 лет назад
mvnw 6e1bb254c2 测试 3 лет назад
mvnw.cmd 6e1bb254c2 测试 3 лет назад
pom.xml ed63645585 kafkaSender对象创建方式更改,原有jms MQ代码删除 3 лет назад

Readme.md

修正记录

@2021-05-20 17:15:39 将文件系统服务器移到 basic 系统中相关配置同上层配置

core:
  storePath: D:/seem_file/

@2021-05-18 17:18:13 由于日志系统log4j.xml解读yml配置比较麻烦,所以相关配置需要在log.properties中写一份

app.node=1
log2es.clusterNodes=127.0.0.1:9300
log2es.clusterName=elasticsearch
log2es.index=qhseem

@2021-05-20 17:29:48 由于消息通讯的需要,上层link需要引入同一个 qhiot-transport-message包

@2021-05-20 17:45:00 已经将流量记录作废
ChannelHandler.java 上行流量统计

public void channelRead(ChannelHandlerContext ctx, Object buf) throws Exception {
    channelUtil.upstream(ctx.channel(), context, (ByteBuf) buf);
}
    

ByteEncoder.java 下行流量统计

    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
//        byte[] rawBytes = ByteBufUtil.getBytesRemaining(msg.nioBuffer());
//
//        Integer downstreamSize = ctx.channel().attr(AttrKey.DOWNSTREAM_SIZE).get();
//        if (downstreamSize == null) {
//            downstreamSize = 0;
//        }
//        downstreamSize += rawBytes.length;
//        ctx.channel().attr(AttrKey.DOWNSTREAM_SIZE).set(downstreamSize);

        out.add(msg.copy());
    }

取消相应的流量统计任务 ChannelUtil.java

public void channelInactive(){
// 保存流量记录
//    saveTrafficStream(channel, true);
}
流量重置定时任务废弃
TrafficStreamResetter

@2021-05-20 17:44:13 计划将 离线在线状态存入数据库的任务取消 @TODO 延后 原本为收到报文时立刻发消息告诉web页面,同时将在线状存人线程安全Map中,以及缓存中;

现在计划将在线状态推送到外围link,由外围link做统计、分发、存储

@2021-05-21 10:43:30 计划将原始报文转发功能废弃,转而进行一次消息推送,而后由外围link做转发 @TODO 延后

@2021-05-21 10:44:59 计划记录原始报文的PacketV2对象(序列化过),并标记解析是否成功,如解析成功则此对象不存入influxdb,如解析不成功则寻找合适时机尝试重新解析

  • influxdb 表可以按天创建 --- 定时任务,每天0点创建表,同时判断是否有表可以删除
  • influxdb 内一张表存储的PacketV2对象都在合适的时机解析成功或者多次解析失败后,删除全表
  • 表结构从简
  • 解析失败存储时间为 ${decode.error.log.time:20d},默认20d(20天)
  • 存储策略默认${decode.error.log.autogen:decode_log_policy}
字段名 数据类型 字段类型 备注
packetV2 string tag 序列化的packetV2
decoderNum int field 解析次数
time -- field --

@2021-05-21 11:41:19 告警相关状态需要发送到消息中,需要和刘培栋商议

@2021-05-21 11:45:55 HistoryDataProcessor 中,原本发送到各处的消息队列废弃,只发送一次消息到外围link,之后的逻辑也由外围link决定如何做
HistoryDataProcessor中,将数据存入influx link 层面

@2021-05-21 11:47:25 link数据库设计

  • 因为需要考虑用户自定义存储策略(时长),所以influx数据库需要按照项目划分 可设计为表名measureName monitorDataHistory_${tenant_id},库在项目创建时创建,随项目删除时删除
  • link层面的数据表将只关注link相关信息,如租户id、网关号、测点序号等、数据类型等
  • link外围在创建项目后,默认创建项目存储策略,默认存储 ??? 时长;
  • 存储策略命名为${databaseName}_${DURATION},这里DURATION格式化为 xYxMxDxH(1年1月1日1小时);
  • link外围需要有让用户随时设置存储策略的地方;
  • influxdb会自动管理存储时长问题,不需要额外写定时任务实现;
  • 策略相关信息应该存在缓存中 问题在于:
  • influxdb在分表后,如果跨表查询,需要传入多个表名 1.需要验证跨表查询可行性 跨表查询可行
  • influxdb的存储策略如果不是使用的默认策略,则需要传入策略名称进行查询,
    1.需要验证策略变更后,用未改变状态存储的数据是否还存在
    2.需要验证,策略改变后,存储时间是按照改变策略保存时长后的进行,还是按照存入数据库时的策略保存时长设定进行;
字段名 数据类型 字段类型 备注
tenantId string tag 租户id
deviceId string tag 网关id
monitoringCode string tag 测点序号id
monitorCode string tag 监测代码
value string tag 监测值
time string tag 监测时间

@2021-05-21 13:55:14 考勤数据处理未做 Tag24gDataProcessor 计划将考勤消息发送到消息队列 从Tag24gDecoder @TODO TAG24G_DATA 路径订阅,单点订阅,需要app端实现存入数据库

@2021-05-21 17:49:12 关于kafka,考虑到同一个topic,有可能会同时某个任务只消费一次,同时有其他任务需要做多次消费,所以topic和group需要着重考虑; 是否可以考虑采用app.node
topic: 设置为发送路径 groupId:

  • 如果是 queue ,则groupId起名可以和现有的JmsDestinations中的QUEUE一致
  • 如果是 topic,则groupId起名格式以现有的JmsDestinations中的 _${app.node}
  • 此基础上需要归类发送的是同一种对象的消息队列,并进行kafka替换

一个存储map的对象,内存储队列->开关状态,kafka监听是否允许推送

@2021-05-25 16:49:12 关于kafka,金工建议给与一个开关,用于控制此消息队列是否发送;需要外围link给予配置接口 @TODO

  • 给一个配置入口,用于配置是否开启kafka队列推送
  • 开辟一个kafka消息队列通知,用于更新是否开启队列推送的的状态
  • 考虑可以分为总开关和项目开关

@2021-05-25 17:57:01 考虑传输层面日志按需推送 @TODO

@2021-05-27 17:20:52 JmsLoggerListener对象的kafka实现,需要放到link外围 @TODO

@2021-05-27 17:45:25 移除原因jms注册部分

@2021-05-31 11:05:248 所有涉及到站点的消息队列实现,都需要到消费者层级实现