eKuiper 连接格式优化,支持自定义


12月, eKuiper 团队继续专注于 1.8.0 版本新功能的开发。我们重构了外部连接(source/sink) 的格式机制,更加清晰地分离了连接、格式和 Schema,同时支持了格式的自定义;受益于新的格式机制,我们大幅完善了文件源(file source)的能力,支持定时监控文件系统及各种格式的文件,并且采用流的方式消费文件系统数据;最后,我们增加了完整数据包括规则和配置的导入导出功能,支持节点的迁移。另外,我们也修复了一些问题,并发布到 1.7.x 版本中。

12月的版本发布包括:

  • v1.8.0-alpha.3:包含 1.8.0 已开发完成的新功能

  • v1.7.4:包含 bug fixes

  • v1.7.5:包含 bug fixes

连接格式优化和自定义:序列化和 Schema

eKuiper 通过 source/sink 与外部系统进行连接、读入或写出数据。以 source 为例,每种类型的 source 读取数据时都需要经过连接(connect)和序列化(serialization)两个步骤。例如,MQTT source,连接意味着遵循 MQTT 协议连接 broker,而序列化则是将读取到的数据 payload 解析成 eKuiper 内部的 map 格式。

连接和序列化

此前,连接和序列化通常在 source 内部实现,因此当用户需要解析自定义格式时,即使连接协议是 MQTT 等已支持协议,仍然需要编写完整的 source 插件。新的版本中,格式和 source 类型进一步分离,用户可以自定义格式,而各种格式可以与不同的连接类型结合使用。自定义格式的编写方法请参考格式扩展

例如,创建 MQTT 类型的数据流时可定义各种不同的 payload 格式。默认的 JSON 格式:

CREATE STREAM demo1() WITH (FORMAT="json", TYPE="mqtt", DATASOURCE="demo")

MQTT 类型的数据流使用自定义格式,此时 MQTT 的 payload 中的数据应当使用自定义的格式:

CREATE STREAM demo1() WITH (FORMAT="custom", SCHEMAID="myFormat.myMessage", TYPE="mqtt", DATASOURCE="demo")

Schema

此前 eKuiper 支持在 Create Stream 的时候指定数据结构类型等。然而该方式有几个问题:

  • 额外性能消耗。当前的 Schema 没有与数据原本的格式 Schema 关联,因此在数据解码之后,需要再额外进行一次 validation/转换;而且该过程基于反射动态完成,性能较差。例如,使用 Protobuf 等强Schema 时,经 Protobuf 解码之后的数据应当已经符合格式,不应再进行转换。

  • Schema 定义繁琐。同样无法利用数据本身格式的 Schema,而是需要额外配置。

新的版本中,Stream 定义时支持逻辑 Schema 和格式中的物理 Schema 定义。SQL 解析时,会自动合并物理 Schema 和逻辑 Schema,用于指导 SQL 的验证和优化。同时,我们也提供了 API,用于外部系统获取数据流的实际推断 Schema。

GET /streams/{streamName}/schema

格式列表

新版本中,支持的格式扩展到如下几种。部分格式包含内置的序列化;部分格式,例如 Protobuf 既可以使用内置的动态序列化方式也可以由用户提供静态序列化插件以获得更好的性能。在 Schema 支持方面,部分格式带有 Schema,其中自定义格式也可以提供 Schema 实现。

文件源

之前版本的文件源主要用于创建 Table,对流式处理的支持不够完善。新的版本中,文件源也支持作为用作流,此时通常需要设置 interval 参数以定时拉取更新。同时增加了文件夹的支持,多种文件格式的支持和更多的配置项。

新版本中支持的文件类型有:

  • json:标准的 JSON 数组格式文件。如果文件格式是行分隔的 JSON 字符串,需要用 lines 格式定义。

  • csv:支持逗号分隔的 csv 文件,以及自定义分隔符。

  • lines:以行分隔的文件。每行的解码方法可以通过流定义中的格式参数来定义。例如,对于一个行分开的 JSON 字符串,文件类型应设置为 lines,格式应设置为 JSON。

创建读取 csv 文件的数据流,语法如下:

