历史库平台技术揭秘 OceanBase

作者:OceanBase/ 公众号: 发布时间:2018-02-26

1. 绪论
OceanBase历史库建设来源于业务快速发展的迫切需求,在线库空间日渐紧张,如何将历史库数据迁移到历史库,并将历史数据尽快做删除成了当务之急。历史库项目主要有以下几个难题:
如何做父子表任务迁移?首先介绍一下父子表任务,如下图所示,我们有两张逻辑表table_xxx、table_xxx_info,后者是通过某个id和前者关联,我们的迁移条件(终态数据)只有第一张表存在(比如xxx_status = 'FINISHED'),第二张表需要关联第一张表才能判断是某行是否可以做删除。而目前常用的迁移工具都只支持单表的迁移。
如何做数据删除?目前常用的方案是利用迁移工具迁移后,使用数据校验工具做数据比对,数据比对完成之后,DBA再利用脚本或是其他工具进行数据删除。这个方式第一周期会比较长,第二无法保证在删除和校验的时间间隔内数据发生变化,从而出现数据质量问题。
如何尽可能高效又不影响业务?目前的迁移工具在做数据导入通常只考虑单个任务的限速,无法合理利用集群的整体资源,通常只能在夜间低峰运行。而历史库需要处理的数据量极大,对导入和删除性能要求都比较高。
为了解决上述的几个问题,我们决定自行实现一套历史库的工具,所以也就有了历史库平台这个项目。
2. 整体架构
2.1 平台概览

OceanBase历史库平台的架构简单,主要包含一下几个组件:1. client(历史库内核程序):负责任务的调度和执行;2. meta db(元数据库):用于存储任务元信息,通常采用OceanBase来保证高可用;3. web console和web UI(用户界面和后台API):用户操作的入口,用于任务的发布与进度查询;
历史库的client通常通常部署在历史库的机器上(client本身资源占用低,20G以内的内存,1-2 cpu core),不需要占用额外的机器,某种意义上相当于“混布”。并且由于平台的部署是天然的分布式架构,所以任何节点宕机都不会影响整体服务,可以提供高可用的服务:
本文的后续将主要介绍历史库内核程序。
2.2 术语介绍
 本章节将先介绍一些历史库的术语:
项目(project):通常是一个业务,他们通常包含类似的迁移规则,需要统一进行任务启停;
逻辑表(logic table):逻辑意义上的一张表,用户可以定义迁移规则的最小单位;
物理表(table):物理意义上的一张表,就是在数据库中真实可见的一张表,逻辑表做完分库分表后形成的就是物理表;比如逻辑表名称为table_xxx,那么物理表可能就是table_xxx_[00-99];
集群(cluster):表示一个物理意义上的数据库;
数据源(datasource):和业务层面上的数据源类似,表示一个数据库的连接方式,一个集群可以有多个数据源;
任务(job):调度的基本单位,表示一张物理表在给定的时间区间内的迁移(或校验/删除)任务,比如“table_xxx_00在20170101-20170301之间的数据迁移”表示一个任务;
子任务(task):执行的基本单位,子任务表示一个任务中的某段主键区间,一个任务通常会包含多个子任务;
行组(batch rows):sql执行的基本单位,表示一张物理表的一组数据,这个数据量通常不会太大(100-200行);
事务行组(trans rows):数据通道、断点续传的基本单位,对于非父子表任务,它就是单个行组;对于父子表的任务,它包括父子表在内的多个行组;
2.3 动态模型
上图是为内核的动态模型,内核主要是通过调度线程来驱动,调度线程会定期轮询元数据,从元数据库中获取可以执行的任务,然后将该任务分发到worker线程中。worker线程实际上是一组线程,他通常包括一个子任务划分线程,统计与配置更新线程,和若干个执行线程;worker线程也会去执行一些运维指令任务。
任务调度时,我们会考虑调度的源端和目标端集群的负载,并综合考虑任务优先级和任务类型,选择最为合适的任务来分发;这部分后续 @惟学(weixue.cx) 会单独写一篇文章做介绍;我们后续主要介绍任务的执行。
3. 常规任务
3.1 执行流程

