本文共 4707 字,大约阅读时间需要 15 分钟。
是阿里云自研的NoSQL多模型数据库,提供海量结构化数据存储以及快速的查询和分析服务,表格存储的分布式存储和强大的索引引擎能够提供PB级存储、千万TPS以及毫秒级延迟的服务能力。
本文会给大家详细介绍表格存储重磅推出的一项新功能--全增量一体数据通道。文章会为大家阐述数据通道的主要使用场景,表格存储数据通道的功能优势,并带大家快速入门如何使用数据通道来消费数据。图-1 数据通道的主要应用场景
如图-1展示,数据通道的使用场景主要分为四大类:总之,表格存储利用其强大的分布式引擎写入能力和完备的数据通道功能,做到了让用户的数据存储和数据消费All in one! 使用表格存储,缩减了用户系统架构中的外部依赖,也避免引入数据同步和多写一致性问题,大大降低上述四大类场景的技术门槛和人力成本。
从主流传统数据库到NoSQL数据库,从开源产品到云产品,都已经有一些数据通道相关的解决方案了,表格存储数据通道从功能上与主流自建消费通道对比如下:
功能 | 表格存储通道服务 | MySQL数据同步 | Hbase replication框架 | AWS Dynamodb stream |
---|---|---|---|---|
同步功能 | 全量加增量数据同步,全量复制到增量同步无缝切换 | 分离的全量复制任务和增量同步任务,需要业务方设计切换方案 | 增量数据同步 | 增量数据同步 |
数据一致性 | 支持保序协议 | 支持保序协议 | 无保序协议保证一致性 | 支持保序协议 |
扩展性 | 数据规模水平扩展 | 单机数据库同步 | 数据规模水平扩展 | 数据规模水平扩展 |
易用性 | 多语言SDK | 需自建方案或使用开源实现 | 自建Hbase replication log监听 | 使用AWS KCL client |
运维监控 | RPO消费监控 | 需自建监控 | 无RPO监控 | 无RPO监控 |
计算对接 | 直读对接阿里云流计算(Blink) | 需导出到数仓或消息队列 | 需导出到数仓或消息队列 | 需Kinesis适配器对接 |
负载均衡 | 基于RPO自动负载均衡 | 单机数据库同步 | 无负载均衡 | KCL client负载均衡 |
不少的开源数据系统和计算引擎都实现了从MySQL到自身的数据同步方案,以solr的MySQL全量、增量同步方案为例,用户需要建立全量导入任务full import和增量同步任务delta import,并且需要根据自身业务安排两个任务的先后顺序、抑或是否需要业务停写等,同时需要自建同步延迟监控,避免同步滞后和堆积。在对接计算引擎方面,MySQL用户通常通过数据同步把数据导入到数据仓库或者消息队列,进而接入计算引擎,引入了额外的存储依赖和数据冗余,比较复杂。
Hbase上的增量数据可以通过复用Hbase replication框架实现增量数据消费,参照Lily Indexer实现,但是replication会引入离线推送和Hbase在线服务的资源竞争,也需要较高的技术门槛解决传输优化、热点问题。同时HBase的日志顺序通过数据上的时间戳决定,在时钟回退和消费超时时的日志乱序问题难以避免。总体来说,该方案的技术门槛和运维成本都很高,消费场景也需要容忍日志乱序。AWS Dynamodb stream是Dynamodb的实时数据处理解决方案,DynamoDb会为用户保存最近24小时的数据操作日志,支持用户以partition粒度并发消费,同时保证增量日志有序。Dynamodb stream不支持导出全量数据,但很多同步场景需要先处理全量的历史数据随后在开始消费后续的增量数据,另外其复用了KCL实现partition的消费租约、管理partition拓扑关系、持久化每个partition的checkpoint,逻辑很重,没有对应语言KCL client时使用难度比较大。相对于以上方案,表格存储通道服务的主要功能优势有:通道服务可以在控制台即开即用,计费上同其他数据读API一样仅按读取数据CU计费(按量、预留以及资源包),并可以在控制台管理和监控通道消费进度和状态。接下来就带大家快速体验一下通道服务的开通、数据消费、消费延迟监控和水平扩展。
图-2 创建一个全量加增量类型的通道
可以看到创建后通道的通道ID和分区信息:图-3 通道的ID和分区信息
复制通道ID,通过Java SDK实现通道服务IProcessor接口的消费函数,打印从通道读取的数据,体验从通道开始数据消费;
// 用户自定义数据消费 Callback,即实现IChannelProcessor 接口(process和shutdown)。private static class SimpleProcessor implements IChannelProcessor { @Override // ProcessRecordsInput 中包含拉取到的数据。 public void process(ProcessRecordsInput input) { System.out.println("Default record processor, would print records count"); System.out.println( String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken())); try { // Mock Record Process. Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void shutdown() { System.out.println("Mock shutdown"); }}public static void main() throws Exception { // 1. 初始化Tunnel Client。 // endPoint 为表格存储实例 endPoint,如https://instance.cn-hangzhou.ots.aliyuncs.com。 // accessKeyId 和 accessKeySecret 分别为访问表格存储服务的 AccessKey 的 Id 和 Secret。 TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName); // 2. 填入创建好的tunnelId,开始数据消费 // TunnelWorkerConfig里面还有更多的高级参数,这里不做展开,会有专门的文档介绍。 TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor()); TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config); try { worker.connectAndWorking(); } catch (Exception e) { e.printStackTrace(); worker.shutdown(); tunnelClient.shutdown(); } }
图-4 全量阶段的分区消费数据量
图-5 增量阶段的消费延迟和分区消费数据量
图-6 数据通道负载均衡中
图-7 数据通道负载均衡完毕
[1]
[2] [3] [4] [5] [6] [7] [8] [9]转载地址:http://uapxl.baihongyu.com/