深入协议层:tlmqtt 如何通过自定义编解码器实现高性能 MQTT Broker


tlmqtt 是一款基于 Java 开发、底层依赖 Netty 和 Project Reactor 的完全开源免费的高性能 MQTT Broker。它提供完整的 MQTT 协议解析、QoS 0/1/2 消息支持、自定义消息存储、可扩展的认证机制以及数据桥接功能。

MQTT 编解码:深入控制与理解

在分析众多开源 MQTT Broker实现时,发现绝大多数(约 99%)都直接使用 Netty提供的编解码器,如下所示:

pipeline.addLast(MqttEncoder.INSTANCE);
pipeline.addLast(new MqttDecoder(maxBytesInMessage));

这种方式让开发者无需关注协议解析细节,专注于业务逻辑开发,是其显著优势。然而,它也带来了两个关键限制:

  • 高度依赖 Netty: 扩展性和灵活性受限于Netty的实现
  • 协议理解不足: 开发者容易停留在“知其然”层面,对 CONNECTPUBLISH等报文的具体结构和解析过程缺乏深入理解

tlmqtt 选择了自定义编解码器的实现路径,对 MQTT消息报文进行逐步解析。这为我们提供了更深入的控制、灵活性和扩展性,同时也是深入理解MQTT协议细节的实践。

Netty 基础与 MQTT 协议的挑战

Java高性能网络开发离不开 Netty。它提供了多种开箱即用的编解码器,如固定长度、分隔符和基于长度域的帧解码器。在实现自定义编解码器之前,必须理解 MQTT协议对消息长度的独特定义方式 剩余长度编码规则:

  • 单个字节可表示 0 到 127 的值。
  • 大于 127 的值处理如下:      - 每个字节的 低 7 位 (bits 0-6) 用于编码数据。     - 最高位 (bit 7) 作为标识位:1 表示还有后续字节,0 表示结束。
  • 剩余长度最多由 四个字节 表示。

显然, MQTT的这种变长编码方式与 Netty内置的标准长度域解码器(通常是固定字节数表示长度)并不完全匹配。因此,自定义编解码器成为必然选择。

解码器实现:从字节流到消息对象

自定义解码器需继承  Netty的  ByteToMessageDecoder类,核心任务是将接收到的 ByteBuf字节流转换为业务逻辑所需的 AbstractTlMessage对象(及其各种具体子类,如  TlMqttConnectReq)。 核心  decode方法流程如下:

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    // 1. 检查基本长度:可读字节数小于2(固定头最小长度),等待更多数据
    if (in.readableBytes() < MIN_LENGTH) {
        return;
    }
    // 2. 标记当前读指针位置,以便后续数据不足时回退
    in.markReaderIndex();

    // 3. 读取第1字节:包含消息类型(高4位)和标志位(低4位)
    short firstByte = in.readUnsignedByte();
    // 4. 解码剩余长度 (变长编码)
    int remainingLength = decodeRemainingLength(in);

    // 5. 检查载荷数据是否完整到达 (剩余长度指的就是载荷长度)
    if (in.readableBytes() < remainingLength) {
        in.resetReaderIndex(); // 数据不足,重置读指针,等待后续数据
        return;
    }

    // 6. 数据完整:读取载荷部分到临时ByteBuf
    ByteBuf payloadBuf = in.readBytes(remainingLength);
    // 可选:打印原始报文(十六进制) - TlLog.logger("mqtt raw hex", payloadBuf);
    try {
        // 7. 提取消息类型 (右移4位取高4位)
        int messageType = firstByte >> Constant.MESSAGE_BIT;
        MqttMessageType mqttType = MqttMessageType.valueOf(messageType);

        // 8. 根据消息类型,分派给对应的具体解码器构建请求对象
        AbstractTlMessage req = switch (mqttType) {
            case CONNECT -> connectDecoder.build(payloadBuf, firstByte, remainingLength);
            case DISCONNECT -> disConnectDecoder.build(payloadBuf, firstByte, remainingLength);
            case PUBLISH -> publishDecoder.build(payloadBuf, firstByte, remainingLength);
            case PUBACK -> pubAckDecoder.build(payloadBuf, firstByte, remainingLength);
            case PUBREC -> pubRecDecoder.build(payloadBuf, firstByte, remainingLength);
            case PUBREL -> pubRelDecoder.build(payloadBuf, firstByte, remainingLength);
            case PUBCOMP -> pubCompDecoder.build(payloadBuf, firstByte, remainingLength);
            case SUBSCRIBE -> subscribeDecoder.build(payloadBuf, firstByte, remainingLength);
            case UNSUBSCRIBE -> unSubscribeDecoder.build(payloadBuf, firstByte, remainingLength);
            case PINGREQ -> heartBeatDecoder.build(payloadBuf, firstByte, remainingLength);
            default -> throw new IllegalArgumentException("Unknown MQTT message type: " + mqttType);
        };
        out.add(req); // 9. 将解析好的消息对象加入输出列表,传递给后续Handler
    } finally {
        payloadBuf.release(); // 10. 确保临时ByteBuf资源释放
    }
}