上图展示了历史库内核一个任务(也就是一张物理表的迁移或校验删除)的执行流程,它实际上可以看做一个数据流不断分化、汇合的过程:
1. 数据先由子任务划分线程将任务按照主键划分成多个子任务;2. 每个子任务由一个数据读取线程负责读取,数据读取线程会按照一定的size读取源数据表,形成事务行组,并将其put到数据管道中;3. 数据消费线程(迁移或者校验、删除)从数据管道中获取事务行组,执行各自的数据消费操作(根据任务类型不同而不同);4. 数据消费线程在完成数据的消费之后,将该事务行组的状态标记为已经完成;
3.2 子任务划分
历史库平台有两种子任务划分的方式:根据规则进行任务划分(user defined):这个适用于一些特殊的场景,比如OceanBase 0.5按照主键prefix做打散的,将来OceanBase 1.0的分区表任务导入也会按照这种形式进行任务划分,这些场景使用自定义的方式划分任务能够取得的更好的导入效果;自动按照主键进行任务划分(auto):这个是通用场景下的任务划分方式,默认情况下我们会使用这种任务划分方式;它的运行流程如下:首先根据条件(如有)获取范围的下界:
select pk1, pk2 from table_xxx_00 order by pk1 asc, pk2 asc limit 1;
之后每次移动一个定长(注意这里每次移动的向量值,而不是offset的值,避免扫描越来越慢),来确定子任务边界
select pk1, pk2 from table_xxx_00 where (pk1, pk2) > ('pk1_value1', 'pk2_value1')order by pk1 asc, pk2 asc limit 1000000, 1;
当位点获取不到时(也就是上面的sql返回空),直接获取一个最大值,并标志这个任务已经扫描
select pk1, pk2 from table_xxx_00 where (pk1, pk2) > ('pk1_value2', 'pk2_value2')order by pk1 desc, pk2 desc limit 1;
对于每个任务,子任务划分线程只有一个,也就是它的执行速度决定了单个任务导入的上限;一行数据行长为100字节,扫描1000000行的时间为1秒,那么理论上的执行速度上限约为100M/s,对于单个物理表来说,这个速度通常已经够用;子任务划分完成之后会被put到子任务管道中;子任务管道是一个定长的先进先出队列,当子任务管道满了之后,子任务划分线程会被阻塞住,直到有子任务被数据读取线程取出。
3.3 数据读取
每个数据读取线程会从子任务管道中拉取一个子任务,根据子任务中划分的范围进行数据读取;数据读取通常会有两次sql交互,第一条sql用于确定一个主键范围,它和子任务划分线程的游标移动sql类似,只是它的移动范围会小的很多,例如:
select pk1, pk2 from table_xxx_00 where (pk1, pk2) > ('pk1_value1', 'pk2_value1')order by pk1 asc, pk2 asc limit 200, 1;
第二条sql会将过滤条件带上,用于读取实际的数据:
select pk1, pk2, col1, col2, col3, col4 from table_xxx_00 where (pk1, pk2) > ('pk1_value1', 'pk2_value1') and (pk1, pk2) <= ('pk1_value2', 'pk2_value2')and col1 = 'FINISHED'order by pk1 asc, pk2 asc;
使用这种方式读取数据可以保证在过滤条件走不上索引的情况下,也能正常完成数据的扫描,并且每次查询对数据库的压力是可控的,这个方便我们后续做限速控制;在实际的执行过程中,我们还做一些优化,避免执行过程中的无效扫描;例如,如果用户给定的过滤条件是主键前缀,那么我们会在第一条sql就把主键前缀的信息带上,在某些场景下直接通过query range的计算就可以判断出行是否存在,可以减少无效扫描的次数。
对于普通的表,上面的sql扫描出的一批记录单独就构成一个事务行组;对于包含父子表逻辑的表,我们会先按照上面的逻辑读取父表的记录,再跟进父表和子表的关联关系,从子表中用multiget的方式读取子表的对应行,从而形成一个事务行组。完成事务行组的读取之后,我们会将该事务行组put到一个定长的先进先出队列中,和子任务管道一样,数据管道满了之后,数据读取线程会阻塞住,直到有数据被消费掉;数据行组在完成读取之后,除了被put到数据通道外,还会被放到滑动窗口中,它的作用会在后续部分详细说明。
3.4 限速控制
事务行组在读取完成之后,需要先做限速控制;目前历史库内核是采用令牌桶的方式来做限速控制,也就是只有当事务行组取得令牌时,才会被put到数据管道中。对于令牌桶算法的实现细节有兴趣的同学,可以参考漏桶算法和令牌桶算法这篇文章。在历史库内核中,限速有两个维度,流量限制和行数限制,他们都区分读和写,各自分别计算:流量限制的限制是为了避免流量太大把网卡打爆(或速度太快把cpu、io用满),它是集群级别的一个限制,我们会根据当前集群运行的任务个数,动态的计算出每个任务的限速(准实时)。例如,一个集群的总限速是100M,某一个时刻运行的任务是10个,那么每个任务的执行速度就是10M/s,此时如果该集群又有新任务开始运行,总任务变为11个之后,每个任务的执行速度会自动降为9M/s。数据行数的限制是为了避免产生过多的rps(record per second,可能会影响drc)。它的实现方式和流量限制类似,唯一不同的是它是物理表级别的一个限制。此外,历史库内核在调度过程中,也会对集群并发做限制,保证一个集群内的任务并发是可控的。
3.5 数据消费
数据消费实际上包括迁移、校验、删除这几种类型,他们数据来源类似,都是从数据读取线程获取到数据,然后根据执行器的不同,执行不同的操作。
3.5.1 数据迁移
数据迁移的逻辑比较简单,把已经读取到的数据批量写入数据库即可;在写入的时候,通常使用insert on duplicate key update的形式来避免出现重复行(比如任务异常中断开始重试)时数据库的报错。数据迁移的时候我们做了两个特殊处理:  防导爆:OceanBase将数据划分为增量数据和基线数据,增量数据存在内存中,增量数据达到一定阈值了,就会合并到基线数据。在合并的过程中,如果继续有大量的写入,也就是写入超过合并的速度时,内存就会不够用,对数据库的稳定性会产生比较大的影响;所以数据迁移时,我们需要定时去查询server内部的虚拟表,观察内存是否达到阈值(合并阈值),如果已经达到,会停写一段时间(通常是60s),然后再继续查,直到内存低于阈值。
 分区计算:对于OceanBase 1.0历史库,我们通常采用按照日期做range分区的分区表,对于同一个物理表,它的分区可能分布在不同的server上,对于同一个批次的数据,如果涉及多个分区,那么在写入的时候就有可能产生分布式事务,影响执行性能。所以,历史库内核在同一个批次的数据写入之前,会先进行一次分区计算,将数据按照计算出来的分区id进行分组,然后进行分组写入,保证每条sql只涉及单分区,从而提高导入性能。
