Flink 最佳实践:TDSQL Connector 的使用(上)
tyb编辑 2022-05-16 10:23 21

作者:姚琦,腾讯 CSIG 工程师

本文介绍了如何在 Oceanus 平台使用 tdsql-subscribe-connector [1] ,从 TDSQL-MySQL 订阅任务 [2] 创建,到 Oceanus 作业创建、最终数据验证,实现全流程的操作指导。需要注意的是,本文默认已经创建 TDSQL-MySQL 实例和 Oceanus 集群,并且二者在同一 VPC 下或者不同 VPC 下但网络已经打通。

上述流程图简要说明了使用 tdsql-subscribe-connector 时,整个数据流向情况。TDSQL 的 binlog 数据,会通过订阅任务发送到 Kafka(这里的 Kafka 已经包含在订阅任务中,无需重新创建实例),然后 Oceanus 可以通过 tdsql-subscribe-connector 接入 Kafka 的数据,由于 Kafka 中的消息格式比较特殊,无法用常规 Kafka Connector 接入。

创建订阅任务

创建订阅任务可以参考 数据传输服务 TDSQL MySQL 数据订阅 [3] ,在订阅任务创建过程中,需要选择订阅的对象,可以选择不同数据库下的不同表,或者同一数据库下的不同表,当订阅多个表的 binlog 时,多个表中的任意一个的数据变更都会发送到 Kafka ,前提是多个表的 Schema 信息必须是相同的。

例如,以下订阅任务中,就指定了同一个库下的多张表:

创建 Oceanus SQL 作业

创建 SQL 作业

目前 tdsql-subscribe-connector 仅支持在 SQL 作业中使用,JAR 作业暂时不支持;

流计算 Oceanus 控制台 [4] 的作业管理 > 新建作业中新建 SQL 作业,选择在新建的集群中新建作业。然后在作业的开发调试 > 作业参数中添加必要的 connector,tdsql-subscribe-connector 目前需要手动上传到依赖管理中,然后在作业参数里引用该 JAR 包,Connector 的 JAR 包文件可以联系腾讯云 Oceanus 团队获取;

创建 Source 端