CREATE STREAM cscFileDemo () WITH (FORMAT="DELIMITED", DATASOURCE="abc.csv", TYPE="file", DELIMITER=",", CONF_KEY="csv"

数据导入导出

新版本中提供了 REST API 和 CLI 接口,用于导入导出当前 eKuiper 实例中的所有配置(流、表、规则、插件、源配置、动作配置、模式)。这样可以快速地备份配置或者移植配置到新的 eKuiper 实例中。导入导出的规则集为文本的 JSON 格式,可读性较强,也可以手工编辑。

导出配置的 rest 接口如下,通过此 API 可导出当前节点的所有配置

GET /data/export

导出配置的 rest 接口如下,通过此 API 可导入已有配置至目标 eKuiper 实例中

POST /data/import

如果导入的配置中包含插件 (native)、静态模式(static schema)的更新,则需要调用以下接口

POST /data/import?stop=1

导入配置的状态统计可用以下接口查看

GET /data/import/status

即将到来

本月我们将继续进行 1.8.0 版本其他功能的开发,并重构文档,同时推进 Flow Editor 整合到 eKuiper manager 中。敬请期待。

版权声明: 本文为 EMQ 原创,转载请注明出处。

原文链接:https://www.emqx.com/zh/blog/ekuiper-newsletter-202212

 


相關推薦

2023-03-02

的新建和编辑。 更灵活的数据传输配置: 重构了外部连接 source/sink 的格式和序列化实现,解耦了格式和传输协议,并支持更多的格式如 csv 和自定义格式。 完整功能列表,请查看 Release Note。 同时,产品团队也重构了文

2023-04-08

r 的性能和可用性: IO Connectors 增强。新增了多 Neuron 连接的功能;在 HTTP 连接方面,我们提供了类 oAuth 的基于动态 token 的鉴权过程的支持;此外还支持了 MQTT 连接中压缩和解压,减少边云传输的带宽损耗。 分析能力增

2023-07-23

的积累; 在北向路由上,将边缘测点信息转换成blink格式的mqtt消息,支持直接对接百度天工物联网设备管理平台; 进一步扩展和优化云存储在底层的实现,提供对k8s crds之外的数据库存储的支持。还会加强与K8s云原生的集

2023-03-08

本,主要目标是实现与工业协议网关软件 Neuron 的多实例连接。目前主要完成了功能调研和规划工作,以及新功能 Python 插件虚拟环境支持的开发。 此外,2 月还发布了 1.8.1 版本,包含导入 Portable 插件以及 Flow Editor 等 bug 修复

2023-04-08

复杂的 TLS 协议。相较于 TLS 1/1.1/1.2,1.3 版本具备更快的连接协商速度,新的密钥协商机制 PSK 和更安全的加密哈希算法。 QUIC 在功能层面等价于 TCP+TLS, 并且已采用最新的 TLS 1.3 代替其原有加密协议(QUIC Crypto)。QUIC 协议默认基于

2023-03-30

提升了 SeaTunnel Zeta 稳定性,并提升了 CI/CD 的稳定性;在连接器层面上,新版本实现了 7+ 个新连接器,修复了已有常用连接器 bug,并提高了安全性。社区重构了多个底层基类,增加了一个重要特性 AI Compatible,经过优化后的API,

2023-11-06

速实现Extension功能,同时也提供界面定制能力。 提供连接和断连ExtensionAbility的能力。 支持对UIExtensionAbility页面设置背景色。 支持对UIExtensionAbility页面隐私属性,设置了隐私的页面不能被录屏或截屏。 支持通过UIExte

2022-12-16

集设备数据的安全。 Neuron 官方文档中新增加关于 DTU 连接示例的文档,感兴趣的用户可参考:官方文档。 新驱动增强接入能力 新版本增加了三个协议驱动,使得 Neuron 的工业接入能力变得更加强大。 CIP Ethernet/IP EtherNet/

2023-03-08

的通用标准。Neuron IEC61850 驱动实现了该标准中 MMS 协议的连接和读写操作。MMS 中的多种数据类型也已经映射到 Neuron 类型中,现在可以通过指定 IED(智能电子设备)中的 DA(对象属性)地址和类型,完成数据的获取和修改操作

2023-07-13

校验错误 [修复] OSS 服务缺少必要配置,导致对象存储连接失败问题。 [修复] 修复系统初始化脚本,默认缺少对象存储相关权限数据问题。 [修复] 修复工作流服务 bootstrap 配置,解决工作流服务配置与当前环境不匹配问题

2022-03-07

关联问题)脚本任务执行已修复(对于没有默认数据库的连接) H2: 添加了 2.x 版的驱动程序配置添加了检查约束支持添加了唯一键支持 甲骨文: 元数据读取性能显着提升区分大小写的导航过滤器支持已修复 MySQL:

2023-02-10

协议转换、DNS 防污染智能代理、前置 CDN/Nginx 反代、代理连接重定向、API 动态调用上级代理、限速限连接数。提供全平台的命令行版本,友好易用的 Windows&Linux&macOS 控制面板,强大的安卓版。 更新内容 1、tcp代理新增tcps

2023-04-01

置方式变更为通过数据表进行管理。 优化多租户数据库连接池、数据库事务支持。 【其它更新】 [新增] 新增前端工程 Authorization Code + PKCE 支持工具代码。 [新增] 增加 Spring 默认仓库,解决未使用腾讯镜像仓库无法

2023-10-30

,WebSocket 抛出“你的主机中的软件中止了一个已建立的连接错误” fix: #I8BIMU [修复] 修复以 spring-boot-starter-parent 作为 Maven Parent 配置,在当前环境下依赖组件版本错误不会更新问题 fix: #I8BJAB [重构] 重构 WebSocket 消息发送及多