3.5.2 数据校验
数据校验时,历史库内核会先从在线库将相应的数据读取出来(实际上就是数据读取线程读取出来的那部分数据),然后再用这部分数据去历史库按照历史库的主键(即使历史库主键和在线库主键不一致)用multiple get的方式读取。读取出来的这部分数据,会和在线库的数据逐行逐列做比较。
数据比较时,可能出现以下几种异常情况:1. 字段不匹配(CHECK_COLUMN_NOT_MATCH):通常可能有两种原因,第一,是由于在线库和历史库数据精度不一致,比如时间类型在线库定义成datetime,历史库定义成datetime(6);第二,在线库的数据再迁移之后又被重新修改过。2. 数量量不一致(CHECK_RECORD_COUNT_NOT_MATCH):这个通常存在于带有状态的迁移中,数据在迁移的时候完成之后,数据状态又被修改了,所以在校验时发现在线库和历史库数据量不一致。
当出现错误时,我们会根据用户配置的不同而执行不同的操作(报错退出或忽略);
3.5.3 数据删除
目前,我们的数据删除全部都是校验后删除,也就是历史库内核一定保证在线库和历史库数据一致后才会执行数据删除操作;历史库内核在实现数据删除实际上是在数据校验添加一个新的数据管道(待删除管道),数据校验成功的事务行组会被放到管道里面,数据删除的消费线程获取到对应的事务行组,会分批次的进行数据删除。 删除和迁移类似(二者实际上都是数据的写入),都会做防导爆的控制。
3.6 断点续传
如图3-1所示,每个子任务都会保留一个滑动窗口,每个事务行组在生成后都会被放到滑动窗口中,由于子任务的数据读取线程是单线程,所以这些事务行组在滑动窗口的顺序是和生成的顺序一致;每个事务行组在消费完成之后,会被标记成功,此时如果滑动窗口最开头的任务是完成的,那么会进行一次窗口“移动”,直到滑动窗口最前面的任务是未完成的状态。每次随着窗口移动,移出去的那个事务行组的最后一行就成为了一个断点。对于每一个运行中的子任务,它的断点信息都会被定期汇报到数据库中。当这个任务由于某种原因退出时,其他节点可以继续跑这个任务,在重跑任务时会从数据库中获取到每个子任务的断点信息,从断点的位置开始继续运行。对于子任务生成线程,也有类似的断点续传机制。
3.7 配置更新与统计汇报
历史库内核有全局配置项和局部配置项两种配置,全局配置项由一个单独的线程更新,局部的配置项由每个worker定时更新;刷新到新的配置之后,他们都能够实时生效,不需要重启任务。所以,通常修改完配置(比如集群级别的限速)通常在一个刷新周期内(通常是10s-30s)就会在所有集群上生效。历史库内核也会定期汇报自身的状态,比如秒级速度、总计容量等。
4. 运维指令
除了常规任务,历史库平台还包含着运维指令。它和常规任务的区别在于,常规任务操作的是在线库和历史库,运维指令操作的是历史库平台的元数据库。和常规任务一样,运维指令也是由历史库前端发布,历史库内核执行,它的执行优先级高于常规任务。
4.1 创建物理表指令
如上面所描述的,在历史库平台中,一个逻辑表可能包含多个物理表,不同的物理表可能存在于不同的集群、不同的数据库中。那么如何描述这些物理表呢?最简单的做法就是让用户自行配置,但是这个映射逻辑可能会很复杂,并且容易遗漏;还有一种做法是利用这个逻辑表访问的中间件(比如zdal或者tddl)来获取元数据库,但是这样做会产生更多的依赖,并且并不适用于外部业务(外部可能会有其他形式的中间件)。最终,我们采用的方式是通过调度一个任务,这个任务会搜索在线库,并根据用户配置的逻辑表的正则表达式来生成物理表。如下图所示,table_xxx它的物理表结构是table_xxx_${table_index},它的数据存在db1-db3中,那么我们的内核会从这三个db中寻找符合table_xxx_%d的表,记录该表的源数据库并生成对应的物理表,源数据库名和物理表名可以唯一确定一张物理表。
4.2 批量创建任务指令
在历史库内核中,用户可以自行定义任务之间的关系,比如table_xxx_00表在同一时间内的迁移任务必须运行在table_xxx_00表删除任务之前,table_xxx_00表是删除必须运行在table_yyy_00表的删除之前等。用户在逻辑表级别定义逻辑表之间的映射关系,用户也可以在创建任务的时候,定义不同类似任务(迁移、删除、校验)的优先关系,最终用户创建任务后,会形成如果类似下图的一个DAG,下游任务只有等它依赖的所有上游任务完成后才会被调度。 任务调度的具体实现细节,我们会在后续的文章中继续介绍,这里不再展开。
5. 成果与展望
5.1 成果
历史库平台已经在线上稳定运行了将近一年,现在已经具备一个初步的操作界面,可以在操作界面上完成任务创建、启停、限速调整、数据源导入、速度观察等基本操作,已经开放给DBA使用。目前,历史库平台已经接入了包括交易、支付、充值、账务在内的20多个核心业务,累计迁移、删除的数据量都是PB级别以上,单集群容量已经突破2PB,单集群峰值迁移速度超过1GB/s。考虑历史库成本约为在线库成本1/10(考虑历史库高压缩比和廉价的SATA盘),后续还会有更多的业务接入。

其他栏目