CREATE TABLE `DataInput` (     `id` INT,     `name` VARCHAR) WITH (   'connector' = 'tdsql-subscribe',   -- 注意选择对应的内置 Connector   'topic' = 'topic-subs-xxx-tdsqlshard-xxx',  -- 替换为订阅任务消费的 Topic   'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets 的任何一种   'properties.bootstrap.servers' = 'guangzhou-kafka-2.cdb-dts.tencentcs.com.cn:321',  -- 替换为您的订阅任务 Kafka 连接地址   'properties.group.id' = 'consumer-grp-subs-xxx-group_2',   'format' = 'protobuf', -- 只能是protobuf格式   'properties.security.protocol'='SASL_PLAINTEXT', -- 认证协议   'properties.sasl.mechanism'='SCRAM-SHA-512', -- 认证方式   'properties.sasl.jaas.config'='org.apache.kafka.common.security.scram.ScramLoginModule required username="xxx" password="xxx!";' --用户名和密码);

正常情况下,以上的 Source 端参数,除了字段定义外,WITH 参数中需要根据具体订阅任务填写;这里列出 Source 端的相关配置项在订阅任务的具体位置:

  • topic [数据订阅] > [查看订阅详情] > [订阅信息]
  • properties.bootstrap.servers [数据订阅] > [查看订阅详情] > [订阅信息]
  • properties.group.id [数据订阅] > [查看订阅详情] > [消费管理]
  • properties.sasl.jaas.config 只需要替换 username 和 password [数据订阅] > [查看订阅详情] > [消费管理]

创建 Sink 端

-- Logger Sink 可以将输出数据打印到 TaskManager 的日志中  -- 程序包下载地址:https://github.com/tencentyun/flink-hello-world/releases  -- 需要先在【程序包管理】中上传该程序包,然后在【作业参数】中引用它  -- 参见 https://cloud.tencent.com/document/product/849/58713CREATE TABLE logger_sink_table (  id INT PRIMARY KEY NOT ENFORCED,    name STRING) WITH (   'connector' = 'logger',   'print-identifier' = 'DebugData');

为了验证方便,这里 Sink 端采用了 Logger ,可以把数据打印到日志文件中,在使用 Logger Connector 前,同样需要下载相关的 JAR ,上传到依赖管理,然后在作业参数中引用;

同时,为了更好地验证日志中数据打印情况,推荐使用 CLS ,可以更方便地在作业控制台查看作业运行日志;

算子操作

INSERT INTO logger_sink_table SELECT * FROM DataInput;

最后,把 Source 端数据插入到 Sink 端;

结果验证

完成 SQL 作业开发后,发布草稿 > 运行作业 ,然后可以在 Source 表中修改或者新增一些数据:

UPDATE `source_table11` SET `name`='test' WHERE `id`=300001;INSERT INTO `source_table11` (`id`, `name`) VALUES (6000000, 'test');DELETE FROM source_table11 WHERE id = 6000000

观察 taskmanager 的日志,可以看到 logger 打印出对应的 RowData 信息:

DebugData-toString: +U(300001,test)DebugData-toString: +I(6000000,test)DebugData-toString: -D(6000000,test)

注意事项

  1. TDSQL-MySQL 和 Oceanus 的 VPC 需要连通或者使用同一 VPC;
  2. 使用 tdsql-subscribe-connector 前,需要构建数据订阅任务;
  3. tdsql-subscribe-connector 目前只支持增量阶段,没有全量阶段;
  4. 当订阅任务指定了多个表时,多个表的 Schema 需要保持一致;

参考链接

[1] tdsql-subscribe-connector: https://cloud.tencent.com/document/product/849/71448

[2] 订阅任务:https://cloud.tencent.com/document/product/571/68060

[3] 数据传输服务 TDSQL MySQL 数据订阅: https://cloud.tencent.com/document/product/571/68060

[4] 流计算 Oceanus 控制台: https://console.cloud.tencent.com/oceanus

[5] Logger: https://cloud.tencent.com/document/product/849/58713

流计算 Oceanus 限量秒杀专享活动火爆进行中↓↓

点击文末「阅读原文」,了解腾讯云流计算 Oceanus 更多信息~

腾讯云大数据

长按二维码 关注我们

TDSQL动态
在“国产数据库硬核技术沙龙-TDSQL-A技术揭秘”系列分享中,5位腾讯云技术大咖分别从整体技术架构、列式存储及相关执行优化、集群数据交互总线、分布式执行框架以及向量化执行引擎等多方面对TDSQL-A进行了深入解读。 在本系列分享的最后一期,我们整理了关于TDSQL-A大家最关心的十个问题,腾讯云技术大咖们将对这些问题一一解答。
在“国产数据库硬核技术沙龙-TDSQL-A技术揭秘”系列分享中,5位腾讯云技术大咖分别从整体技术架构、列式存储及相关执行优化、集群数据交互总线、分布式执行框架以及向量化执行引擎等多方面对TDSQL-A进行了深入解读。 在本系列分享的最后一期,我们整理了关于TDSQL-A大家最关心的十个问题,腾讯云技术大咖们将对这些问题一一解答。
随着新一轮科技革命浪潮的推进,数据规模呈现爆发式的增长,数据类型愈发丰富,数据应用也在快速深化。值此背景下,数据库的发展呈现出“云原生、国产化、开源共建”三大趋势。
金融行业自身受到了互联网行业的冲击。例如目前中国网民的两大节日:阿里巴巴的“11.11剁手节”,微信的“春节红包节”,给银行的支付系统,尤其是快捷支付系统,带来了很大冲击,原有的IOE架构面领比较大的挑战。此外,金融行业内部也开始思考,如何通过云计算、大数据等技术,提供更为普惠的金融服务,提升金融行业的服务效率。这些都使得基于TDSQL的分布式互联网架构来代替集中式的IOE架构成为可能。
TDSQL又双叒叕获奖啦!在2021年IT168对外公布的“数据库·数据风云奖”评选中,经由行业技术专家及IT媒体多方联合评审,腾讯云企业级分布式数据库TDSQL凭借其出众的产品实力斩获‘‘2021年度技术卓越奖’’,这是继本月获得‘‘2021PostgreSQL中国最佳服务商’’‘‘2021PostgreSQL中国最佳数据库产品’’之后再次获奖。
TDSQL-C采用计算和存储分离的架构,所有计算节点共享一份数据,存储容量高达128TB,单库最高可扩展至16节点,提供秒级的配置升降级、秒级的故障恢复和数据备份容灾服务。TDSQL-C既融合了商业数据库稳定可靠、高性能、可扩展的特征,又具有开源云数据库简单开放、自我迭代的优势。
给编码者带来良性的社交压力。你正写一个比较紧急的需求,团队对代码的单元测试有一个要求:凡是新增的代码,必须有完整的单元测试以及需要达到一定覆盖率。此时你是否会有这样的想法,为了应付测试工具的覆盖率要求,先写一点不那么有用但是能带来覆盖率的测试。但是,一旦想到你的代码将会有你的同事参与review,有没有为刚才的这种想法产生一丝丝压力?这种压力是良性的,会阻止你去选择这些隐患很大的“投机”行为。
5月29日, DataFunSummit——多维分析架构峰会“HTAP 引擎论坛”如约而至,本论坛由腾讯云数据库技术总监李跃森老师出品。同时,论坛上,腾讯云数据库高级工程师陈再妮带来了主题为“TDSQL在HTAP领的探索与实践”的演讲分享,以下为分享回顾。