极客星球 | Flink在数据智能公司的探索实践与优化
极客星球
2021-07-23

Flink探索

 

1.1:Why Flink

 

Apache Flink 是一个分布式处理引擎,用于离线和实时的计算。Flink凭借其极致的流式处理性能和优秀的框架设计吸引了众多开发者加入,各大厂也都纷纷引入Flink作为其主要的流式开发引擎。

 

Flink的主要优势:

  • Exactly-once 语义

  • 多种高效的窗口计算

  • 轻量级的checkpoint机制

  • 支持 EventTime 及时间乱序事件

  • 高效的反压机制

  • 强大的状态管理机制,可支持超大的状态存储

  • 支持流的join和维表的join

  • 众多的connector支持

  • 复杂事件处理CEP

  • 支持SQL、自定义UDF、UDFA等

 

Flink在满足复杂功能的同时,QPS还可以达到百万级别,同时容错性和准确性也可以得到保证,这是MobTech袤博科技选择Flink的主要原因。MobTech袤博科技是全球领先的数据智能科技平台,累计覆盖设备155亿+,DAU 2.6亿+,持续迭代的兴趣标签达6000+。在数据智能产业,以数据应用为主导,融合顶尖的大数据、云计算、人工智能等多元先进技术,打造开发者服务、商业化、AI、Mob研究院四大版块,为全球数百个国家和地区的企业、开发者和政府机构提供商业智能解决方案、App运营赋能方案、企业级AI智能方案、数据咨询研究等服务。

 

1.2  Flink在MobTech袤博科技的集群情况

 

公司采用 On-yarn 的运行模式,长时间运行的流任务采用Per-Job模式,部分应用采用Session方式运行。On-yarn模式简单方便无需关心维护Flink集群情况,只需要配置Yarn和部署Flink Client即可开发和提交Flink任务。Flink版本从最初的1.7一路升级到1.13,升级至最新版本的原因有三点:

 

1、由于社区版本迭代较快,如果跨多个版本迭代修改的成本会非常大,所以在小版本迭代的时候选择升级是最划算的,因为需要改动的代码不会太多;

 

2、Flink社区非常活跃,每个版本都会有较大的性能和功能的提升,特别是Flink SQL,这也是我们考虑的重要原因之一;

 

3、随着版本的迭代开发方式也得到升级,早先主要是基于stream api进行开发,开发效率偏低而且性能不是最优的。公司内部逐渐搭建起Flink流式平台,使开发SQL化大大提高了开发效率和程序性能。

 

MobTech袤博科技拥有独立的流式计算集群,包含了Flink、Spark、Storm等各种的流计算引擎。而Flink应用程序占80%以上,大部分过去开发的Spark streaming程序在使用Flink重构后,资源利用率翻了一倍,同时解决了计算延迟、数据背压以及外部资源依赖等问题。

 

Flink在数据智能企业的应用和挑战

 

Flink在MobTech袤博科技的应用场景有很多,从实时报表、数据监控、实时画像、机器学习到实时数仓,Flink在各个环节和业务线都发挥了至关重要的作用,为公司带来巨大的价值。接下来我将选取几个比较经典的案例来分享我们在Flink实践中取得的经验成果。

 

2.1  经典的多维度DAU计算问题

 

  • MobTech袤博科技作为全球领先的数据智能科技平台,由于覆盖的设备数量巨大,所以每天都有大量的日志进入日志系统;

  • 日均实时处理数据量达150亿+,日均QPS 20w/s+,数据处理峰值可达 90w/s,DAU 达2.6亿+,MAU 达 12亿+,兴趣标签体系6000+;

  • 同时Flink Checkpoint 达10G左右,所以如何准确地计算和存储如此巨大的数据成为一大挑战。

 

图片

单个Topic 最高 20w/s +,总和已经超过百万QPS

图片

计算过程中数据可扩张至千亿以上

 

挑战1:大QPS下的 UID去重问题

 

由于考虑到日活的数据量较大,单天的日活已达2.6亿+,周活和月活的可以翻数倍,如果要精确计算UV值将会消耗巨大的内存和磁盘空间,同时会导致checkpoint的结果巨大可能出现延迟甚至失败。Flink有原生的COUNT DISTINCT来支持去重计算,采用的 Split Distinct Agg 方式做聚合,能做到精确去重但效率不高长时间运行下去会越来越慢,同时无法解决状态太大的问题。原因在于COUNT DISTINCT使用了MapState作为状态存储,如果单个Key的UID过大会导致内存溢出同时State过大导致Checkpoint时间过长甚至失败。

 

所以我们采用业内常用的HyperLogLog算法做到误差小于0.1%的估算方式,单个维度的Key对应的HLL只有一个对象且大小只和精度有关,重写类似COUNT DISTICT的聚合函数即可实现,HLL_COUNT_DISTICT(UID)。

 