关键辅助方法:剩余长度解码 ( decodeRemainingLength)

private int decodeRemainingLength(ByteBuf in) {
    int multiplier = 1; // 乘数因子 (128^0, 128^1, ...)
    int value = 0;      // 累积计算出的剩余长度值
    byte encodedByte;
    do {
        encodedByte = in.readByte();                // 读取一个编码字节
        value += (encodedByte & 0x7F) * multiplier; // 取低7位数据并乘以当前乘数
        multiplier *= 128;                          // 乘数递增 (128^1, 128^2, ...)
    } while ((encodedByte & 0x80) != 0);           // 检查最高位(标识位)是否为1 (还有后续字节)
    return value;
}

根据解析出的消息类型,数据会被分派给对应的具体解码器(如  TlMqttConnectDecoder)。这些解码器通常采用模块化设计,包含 decodeFixedHeader(固定头)、 decodeVariableHeader(可变头)和 decodePayload(载荷)三个核心方法。 以  CONNECT报文解码 ( TlMqttConnectDecoder) 为例:

// 解码固定头 (相对简单,主要是类型和长度)
TlMqttFixedHead decodeFixedHeader(int remainingLength) {
    TlMqttFixedHead fixedHead = new TlMqttFixedHead();
    fixedHead.setMessageType(MqttMessageType.CONNECT);
    fixedHead.setLength(remainingLength); // 设置整个报文剩余长度
    return fixedHead;
}

// 解码可变头 (包含协议名、版本、连接标志和保活时间)
TlMqttConnectVariableHead decodeVariableHeader(ByteBuf buf) {
    TlMqttConnectVariableHead variableHead = new TlMqttConnectVariableHead();

    // 1. 协议名 (通常是"MQTT")
    int protocolNameLen = buf.readUnsignedShort(); // 长度域 (2字节)
    variableHead.setProtocolNameLength(protocolNameLen);
    byte[] protocolNameBytes = new byte[protocolNameLen];
    buf.readBytes(protocolNameBytes);
    String protocolName = new String(protocolNameBytes, StandardCharsets.UTF_8); // 显式指定字符集

    // 2. 协议版本 (e.g., 4 for MQTT 3.1.1)
    short protocolVersion = buf.readUnsignedByte();
    variableHead.setProtocolVersion(protocolVersion);

    // 3. 连接标志字节 (Connect Flags) - 关键!
    int connectFlags = buf.readUnsignedByte();
    // 位运算解析各个标志位
    variableHead.setReserved(connectFlags & 0x01);         // Bit 0 (保留位,必须为0)
    variableHead.setCleanSession((connectFlags >> 1) & 0x01); // Bit 1 (Clean Session)
    int willFlag = (connectFlags >> 2) & 0x01;             // Bit 2 (Will Flag)
    variableHead.setWillFlag(willFlag);
    variableHead.setWillQos((connectFlags >> 3) & 0x03);     // Bits 3-4 (Will QoS: 0, 1, 2)
    variableHead.setWillRetain((connectFlags >> 5) & 0x01);  // Bit 5 (Will Retain)
    variableHead.setPasswordFlag(((connectFlags >> 6) & 0x01) > 0); // Bit 6 (Password Flag)
    variableHead.setUsernameFlag(((connectFlags >> 7) & 0x01) > 0); // Bit 7 (Username Flag)

    // 4. 保活时间 (Keep Alive Timer - 秒)
    short keepAlive = buf.readShort();
    variableHead.setKeepAlive(keepAlive);

    log.debug("解析【CONNECT】可变头: 协议名=[{}], 版本=[{}], CleanSession=[{}], "
            + "WillFlag=[{}], WillQos=[{}], WillRetain=[{}], 用户名标志=[{}], 密码标志=[{}], KeepAlive=[{}]",
            protocolName, protocolVersion, variableHead.getCleanSession(),
            willFlag, variableHead.getWillQos(), variableHead.getWillRetain(),
            variableHead.isUsernameFlag(), variableHead.isPasswordFlag(), keepAlive);
    return variableHead;
}

