美团点评实习总结

前言

美团实习的这段时间里,主要负责分布式调度系统CantorCantor的中文含义是合唱指挥家,不得不说这个名字非常符合Cantor的业务场景,Cantor主要用来调度数据生产的任务,最主要的业务是公司每天通过join生成的各种报表,以及一些需要周期性或者定期执行的入库,出库、SparkHadoop等任务。这类任务的主要特点是:(1)任务与数据相关,必须提供一些任务失败的修补操作,Cantor提供了重导工具;(2)大部分任务在夜间执行,要求调度系统要稳定执行,没有人会想凌晨起来去维护系统,目前Cantor启动的运维优化项目,目的就是降低运维的成本,提高系统的运维能力;(3)任务并发高,按小时、天、月执行的任务每次动辄几万个任务,必须要有相应的措施保证任务成功执行,Cantor通过队列+线程的方式处理高并发;(4)任务具有依赖关系,数据生产的任务之间具有依赖关系,Cantor通过对任务抽象为DAG(有向无环图)来处理任务之间的依赖关系。


技术篇

Cantor主要分为四个组件:

  • feCantor web前端,用于查看创建、查询、重导任务、监控、运维等,提供Restful API
  • core:核心调度器,提供队列任务和cron任务调度器,分配任务到相应的执行机;
  • executor:执行机,管理执行机的资源槽,运行并监控相应任务,对任务执行日志缓存;
  • reload:重导调度器,对执行失败的任务重新执行。

在实习期间,我主要负责fecore模块的内容,下面我主要分析这两个模块以及我所做的工作。


fe

用户可以通过Cantor前端界面提交任务,查询任务的状态等,Cantor前端使用Reactweb服务器使用gunicorn+Flask,默认开启2*cpu + 1个进程处理HTTP请求。

gunicorn作为CantorWSGI,服务网关接口,具有很强的伸缩性,能够运行在多线程多进程环境下,接口请求并路由到相应的web应用处理请求,Flask处理HTTP请求,通过URL去执行相应的函数,并求静态资源进行缓存。
Cantor接收通过SSO单点登录进行用户认证,查询类请求操作先查缓存,缓存未命中再去查询数据库。操作类请通过RPC调用响应的接口。这里主要分析Cantor创建任务的流程:

  1. 用户通过web界面或者直接创建HTTP请求发送创建任务的请求,请求数据包括任务的基本信息:任务类型、任务名称、任务的负责人等,生产计划:任务执行命令、执行机组、超时时间、重试次数等,上游依赖关系;
  2. API接受到请求后,对用户信息进行认证,写访问日志,规范化请求数据;
  3. 更新数据库taskplantask_relation表,创建任务时不检查环,只创建依赖关系;
  4. 调用RPC提交任务。

主要工作:

  • 1、解决登录认证时web api crash问题
    问题描述:进行SSO认证时,Cantor会在cookie中设置cantor_redirect_url,通过cantor_redirect_url发送重定向响应包,如果用户丢失cookie,导致cantor_redirect_url为空,fe没有相应的处理逻辑,导致程序奔溃。
    解决方案:加入判断逻辑,如果为空默认跳转到首页。
  • 2、生命周期页面详情页
    问题描述:任务执行失败,用户无法感知当前任务执行阶段,实时展示任务生命周期详情页,方便定位问题,降低运维成本。
    解决方案:任务执行到每个阶段,更新DAG并写入到squirrel(美团Redis基础服务)中,前端页面访问APIsquirrel拉取消息。

core

core模块实现了Cantor的两个核心调度器,队列任务调度器Schedulercron任务调度器CronService,通过apscheduler库实现任务的周期执行,类似Crontable。这里主要介绍队列任务:
队列任务分为按小时、天、周、月执行的任务,apscheduler创建相应的QueueEventQueueEvent创建Scheduler调度器生成任务DAG,调度器根据DAG中的依赖关系将任务提交到worker子进程中,worker创建一个线程提交任务到执行机,然后轮询获取任务的状态。队列任务运行流程如下:

  1. 初始化日志,最大并发数,资源队列;
  2. 根据依赖关系生成DAGDAG是队列任务的核心数据结构,所有的任务调度都要根据DAG中的顺序进行,上游任务执行失败,如果是强依赖关系,下游任务任务不能执行,弱依赖关系下下游任务可以执行;
  3. 筛选顶层节点,根据大V,SLA,优先级,就绪时间放入资源桶进行排序;
  4. 筛选槽位以及并发数满足要求的任务;
  5. 执行任务,成功后从DAG移除,转到第3步进行执行。