经过优化后Flink Checkpoint的大小由原来的30G降到2.5G左右降低了10倍以上的存储压力,同时计算上没有出现背压的情况。

 

挑战2:数据热点问题

 

在开发的过程中我们发现某个报表结果只有两个Key导致所有数据只进入两个slot计算导致热点问题,这类问题借由Flink原生对COUNT DISTINCT的优化思路Split Distinct Agg方式可以很好地解决。SQL语句如下:

 

SELECT day, SUM_HLL(hll)

FROM (   

SELECT day, HLL_COUNT_DISTICT(user_id) as hll    

FROM T    

GROUP BY day, MOD(HASH_CODE(user_id), 1024)

)GROUP BY day

 

第一次聚合由group key和额外的bucket key进行shuffle,bucket key是使用 HASH_CODE(distinct_key) % BUCKET_NUM计算的。当第二次group by day的时候需要注意的是,由于我们使用HLL_COUNT_DISTICT来替代原生COUNT DISTINCT,返回类型是HLL,所以需要自定义SUM_HLL对HLL对象做累加处理。

 

经过优化后,背压问题不再出现,各个Task的内存使用量由原来的8G调整为2G,同时可以通过控制BUCKET_NUM的大小来提高数据处理的并行度提高处理速度。

 

在MobTech袤博科技已经有一套完善的计算UV和解决数据热点问题的API,这些也已经集成在Flink源码里面无需开发人员再开发或者引用。

 

2.2  Flink SQL 上的一些改进

 

2.2.1  支持 EMIT SQL语法

 

Flink的一个非常强大的特性就是对Window计算的支持,Window有滑动窗口,翻滚窗口,session窗口,这些窗口功能可以满足不同的复杂需求。Flink触发窗口的条件也比较简单,就是在Watermark > Window end-time 的时候触发窗口计算并输出。但若有时候我们需要提前触发窗口的计算并输出呢?则需要在Flink stream Api 中提供了 Trigger来提前触发计算,例如我们需要每5s输出一次1h翻滚窗口的计算结果。

 

.window(TumblingEventTimeWindows.of(Time.hours(1)))

.trigger(ContinuousProcessingTimeTrigger.of(Time.Seconds(5)))

 

但这个Api有一个小缺点就是即使窗口的结果没有变化也会输出一次,导致相同结果频繁输出,这个问题我们修改了部分源码进行修正。但如果我们需要在SQL语法中添加这类逻辑呢?Flink原生SQL是不支持这类语法。在MobTech袤博科技内部的流计算平台,由于SQL化的推广我们需要一个SQL语法来支持此类需求:

 

INSERT INTO OUT_TABLE

SELECT * FROM TUMBLE_WINDOW_VIEW

EMIT

WITH ‘5’ SECONDS BEFORE WATERMARK

 

2.2.2  自定义ES connector

 

支持 ES的xpack验证,支持定时和批量写出

 

CREATE TABLE ES_SINK_INDEX(

Id VARCHAR,

Msg VARCHAR

) WITH (

‘connector’=’xpack-es’,

‘es.address’=’ip:port’

‘es.cluster.name’=’xxx’,

‘es.index’=’indexname’,

‘es.commit.batch’=’10000’,

‘es.commit.interval’=’1m’,

‘xpack.passw’=’xxx’,

‘xpack.file’=’xx.p12’

)

 

在应用之前需要在env注册registerCachedFile,将xpack file注册。

 

除了以上说到的一些优化外,团队在MobTech袤博科技还做了一些额外的性能优化,例如SQL的并行度设置、Repartition SQL语法支持、Hbase和Redis的维表Join SQL语法支持等。

 

未来探索:实时数仓 & 数据湖

 

Lambda架构:传统的实时数仓采用Lambda架构,架构将数仓分为离线层和实时层,离线和实时各一套计算引擎和处理逻辑。很多的业务需求相应的会有离线任务和实时任务两套,代码上因为无法公用一套,所以需要写两套逻辑代码,离线用Spark做,实时选用Flink做。

 

图片

Flink Kappa实时架构:在Lambda架构后又涌现出Kappa架构 ,OLAP架构等。

 

这类架构抛弃了Lambda的离线处理部分,全部采用Flink做为计算引擎,存储由HDFS替换为Kafka,最终结果存储为OLAP数据库。但问题也很明显,Kafka无法存储大量的历史数据,也无法支持OLAP的即时查询,也无法支持数据的更新删除等操作。

 

图片

 

Flink Iceberg 数据湖: 解决了Kafka无法存储大量历史数据问题,同时支持了OLAP查询功能,Iceberg支持流式的读写功能。

总结

 

随着公司的快速发展,数据的增长也越来越快,传统的数据处理框架和数据存储系统已渐渐显露出弊端,寻找更高效稳定的计算框架引擎和大数据架构是各个数据公司的目标,MobTech袤博科技作为领先的数据智能科技平台也在不断探索和优化自己的大数据平台架构,力求为客户带来更优质的服务。