// 解码载荷 (内容由可变头中的标志位决定)
TlMqttConnectPayload decodePayload(ByteBuf buf, TlMqttConnectVariableHead variableHead) {
    TlMqttConnectPayload payload = new TlMqttConnectPayload();

    // 1. Client Identifier (必选)
    int clientIdLen = buf.readUnsignedShort();
    byte[] clientIdBytes = new byte[clientIdLen];
    buf.readBytes(clientIdBytes);
    payload.setClientId(new String(clientIdBytes, StandardCharsets.UTF_8));

    // 2. Will Topic & Will Message (如果 Will Flag = 1)
    if (variableHead.getWillFlag() == 1) {
        int willTopicLen = buf.readUnsignedShort();
        byte[] willTopicBytes = new byte[willTopicLen];
        buf.readBytes(willTopicBytes);
        payload.setWillTopic(new String(willTopicBytes, StandardCharsets.UTF_8));

        int willMessageLen = buf.readUnsignedShort();
        byte[] willMessageBytes = new byte[willMessageLen];
        buf.readBytes(willMessageBytes);
        payload.setWillMessage(new String(willMessageBytes, StandardCharsets.UTF_8));
    }

    // 3. Username (如果 Username Flag = true)
    if (variableHead.isUsernameFlag()) {
        int usernameLen = buf.readUnsignedShort();
        byte[] usernameBytes = new byte[usernameLen];
        buf.readBytes(usernameBytes);
        payload.setUsername(new String(usernameBytes, StandardCharsets.UTF_8));
    }

    // 4. Password (如果 Password Flag = true)
    if (variableHead.isPasswordFlag()) { // 使用VariableHead中的标志位判断
        int passwordLen = buf.readUnsignedShort();
        byte[] passwordBytes = new byte[passwordLen];
        buf.readBytes(passwordBytes);
        payload.setPassword(new String(passwordBytes, StandardCharsets.UTF_8));
    }

    log.debug("解析【CONNECT】载荷: clientId=[{}], willFlag=[{}], willQos=[{}], willTopic=[{}], username=[{}]",
            payload.getClientId(), variableHead.getWillFlag(), variableHead.getWillQos(),
            payload.getWillTopic(), payload.getUsername());
    return payload;
}

其他 MQTT报文类型( PUBLISH,  SUBSCRIBE,  PUBACK等)的解码逻辑遵循类似模式,具体实现可参考对应的解码器类。

解码完成后,会得到一个具体的请求对象(如  TlMqttConnectReq)。该对象随后会被传递给专门处理该类型消息的 ChannelInboundHandler,例如  TlMqttConnectHandler

public class TlMqttConnectHandler extends SimpleChannelInboundHandler<TlMqttConnectReq> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TlMqttConnectReq req) throws Exception {
        // 在此处实现CONNECT请求的核心业务逻辑:
        // 1. 认证 (用户名/密码校验)
        // 2. 会话管理 (新建或复用会话)
        // 3. 遗嘱消息处理
        // 4. 构建并发送CONNACK响应
    }
}

编码器实现:从对象到网络字节流

tlmqtt 的编码器负责将业务逻辑中需要发送给客户端的消息对象(如  TlMqttConnack,  TlMqttPublish等)序列化为符合 MQTT协议规范的二进制数据。开发者只需操作这些对象即可:

// 业务逻辑中创建CONNACK响应对象
TlMqttConnack connack = TlMqttConnack.build(cleanSessionPresent, MqttConnectReturnCode.CONNECTION_ACCEPTED);
// 通过通道管理器发送
channelManager.writeAndFlush(clientId, connack);

编码器(继承  Netty的  MessageToByteEncoder)则透明地处理对象到字节流的转换。 以 CONNACK报文编码 ( TlMqttConnackEncoder) 为例:

@ChannelHandler.Sharable // 标记为可共享,通常无状态
@Slf4j // 日志注解
public class TlMqttConnackEncoder extends MessageToByteEncoder<TlMqttConnack> {

    @Override
    protected void encode(ChannelHandlerContext ctx, TlMqttConnack connack, ByteBuf out) throws Exception {
        TlMqttFixedHead fixedHead = connack.getFixedHead();
        TlMqttConnackVariableHead variableHead = connack.getVariableHead();

        // 1. 固定头编码
        byte fixedHeaderByte = (byte) (fixedHead.getMessageType().value() << 4); // 消息类型(高4位) + 保留位(低4位=0)
        out.writeByte(fixedHeaderByte);

        // 2. 剩余长度编码 (CONNACK固定为2字节)
        out.writeByte(2); // Remaining Length = 2

        // 3. 可变头编码
        out.writeByte(variableHead.getSessionPresent()); // Byte 1: Session Present Flag (0 或 1)
        out.writeByte(variableHead.getConnectReturnCode().getValue()); // Byte 2: Connect Return Code
    }
}

贡献与反馈

欢迎通过以下方式参与项目共建:

  1. 提交 Issue:反馈 Bug 或提出功能建议
  2. 提交 PR:优化代码或新增功能(建议先创建 Issue 沟通方案)
  3. Star/Fork:支持项目持续发展