主要工作:

  • 1、解决重导时卡在“等待执行”状态
    问题描述:这个是重导时出现的问题,但重导和正常调度的逻辑基本上相同的,用户提交重导时,需要选择重导执行时间,之后提交到reviewer进行审核,如果reviewer审核时间超过24小时,在apscheduler中设置只执行时间在24小时内的任务,此时该重导任务一直处于等待执行状态。
    解决方案:将重导和正常调度的超时时间分开设置,添加到配置项中。默认设置重导超时时间为一周,正常执行超时时间为24小时。

    1
    2
    MISFIRE_FRACE_TIME = 24 * 3600 # 如果作业未调起执行,在多长时间内允许作业被执行
    RELOAD_MISFIRE_FRACE_TIME = 7 * 24 * 3600 # 重导任务在多长时间内允许被执行
  • 2、DAG强连通分量算法
    问题描述:生成DAG时,需要检测任务之间是否形成环,对于形成环的任务要及时预警,负责人通过查看形成环的任务,适当的更改任务消除环。
    解决方案:求解有向联通图中强连通分量的算法有KosarajuGabowTarjan算法,GabowTarjan算法的时间复杂度优于Kosaraju,只需要对图作一次DFS就可以求出图中强连通分量,Tarjan算法和Gabow算法的思想是相同的,Tarjan算法用数组保存最早追溯到的父节点,Gabow用栈来保存,Cantor采用了Tarjan算法。
    Tarjan算法简称为缩点算法,主要思想是维护一个深度优先搜索的访问次序数组dfs,以及表示通过有向边最早被访问到的节点的数组low,具有相同访问的“根”在low中缩成一个点,用栈来维护当前强连通节点。伪代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    id = 0;
    tarjan:
    dfs[u] = ++id;
    low[u] = ++id;
    stack.push(u);
    for (w : u)
    if w is visited:
    tarjan(w)
    low[u] = min(low[u], low[w])
    else if w in stack:
    low[u] = min(low[u], dfs[w])
    if low[u] == dfs[u]:
    while u == stack.top():
    res = stack.top()
    stack.pop()
  • 3、参与运维优化项目
    问题描述:Cantor中队列调度器是一个剥落调度模式,上游任务完成移除之后,加载下游任务执行,这样始终只有一层任务处于活跃状态,如果想操作之前执行过的任务,只能通过重导工具来进行,这样造成任务运维成本高,复杂度高。
    解决方案:改造jobdag,缓存所有任务的状态,将剥落式调度改为按分支调度,通知也支持多分支并发执行,大V分支加速等高级特性,同时更改运维为置为成功,置为失败,强制运行,挂起/解挂,运维操作更加清晰明了,运维成本也大大降低。这里简述一下置为成功的实现过程:

  1. 更新任务状态为FINISHED,写入数据库;
  2. 如何cur_inst_id不为空,表示任务正在执行机上运行,cur_inst_id置空;
  3. 根据cur_inst_id,杀死任务。

反思

Cantor作为一个基于依赖关系的分布式调度系统,再发展进步的同时也存在一些问题,下面是我自己的一些见解。

  1. 系统耦合度比较高,模块划分不清晰;
  2. 可扩展性差,core作为核心调度器,由于采用了进程间通信,无法分机部署,导致core容易出现瓶颈;
  3. 面向对象和面向过程思想融合在一起,维护性差,学习成本高;

生活篇

这次实习让我切身体会到,如何将自己平时学到的东西用在实践中,同时也明显感觉到自己对问题的理解能力一步一步提高。除了技术上的提升,实习也让我对公司的正规化运营流程,开发流程有了新的认识。在实习期间,我一直抱着这样的思想来完成我需要做的事情,为什么要做这个?做这个对系统会产生什么样的结果?实习让我明白了一个思想,结果导向,再做任何事之前要明确的认知到产生的结果。
“北漂”的生活也快要结束了,又要回到象牙塔中,学校生活了19年了,对社会,未来的工作充满了期待,不管结果如何,相信一切都是最好的安排,努力奋斗追逐明天。