从新手到专家:UDF for Python 使用全指南

Python
数据库
物联网
编程语言
0
0
在此前发布的 [TDengine 3.0.4.0](https://www.taosdata.com/tdengine-engineering/17930.html) 版本中,我们引入的一个重磅新功能是“使用 Python 语言编写用户自定义函数(UDF)”。这一突破性功能不仅为数据库操作提供了前所未有的灵活性,而且极大地降低了技术门槛——即使是编程初学者也能使用 Python 语言轻松地定制和操纵数据库。这些自定义函数可以像内置函数一样在 SQL 语句中灵活使用,实现个性化数据处理。本篇文章中将详细介绍如何有效利用这一功能,开启你的数据库自定义之旅。 ## UDF 基础操作 ### 创建 UDF 我们先来看下如何创建 Python 的 UDF 函数: ` CREATE [OR REPLACE] [AGGREGATE] FUNCTION function_name as library_path OUTPUTTYPE output_type [BUFSIZE buffer_size] [LANGUAGE 'C‘ | ’Python']` **选项说明:** 1. CREATE [OR REPLACE]:第一次全新创建使用 CREATE ,已经创建要更新代码可以加上 REPLACE 2. AGGREGATE:可选项,加上此选项表示创建的是聚合函数,不加是投影函数 3. function_name:自定义函数名称,创建完后在 SQL 语句中使用的名称,最大为 64 字节,超出部分会截断 4. OUTPUTTYPE:自定义函数输出数据类型,支持类型如下: ![11.jpg](https://dev-media.amazoncloud.cn/70531233890a4266a9728bf9d2ed6d88_11.jpg "11.jpg") 5. BUFSIZE:设置自定义函数可以使用的内存缓存大小 ,此选项仅在使用 AGGREGATE 时才有效,也就是说只有创建聚合函数才能使用此选项。最大为 256k,计数单位为 byte。分配的缓存是留给 Python 自定义函数使用的,缓存生命周期是从调用聚合函数 start 开始到调用 finish() 结束的整个聚合计算过程,所以可以当做全局变量来使用。 6. LANGUAGE 是创建自定义函数的语言,目前 TDengine 支持 Python 和 C 两种。 ### 删除 UDF DROP FUNCTION function_name; 是删除指定名称的 UDF 函数。function_name 参数的含义与 CREATE 指令中的 function_name 参数一致,即要删除函数的名称。 ### 查看 UDF 使用 SHOW FUNCTIONS; 可以简单查看下 UDF 创建的函数名。使用 select * from information_schema.ins_functions; 可以详细查看 UDF 创建时候的各个参数,加 \G 可以以竖列查看到完整内容,如:select * from information_schema.ins_functions\G; ## 安装环境 接下来带你安装 Python UDF 的开发环境,安装过程比较简单。 首先,准备安装环境 1. CMAKE:最低版本要求 3.0.2 2. GCC:因为需要在本机编译支持 Python UDF 函数的 so 文件,所以需要安装 GCC 环境,GCC 版本最低要求 7.5 以上。 3. Python:要求 3.7 及以上版本 然后开始安装插件 Python3 -m pip install taospyudf,安装成功后,执行 ldconfig。这样,开发环境就已经安装就绪了。 ## 编写 UDF 函数 按上面步骤环境准备好后,我们便可以开始编写自己的 UDF 函数了。UDF 函数分两大类,一类是投影函数,另一类是聚合函数。这两类函数创建及编写都是完全不相同的,所以这里分开介绍。 在介绍函数前,我们先来了解下自定义函数的调用流程,如下图: ![1.png](https://dev-media.amazoncloud.cn/52fe8bb42a184892aedc41fc761aa7c2_1.png "1.png") 首先 UDF 处理框架是以数据块为处理单位,每次调用到自定义函数中时输入数据都是一个数据块,通过调用数据块对象的 data(row,col) 方法输入行号和列号可以取到数据块中任何一个位置上的数据。这样做的目的是减少 C 框架与 Python 语言之间的调用次数,提升性能。 返回数据时,投影函数原来有多少行,就需要返回相同的行数,聚合函数只需要返回一行即可。下面详细进行介绍: ### 投影函数 投影函数就像它的名字一样,像是一个投影,输出数据的行数与输入数据的行数需保持相同,如果不相同会报错。下面用一个完整的例子来说明,我们来实现一个 TDengine 内置的 concat 字符串连接函数,如下: #### 1) 编写函数 ![2.png](https://dev-media.amazoncloud.cn/7aa98d6f9feb4946ac7e7e5cc619b358_2.png "2.png") **函数说明:** - init - 在 UDF 模块初始化的时候调用一次,可以做一些初始化的工作 - destroy - 在 UDF 模块退出的时候调用一次,可以做一些退出的工作 - process - 每个数据块到来后要调用的数据处理函数,调用 shape() 方法返回数据块行及列数 - nrows 返回数据块拥有的行数 - ncols 返回数据块拥有的列数,列数实际是 concat() 函数的参数个数 **返回值:** - 投影函数的返回对象必须是一个列表,非列表对象会直接报错 - 列表对象中元素的个数应该与块行数 nrows 相同,否则也会报错 #### 2) 创建函数 编写好自定义函数后,我们就可以直接在 taos - shell 中创建了,输入如下:create function py_concat as '/home/py_concat.py' outputtype varchar(256) language 'Python'; 创建函数名 py_concat ,创建 Python 文件位置在 /home/py_concat.py,输出数据类型为 varchar,长度 256 字节,语言为 Python。 #### 3)执行函数 上步操作成功后,即可像内置函数一样在 SQL 中任意使用,如 taos-shell 中输入 select sf_concat(factory_name,room_name), concat(factory_name,room_name) from devices; grade_name class_name 均为 varchar 数据类型 把工厂和车间名连接成一个字符串返回,可以对比 UDF 输出和内置函数输出结果,预期是相同的。 ### 聚合函数 聚合函数是把数据进行聚合计算,最后只输出一行聚合结果即可。这里我们用一个大家最熟悉的统计个数 count 实例来讲解: #### 1) 编写函数 ![3.png](https://dev-media.amazoncloud.cn/06c192d5d0a145c7a1c69df762038b4a_3.png "3.png") **实现原理:** 1. 计数的累加值在 start 初始化回调的时候把 0 值保存进 buf 中做为初始值 2. 在 reduct 函数中,如果不为 None 就不断累加,在 reduce 返回值会被存储在 buf 中,下次回调 reduce 时再作为参数 buf 传过来,这样可以反复使用 buf 3. 最后在 finish 函数中,buf 也会通过参数传进来,把自己前面存储在 buf 中的值取出来,作为返回值返回,即为最终 count 结果。 **函数说明:** - init - 在 UDF 模块初始化的时候调用一次,可以做一些初始化的工作 - destroy - 在 UDF 模块退出的时候调用一次,可以做一些退出的工作 - start. - 开始进行聚合函数计算时调用一次,主要完成对聚合函数使用的缓存进行初始化 - reduce - 每个数据块到来后要调用的数据处理函数, 调用 shape() 方法返回数据块行及列数 - rows 返回数据块拥有的行数 - cols 返回数据块拥有的列数,列数是传入 UDF 函数的参数个数 此函数会被循环调用 - finish - 计算最终聚合结果,此函数只在最后调用一次,在此函数中返回最终结果 **返回值:** 返回的数据类型为创建 UDF 函数时指定的 OUTPUTTYPE 数据类型,返回类型不正确会报错,允许 None 对象返回。 #### 2) 创建函数 编写好自定义函数后,我们就可以直接在 taos - shell 中创建了,输入如下:create aggregate function af_count as ''/home/af_count.py'' outputtype bigint bufsize 4096 language 'Python'; 创建函数名 af_count ,创建 Python 文件位置在 /home/af_count.py,输出数据类型为 bigint,语言为 Python。 #### 3)执行函数 上步操作成功后,即可像内置函数一样在 SQL 中任意使用,如 taos-shell 中输入select af_count(col1) from devices; 。这样你就拥有了自己的 count 统计函数,想要统计什么,完全由你自己来决定。 ### 数据类型映射关系 Python 语言与 C 语言交互,最重要的就是数据类型如何转化的问题。也是 Python UDF 函数编写最容易出错的地方,所以这里要重点介绍下: 首先我们看下映射关系表: ![12.jpg](https://dev-media.amazoncloud.cn/60bebb70ca764b48bdaca83088bc4adf_12.jpg "12.jpg") 1. int 类型在 Python3 中没有大小限制 2. binary / nchar / varchar 类型都映射为了 Python 的 bytes 对象,所以在使用的时候要加以区别——varchar 数据类型是 binary 的别名。 因为 binary 和 nchar 都映射为了相同的 bytes 数据类型,所以自定义函数的开发者自己约定输入自定义函数参数的类型,不同数据类型需要不同的转化方式: - binary 类型转 str 对象, 使用 bytes.decode('utf-8') - nchar 类型转 str 对象, 使用 bytes.decode('utf_32_le') 当把 str 对象内容输出为 OUTPUTTYPE 指定的不同类型时,也需要进行区分: - str 对象当 binary 类型返回,str.encode('utf-8') - str 对象当 nchar 类型返回,str.encode('utf_32_le') ## 开发技巧 ### 更新函数代码 由于我们在开发 UDF 函数时需要频繁修改代码再调试运行,因此需要知道如何让新修改的代码生效。 ![4.png](https://dev-media.amazoncloud.cn/8dbe440369b748dda912cd357b6012b1_4.png "4.png") 从上面流程图中我们可以看到,在创建 UDF 函数时指定的 .py 文件路径,只在创建的那一时刻使用,把文件内容读取出来存放到 mnode 中,便于在集群的任何节点上都可以使用。所以 .py 文件创建完 UDF 后就不再使用了,我们更新了 .py 文件中的代码后,需要再把 .py文件中的代码更新到 MNODE 中才可以起作用。 目前更新 Python 自定义函数的代码提供了直接更新的命令: ![5.png](https://dev-media.amazoncloud.cn/085d66ae039b418683cbb9d9c44990f0_5.png "5.png") 增加 OR REPLACE 即可直接把 library_path 指向的 Python 自定义函数的内容更新到 MNODE 中,再次调用就会使用更新后的代码了。 ### 输出日志 TDengine 的 Python UDF 不支持 Python 代码的调试,但支持了日志输出,如大家常用的 logging 库,都可以在 UDF 函数中使用,建议日志输出到文件中查看。print 函数打印的信息是看不到的,所以就不要用此函数输出信息了。如: ![6.png](https://dev-media.amazoncloud.cn/99e2bf58bb1f4245ad1bb32e639fd0d9_6.png "6.png") ### 抛出异常 如果开发者在 UDF 函数在检测到异常数据后,需要终止查询,可以通过 raise 方式抛出异常,同时要保证自己 raise 的异常不会被自己捕获,因为自己捕获了框架就捕不到了,所以要位于最上层抛出异常。 抛出的异常会被 UDF 框架捕获到并终止当前查询,同时在应用端调用的查询接口中会相应的返回专属的 UDF 函数抛出异常错误码:0x8000290D,应用程序可根据此错误码做应用层的相应处理。 ![7.png](https://dev-media.amazoncloud.cn/fa4b84235e8f451bab5d0e1a7686d045_7.png "7.png") 抛出的错误可以在 taos.cfg 中配置的日志文件的目录下找到,异常日志输出到了taosudfpy.log 中了,如下是一实例输出的异常日志: ![8.png](https://dev-media.amazoncloud.cn/47ef491027474258a370de8ccfb57c2f_8.png "8.png") 异常日志的内容及发生异常的文件名及行号都在此日志文件中。 ### 查看 UDF 框架生成的日志 UDF 框架返回错误描述时只按大数返回,所以详细的错误原因还需要开发者通过日志来查看。相关日志主要有两个,都在 TDengine 日志目录下存放,这里分别介绍下: **1)taospyudf.log(UDF for Python 的日志文件)** 日志文件中记录的是进程 udfd 加载 Python UDF 函数,执行 UDF 函数过程中发生的异常、错误及调用过程等的记录。我们开发 pythyon udf 函数主要看这个日志文件就可以了。 **2)udfdlog.0 ** 这个是 udfd 进程的框架日志,里面包括调用 C 及 Python 等多种语言的UDF 函数在框架中出的错的日志,都会记录在此文件中,所以这个日志文件如果整个大框架出问题了,日志会记录在这里,一般情况下不用看。 #### 常见错误码说明 以下错误码及错误描述是在开发 UDF 函数时经常会遇到的,这里做下说明: ![13.jpg](https://dev-media.amazoncloud.cn/23039efe3ced4c5d9ecc169d6f2f5534_13.jpg "13.jpg") 其中 10 和 12 是开发 Python UDF 最容易遇到的错误,详细的原因需要查看日志 taospyudf.log 。 #### 参考实例 在 TDengine 的开源仓库中有几个 UDF for Python 的测试实例,可供大家参考: https://github.com/taosdata/TDengine/tree/3.0/tests/system-test/0-others/udfpy 实例被如下 CI 测试用例使用: https://github.com/taosdata/TDengine/blob/3.0/tests/system-test/0-others/udfpy_main.py 运行测试用例: 1. 正确安装 Python3 2. 拉 TDengine 社区版代码下来 3. 进入 TDengine/tests/system-test/ 目录 4. 运行 Python3 test.py -f others/udfpy_main.py ## 注意事项 ### UDF 投影函数最容易在返回行数上出错 UDF 投影函数要求返回行数与输入行数相同,在处理时如果程序稍复杂一些就容易犯此错误,开发过程中一定要小心,不要漏行了。 ### OUTPUTTYPE 输出类型不匹配错 UDF 函数要求最终返回的数据类型一定要和创建函数时指定的 OUTPUTTYPE 类型一致,如果不一致就会报错。所以这里也是非常容易出错的地方。 投影函数的返回值在 process 函数中,必须要求是 list 对象,list对象中放的一定都是 OUTPUTTYP 映射匹配的类型对象。 聚合函数的最后返回值在 finish 函数中,聚合函数中返回一个值,这个值一定是创建函数时指定的 OUTPUTTYPE 映射匹配的 Python 类型值。 ### 一直报无法加载 libtaospyudf.so 文件 按照上面安装步骤已经安装了 taospyudf 插件,但还是报 libtaospyudf.so 无法加载的错,如下: ![9.png](https://dev-media.amazoncloud.cn/76fee61b42074b6e885032f49b377f03_9.png "9.png") 这个库一般会安装在 /usr/local/lib/libtaospyudf.so 目录下及 Python 的插件目录下,lib 目录是正式使用的,插件目录是安装时产生的,或者使用 find / -name 'libtaospyudf.so' 查询在本地的具体位置。 如果文件没有或不存在,那可能插件安装有问题,需要重新卸载安装插件如果文件存在,还是报这个错,那使用 ldd 查看 so 文件依赖的Python 库文件是否能够找到,如下是典型的 Python 安装的有问题: ![10.png](https://dev-media.amazoncloud.cn/9a3e9a4d900b44638f048f23ccf718b8_10.png "10.png") 如上错误,重新正确安装 Python3.9 可解决此问题。 ## 结语 目前我们推出的是 UDF for Python 的初版 1.0,该版本的某些功能仍在持续完善中。我们诚邀大家积极使用这一功能,定义完全属于自己的数据库。同时,我们也希望大家在使用过程中多提宝贵意见,以便我们共同改进并增强这一功能,让这个能解决特殊需求的实用功能越来越便捷、强大。
目录
亚马逊云科技解决方案 基于行业客户应用场景及技术领域的解决方案
联系亚马逊云科技专家
亚马逊云科技解决方案
基于行业客户应用场景及技术领域的解决方案
联系专家
0
目录
关闭