联系方式:

  • 邮箱:[email protected]
  • 项目地址:https://github.com/ZHSQJM/tlmqtt
  • 项目地址:https://gitee.com/PiQiHenHaoDeGangTieXia/tlmqtt

tlmqtt致力于为物联网开发者提供轻量、高效的 MQTT 消息服务,期待您的加入! 🚀


相關推薦

2023-01-16

修复特定场景下消息订阅失效问题。 重新设计消息编解码器。使整体结构更清晰,更具扩展性。 smart-socket 升级至 1.6.1。 3、如何获取 smart-mqtt 3.1 源码 主仓库:https://gitee.com/smartboot/smart-mqtt 镜像同步:https:/

2022-09-20

一、简介 mica-mqtt 基于 t-io 实现的简单、低延迟、高性能 的 mqtt 物联网开源组件。mica-mqtt 更加易于集成到已有服务和二次开发,降低自研物联网平台开发成本。 二、功能 支持 MQTT v3.1、v3.1.1 以及 v5.0 协议。 支持 websocket

2023-01-06

ization)两个步骤。例如,MQTT source,连接意味着遵循 MQTT 协议连接 broker,而序列化则是将读取到的数据 payload 解析成 eKuiper 内部的 map 格式。 连接和序列化 此前,连接和序列化通常在 source 内部实现,因此当用户需要解析自定

2023-09-07

地抛弃了"一揽子解决方案"的传统范式,将注意力集中在高性能MQTT协议的需求上。我们坚信MQTT的协议特性具有更广泛的应用范式,因此BifroMQ致力于充分发挥MQTT协议的潜力,尤其是在处理大规模负载时的卓越性能。 相较于在MQTT

2023-03-09

完善 openAPI 定义,并提供部分接口实现。 完善 MQTT5 协议规范的实现。 Broker 支持节点命名,用于集群模式下区分节点的唯一性。 提供内存模式的指标统计功能。 调整消息推送服务与插件模块的初始化顺序。 MQT

2023-09-08

、JSON、XML、HEX 和 Base64 常规消息编解码,支持 SPI 扩展编解码器; 主界面截图 仓库地址: https://github.com/ptma/mqtt-insight https://gitee.com/ptma/mqtt-insight 下载地址: https://gitee.com/ptma/mqtt-insight/releases/tag/V1.0.0

2023-04-08

2 个重要功能进行了升级:MQTT over QUIC 的双向认证和 DDS 协议转换代理的序列化代码自动生成。另外还新增了 QUIC 传输层的配置参数,增加了 Retain 消息的持久化,以及发布了 NanoSDK 0.9 版本等诸多更新。 QUIC 双向认证 & 新增

2023-04-24

策略,降低了系统运行时的CPU和内存占用,使企业在保持高性能的同时,大幅降低运维成本。 smartboot 开源组织,一个容易被误认为是在 “重复造轮子” 的低调组织。曾获得 2020 年度 OSC 中国开源项目「优秀 Gitee 组织

2023-11-17

2023年7月,百度天工AIoT团队正式开源高性能分布式MQTT物联网消息中间件,并将其全新命名为BifroMQ。自 BifroMQ 首个版本发布以来,获得海量社区的广泛关注,其中,BifroMQ的集群能力一直备受期待。   通过近三个月的不懈努

2023-10-17

略,降低了系统运行时的 CPU 和内存占用,使企业在保持高性能的同时,大幅降低运维成本。 另外,mqtt 协议天然适合 IM 场景,需要打造企业级 IM 应用,smart-mqtt 同样是一个非常好的选择。 smartboot 开源组织,一个容易被误

2023-10-28

上手(前提:未曾遭受网上错误编解码知识的毒害) 高性能:以算法之力充分驱动硬件算力之势,基于smart-socket的服务在通信性能方面可轻松超过其他计算机语言开发的程序,包括且不限于:C/C++、Golang、Rust、Erlang。 推

2023-03-02

连接 source/sink 的格式和序列化实现,解耦了格式和传输协议,并支持更多的格式如 csv 和自定义格式。 完整功能列表,请查看 Release Note。 同时,产品团队也重构了文档结构,更新了安装和应用场景文档,方便用户快速找到

2023-09-26

开源的解决方案,WeMQ希望为物联网设备运营商提供一个高性能、安全可靠、功能强大的调试和管理平台,帮助他们简化设备调试流程、提高调试效率,并保证设备的稳定运行。 项目的主要特性: 提供一套完整的物联网设备

2022-11-08

调试 MQTT 服务与应用,同时也能在 MQTT 的研究与应用中更深入地理解 MQTT 协议及相关特性。 因此 MQTT X 除提供了简单高效的连接、发布和订阅等功能测试能力外,在目前正在开发的 1.9.0 版本中,还新增了一个帮助页面。该页面