想要流畅体验 TDengine 3.0 数据订阅功能?要点都在这里

数据库
0
0
<!--StartFragment--> 众所周知,在 [TDengine](https://www.taosdata.com/ "TDengine") 3.0 中,我们对数据订阅功能进行了全面升级,以便大家可以更加便捷地实时订阅和获取数据的更新,完成实时监控、数据分析和有效报警等工作。在本文中,TDengine 资深研发将以 TDengine 3.0 为对象,为大家介绍数据订阅功能的正确打开方式,给到有需要的人作参考指南,避免走入应用误区。 本文将从 Java Developer 的视角来介绍如何使用 TDengine 3.0 的数据订阅功能。 TDengine 3.0 的版本迭代很快,可能有些配置参数或细节在之后的版本会发生变化,本文对应 [TDengine 版本为 3.0.3.0](https://www.taosdata.com/news/17012.html)。 ## 写在前面 在官方文档里已经有介绍,TDengine 的数据订阅是什么以及如何使用,有需要的朋友可以通过下方链接进入官网查看相关介绍: 1. 开发指南:<https://docs.tdengine.com/taos-sql/> 2. SQL:<https://docs.tdengine.com/taos-sql/tmq/> 3. Java 使用数据订阅:<https://docs.taosdata.com/connector/java/#%E6%95%B0%E6%8D%AE%E8%AE%A2%E9%98%85> 总结一下,我理解的数据订阅功能是以“订阅”的方式获取存在于 TDengine 中的数据。一般情况下,“订阅”意味着的业务需求是订阅数据库中的最新数据。“订阅”的流程很简单:(1)在数据库中创建 topic;(2)在应用中消费 topic 的数据。 ## 基本操作:创建 在数据库中创建 topic,使用 SQL 语句 create topic 即可。create topic 这个 SQL 如何写,实际上定义了 topic 对应的数据粒度,包括哪些数据库、超级表、子表、列、行。值得一提的是,TDengine 的 SQL 支持订阅 database、 supertable、subquery 这 3 种模式。`CREATE TOPIC topic_name [WITH META] AS DATABASE db_name;` 这种 SQL 可以直接订阅整个 database;`CREATE TOPIC topic_name AS STABLE stb_name` 这种 SQL 可以订阅某个超级表;订阅子查询是最普遍的场景。例如: ``` CREATE TOPIC topic_name AS SELECT ts,voltage,location FROM testdb.meters WHERE voltage > 220.0 and location in ('北京','天津'); ``` 上面这个 SQL,订阅了 testdb 数据库中的 meters 超级表,通过 where 子句过滤满足以下条件:location(tag 列)为“北京”或“天津”的子表,且 voltage 超过 220.0 的 ts、voltage、location 的数据。 ## 黄金搭档:流式计算 + 数据订阅 以智能电表的场景为例,如果我想每 10 分钟计算一次电压的平均值,并在平均电压高于 220V 就进行上报。对于这种需求,单纯用 TDengine 的数据订阅功能是不行的,因为 create topic 的子查询不支持聚合查询。这个时候,就需要用 TDengine 的流式计算 + 数据订阅这对黄金搭档了。如下: ``` CREATE STREAM stream_name TRIGGER WINDOW_CLOSE IGNORE EXPIRED 1 INTO stb_name AS SELECT _wend as ts, avg(voltage) as voltage, last_row(location) as locationFROM testdb.meters WHERE location in ('北京', '天津') PARTITION BY location INTERVAL(10m);CREATE TOPIC topic_name AS SELECT * FROM stream_name where voltage > 220.0; ``` 上面的 2 条 SQL 中,第一条 SQL 创建了一个 stream:以 location 分组,计算每 10 分钟的“北京”、“天津”的平均电压;用时间窗口的结束 \_wend 作为时间戳 ts;avg(voltage) 计算 voltage 平均值;时间窗口的最后一条 last_row(location) 作为标签。同时,这个 stream 以 WINDOW_CLOSE 作为计算窗口的触发模式,过期策略为 IGNORE EXPIRED 1。 第二条 SQL 创建了子查询订阅,用于过滤每 10 分钟平均电压高于 220V 的数据。这样我们就创建了一个可以被消费的 topic,消费到的数据为高于 220V 的 10 分钟平均电压,满足了前面所说的监控场景的需求。 ## 消费 topic:很像 Kafka 在应用中消费 topic 的数据,需要按照各种连接器的 API 来使用,具体使用方式请参考官方文档:<https://docs.taosdata.com/>。在这里,我只对 TDengine 和订阅消费 topic 的一些配置参数进行梳理。 1. 连接相关的参数,java connector 中使用 `bootstrap.servers` 一个参数代替了 `td.connect.ip` 和 `td.connect.port`,使用了和 Kafka 一样的参数名。`td.connect.user` 和 `td.connect.pass` 仍然需要设置。 2. `group.id`:和 Kafka 一样,多个线程可以共同消费同一个 topic,只要它们使用同一个 group.id。TDengine 的 vgroup 与 Kafka 的 partition 在概念上是对应的。同一个 group.id 中,一个 vgroup 最多只对应一个 consumer。如果 consumer 数量大于 vgroup 的数量,则有些 consumer 消费不到数据。 3. `auto.offset.reset`:这个参数和 Kafka 的行为不一样。如果 group.id 为新值,在设置 earliest 时,订阅从头消费数据;设置为 latest 时,从最新数据开始订阅。当 group.id 为已存在的值时,不管 `auto.offset.reset` 为何值,都会从最后一个 offset 开始,继续消费。 4. `enable.auto.commit`:建议设置为 false。开启自动提交 offset,TDengine 的 commit 自动提交机制是轮询提交。 5. `auto.commit.interval.ms`:建议不设置。如果 `enable.auto.commit` 为 true,自动提交 commit 的间隔为 `auto.commit.interval.ms` 设置的值。 6. `enable.heartbeat.background`:建议设置为 true,默认值为 true。如果设置为 false,在应用长时间不主动 poll 数据时,可能会造成当前 consumer 的离线。在 TDengine 的实现上,heartbeat 的 interval 被设置成了 1 秒。 7. `msg.with.table.name`:建议设置成 true。在订阅超级表和数据库时添加了 WITH META,应该开启这个设置。例如:订阅为 `CREATE TOPIC topic_name WITH META AS STABLE stb` 时,配置 `msg.with.table.name` 为 true,则消费时可以获取到 tableName。 ## Show U The Code 到此,本文介绍了有关 TDengine3.0 的数据订阅功能的诸多细节。我相信,上面的内容应该可以为你使用数据订阅功能提供一些思路和帮助。但是,对程序员来说,“Talk is cheap. Show me the code”。下面,我列举了一些 Java 的示例代码,供你参考。 1. **subscribeDemo-java** 这个 java 工程实现了一个最简单的订阅功能,从 TDengine 中订阅一个 topic ,并将消费到的数据写到文件中。值得一提的是,代码使用 [bytebuddy](https://bytebuddy.net/#/) 动态生成了 Java POJO 类和对应的 Deserializer 类。因此,你只需要在 schema.txt 内写好 topic 对应的字段,就可以不写代码,直接订阅不同 topic 的数据了。 链接:<https://github.com/taosdata/subscribeDemo-java> 2. **SubscribeDemo** 这个页面展示了一段最基本的数据订阅的代码。main 方法中,包括了在 TDengine 中创建数据库、表、topic 的操作,并从 topic 中消费数据进行打印。 链接:<https://github.com/taosdata/TDengine/blob/main/docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java> 3. **WebsocketSubscribeDemo** 这个页面的代码和 SubscribeDemo 相比,仅有的区别是其配置了 td.connect.type 参数为 ws,即:使用 websocket 连接 taosadapter,这样的好处是不用安装客户端。 链接:<https://github.com/taosdata/TDengine/blob/main/docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java> ## 结语 相信借助本篇文章,你一定能够流畅体验到 TDengine 的数据订阅功能,有需要的读者可以收藏备用。对于更为复杂的应用问题,也欢迎大家直接向社区技术支持人员寻求帮助。关于 TDengine 3.0 的更多示例代码,请参考:<https://github.com/taosdata/TDengine/tree/main/docs/examples>。 <!--EndFragment-->
目录
亚马逊云科技解决方案 基于行业客户应用场景及技术领域的解决方案
联系亚马逊云科技专家
亚马逊云科技解决方案
基于行业客户应用场景及技术领域的解决方案
联系专家
0
目录
关闭