Presto简介

Presto是什么?

Presto是一个facebook开源的分布式SQL查询引擎,适用于交互式分析查询,数据量支持GB到PB字节。presto的架构由关系型数据库的架构演化而来。因此Presto集群的硬件必须满足大内存,万兆网络和高计算能力特点。

Presto特点

Presto的基本概念

  1. Presto 服务进程
    Presto 集群中一共有两种服务器进程: Coordinator服务进程,Worker服务进程。因此Presto集群是Master-Slave的拓扑结构。

    • Coordinator进程: 接受查询请求,解析查询语句,生成查询执行计划,任务调度,Worker管理。
    • Worker进程: 执行被分解后端而查询执行任务–>Task
  2. Presto模型
    • Connector Presto访问不同数据源的驱动程序。每种Connector都实现Presto中标准SPI接口。当年需要使用某种Connector访问特定的数据源时,需要在$PRESTO_HOME/etc/catalog中配置文件:example.properties,并在配置文件中设置一个属性:connector.name,Presto中Connector Manager就是通过该配置属性来决定使用哪一个Connector去访问数据。
    • Catalog 对应某一类数据源,例如hive的数据,或mysql的数据。当你访问Catalog中某个表时,该表的全名总是以Catalog的名字开始。例如 名字为example.schema1.table1的表,指的是表table1位于名schema1下的schema中,而schema1又位于example的Catalog中。
    • Schema 对应mysql中的数据库
    • Table 对应mysql中的表
  3. 硬件架构
    硬件架构
  4. 软件架构
    presto软件架构

    Presto查询步骤

    1)客户端通过Http协议发送一个查询语句给Presto集群的Coordinator
    2) Coordinator街道客户端传递过来的查询语句,会对该查询语句进行解析,生成查询执行计划,并根据查询执行计划一次生成SqlQueryExecution,SqlStageExecution,HttpRemoteTask。Coordinator会根据数据本地行生成对应的HttpRemoteTask。
    3) Coordiantor将每一个Task都分发到其所需要处理的数据所在的Worker上进行执行。这个过程是通过HttpRemoteTask中的HttpClient将创建或者更新Task请求发送给数据所在节点上TaskResource所提供的RestFul接口,TaskResource接收到请求之后最终会在对应的Worker上启动一个SqlTaskExecution对象或者更新对应的SqlTaskExecution对象需要处理的Split。
    4)执行处于上有的Source Stage中的Task,这些Task通过各种Connector从相应的数据源中读取所需要的数据。
    5) 处于下游的会读取上有Stage产生的输出结果,并在该Stage每隔Task所在Worker的内存中进行后续的计算和处理。
    6)Coordinator从分发的Task之后,就会一直持续不断的从Single Stage中的Task获取计算结果,并将计算结果缓存到Buffer中,直到所有的计算结束。
    7)Client从提交查询语句之后,就会不断地从Coordinator中获取本次查询的计算结果,直到获得了所有的计算记过。并不是等到所有的查询结果都产生完毕之后一次全部显示出来,而是每产生一部分,就会显示一部分,直到所有的查询结果都显示完毕。

Presto and Trino

Presto 以前是 PrestoDB,而 Trino 以前是 PrestoSQL。
PrestoDB 于 2012 年在 Facebook 创建,直到 2019 年 1 月它们都是同一个项目。
据我所知,该项目最初是为了解决 300 PB Hive 数据仓库上的缓慢查询而创建的。还有来自 Teradata 的 Presto 发行版等等。

Trino

  • 虽然 Trino 是针对多个数据源运行 ETL 查询的公司的出色解决方案,但它还提供了节省资源和收集更多输出的选项。
  • Trino 还擅长允许用户使用 SQL 运行即席查询,而不管数据位于何处。它消除了将数据 ETL 到另一个系统的需要。
  • Trino 允许用户创建个性化的报告和统一的仪表板,以更好地查询多个数据源。

Presto

Trino 中不存在的 Presto 功能,例如 Presto-on-Spark 看起来一直是开发重点:

  • Project Aria – 非常适合处理 ORC 等文件格式。
  • Project Presto Unlimited – 一种用于创建临时内存存储桶的内存节省功能。
  • 其他用户定义的函数,例如动态 SQL 函数支持。
  • Presto-on-Spark – Spark 执行器中的库。

差异

  • Presto 已在 Facebook、Uber 和 Twitter 上大规模开发、测试和运行
  • 今天的“Presto on Spark”可以运行大量批量 ETL 作业。
  • Presto 的 RaptorX 提供多级缓存
  • Presto 社区正在通过多个协调器而不是一个协调器节点使 Presto 更加可靠和可扩展(以防止单点故障)。

Trino 和 Presto 在架构和主要功能级别上基于事实的差异。两个项目从2019年开始分开,两者的发展和侧重点不同。如何回答以下三个问题:

  1. 在哪些场景中每个更有用?(或者 – 每个项目的重点是什么?)
  2. 两者中的任何一个都需要更多的内存资源吗?
  3. 对于某些用例,两者中的任何一个是否往往更快?

TL;DR:亲自尝试 Presto 和 Trino,并尝试与这两个社区进行互动。如果你想要一个黑白的答案,我的选择是 Trino。

这是我必须让您开始使用 Trino 的存储库。https://github.com/bitsondatadev/trino-getting-started

完全公开,我是 Starburst(企业 Trino)和 Trino/PrestoDB 贡献者的开发倡导者。任何有足够经验来回答这个问题的人都是有偏见的,所以我将提供一些背景和我对公正建议的最佳尝试。我还应该提到,你在那里发布的一个来源只是写关于 Presto 和 Trino 的文章,因为这是一个热门话题,他们只想获得 SEO 和页面浏览量。他们的技术与 Presto/Trino 几乎没有任何关系,他们不会检查他们写的任何东西。所以这增加了更多的混乱。我将首先回答您的问题,然后对您在上面指出的功能做出一些答复。

  • 在哪些场景中每个更有用?

它们通常用于取代 Hive 作为日期湖查询引擎,以及跨多个数据源联合查询。每个人都有自拆分以来添加的功能。您在上面为 Presto 和下面指出的两个博客详细介绍了自拆分以来 Trino 中添加的内容:

https://trino.io/blog/2020/01/01/2019-summary.html
https://trino.io/blog/2021/01/08/2020-review.html
Trino 现在对高精度时间戳支持进行了重大修复,这对于金融科技、游戏和其他需要高粒度时间支持的行业非常有用。Trino 还支持高级 SQL 功能,例如Windows 上的WINDOW语法和模式匹配以及通过MATCH_RECOGNIZE. Trino 中还有更多的动态过滤 + 动态分区修剪支持,它通过在优化器中提示哪些分区确实需要包含在查询连接中来删除冗余扫描。

我从 Presto 看到的唯一一件大事是我个人觉得有趣的 Presto on Spark 作品。目前值得注意,但我还没有看到有人写过他们如何使用或当您可以直接访问 Spark 时该功能实际上有多大帮助。我认为理论上你会在 Presto 生态系统中获得容错能力,但查询长时间运行的 Spark 作业有点违背 Presto 的目的,即提供更快的交互式查询。

  • 两者中的任何一个都需要更多的内存资源吗?

总的来说,这几乎是一样的。肯定会有所不同,尤其是取决于您运行的查询类型和您点击的数据源。这些项目共有6年的历史。架构中的任何一方都没有发生如此根本性的变化,以至于除了可能已明确优化的例外情况外,您将体验到内存使用方面的很大差异。

  • 对于某些用例,两者中的任何一个是否往往更快?

任何人都可以争辩说他们有更好的表现,并且双方都有自己的基准表明他们更好。这是一个愚蠢的游戏,不幸的是让您感到困惑而不是帮助。对于性能问题,我始终建议您将自己与实际用例(不仅仅是 TPCH/DS)进行比较。我很乐意在这里帮助你。我能说的最好的是这两个项目都专注于加速。

Presto 在 Raptor 和这个 Spark 插件上做了很多工作。Trino 在数据湖的 Hive 和 Iceberg 连接器上投入了大量精力。如果您需要支持亚秒级延迟,Trino 还具有用于实时连接 Pinot、Druid 和 Clickhouse 的连接器。Presto 有 Pinot 和 Druid,但还没有 ClickHouse。

  • 为什么我选择 Trino 并且现在是这个社区的开发者倡导者:

在之前的一家公司拆分为 PrestoDB 和 PrestoSQL/Trino 之前,我开始使用 Presto,我们决定迁移到 Trino 主要是因为 Presto 的创建者,大多数代码贡献者(来自 Qubole 和 Teradata 以及其他公司的一群人)迁移了以及。(参见:PrestoDB贡献者图|特里诺贡献者图,你会发现大量的杰出贡献者停止围绕2019年继续在特里诺)。正如您从 Trino 公告博客中的图表中看到的那样,PrestoSQL 中正在进行更多活动。

双方都添加了一些功能,但大多数添加到 PrestoDB 的功能只会使 Facebook 受益,而不是整个社区。有很多社区贡献的 PR 已经过时了,因为它们没有被优先考虑。

最后,除了仅仅因为性能而选择 Trino 之外,我敦促您加入 Trino 和 Presto slack 频道,看看那里的活动。当您需要帮助设置某些东西,或者想要找到学习资源,或者想要通过将您的用例贡献给 Trino 社区来参与其中时,您会看到与 Presto 相比,Trino 方面很热闹。当您需要启动项目时,这将使您启动并运行。

Flink DataStream API 官方文档的欺诈检测案例

一、背景介绍

flink官方文档中有一些具体的案例,其中有一个通过DataStream API检测欺诈的案例。在跟着它的教程一步步配置IDE的时候会有一些意想不到的问题出现。笔者将自己的踩坑过程做简单记录

二、学习过程

编译

教程中已经提醒必须的环境要求,不再赘述。笔者使用Scala语言做开发,所以跟着文档做的第一步maven编译如下; 编译完成后会有一个frauddetection的文件夹生成,之后就可以把得这个生成的项目导入到IDE中。

    mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-walkthrough-datastream-scala \
    -DarchetypeVersion=1.16.0 \
    -DgroupId=frauddetection \
    -DartifactId=frauddetection \
    -Dversion=0.1 \
    -Dpackage=spendreport \
    -DinteractiveMode=false

注意⚠️:上边的命令是在命令终端操作的,如果是windows系统是可以通过win+R 快捷键弹出窗口出入 cmd回车,调出dos窗口; 如果是Mac电脑,使用终端操作

配置idea

  1. maven配置 Preferences –> Build, Execution, Deployment –> Build Tools –> Maven

  2. Scala: Preferences –> Build, Execution, Deployment –> Compiler –> Scala Compiler –> Scala Compile Server –> JDK

    这里笔者遇到的问题是在试运行代码的时候出现了如下错误

    packages cannot be represented as URI
  3. JAVA : File –> Project Structure –> Platform Settings –> SDKs 中的jdk等配置,尤其是电脑有多个jdk环境的情况下,要仔细看清楚。

三、代码理解

open方法

Initialization method for the function. It is called before the actual working methods (like map or join) and thus suitable for one time setup work. For functions that are part of an iteration, this method will be invoked at the beginning of each iteration superstep.
The configuration object passed to the function can be used for configuration and initialization. The configuration contains all parameters that were configured on the function in the program composition.

classDiagram
direction BT
class AbstractRichFunction
class Function {
<<Interface>>

}
class KeyedProcessFunction~K, I, O~
class Public
class PublicEvolving
class RichFunction {
<<Interface>>

}
class Serializable {
<<Interface>>

}

Public  ..  AbstractRichFunction 
AbstractRichFunction  ..>  RichFunction 
AbstractRichFunction  ..>  Serializable 
Public  ..  Function 
Function  -->  Serializable 
KeyedProcessFunction~K, I, O~  -->  AbstractRichFunction 
PublicEvolving  ..  KeyedProcessFunction~K, I, O~ 
Public  ..  PublicEvolving 
RichFunction  -->  Function
Public  ..  RichFunction

简单理解代码的具体内容, open方法在逐条处理数据之前被执行,而且多数情况基本上是针对全局变量的

  // Description copied from class: RichFunction
  // 在迭代方法之前用于初始化配置或者公共变量
  // 也就是这个open方法会在processElement方法之前调用

  override def open(parameters: Configuration): Unit =
    {
      val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
      flagState = getRuntimeContext.getState(flagDescriptor)

      val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
      timerState = getRuntimeContext.getState(timerDescriptor)
    }

在 org.apache.flink.api.common.functions.AbstractRichFunction 中的方法定义时 有一句 ==Default life cycle methods== 这个注释有助于深刻理解open和close这两个方法的定位;

  // --------------------------------------------------------------------------------------------
    //  Default life cycle methods
    // --------------------------------------------------------------------------------------------

    @Override
    public void open(Configuration parameters) throws Exception {}

    @Override
    public void close() throws Exception {}

由于AbstractRichFunction implements 了 RichFunction,我们再次找到org.apache.flink.api.common.functions.RichFunction 这个接口中的open方法定义, 其中传入的

Configuration 是org.apache.flink.configuration.Configuration, 更进一步说明这个方法主要是处理flink相关参数的, 一般框架的参数设置多数处于系统或者程序执行的全局观来定位。

    /**
     * Initialization method for the function. It is called before the actual working methods (like
     * <i>map</i> or <i>join</i>) and thus suitable for one time setup work. For functions that are
     * part of an iteration, this method will be invoked at the beginning of each iteration
     * superstep.
     *
     * <p>The configuration object passed to the function can be used for configuration and
     * initialization. The configuration contains all parameters that were configured on the
     * function in the program composition.
     *
     * <pre>{@code
     * public class MyFilter extends RichFilterFunction<String> {
     *
     *     private String searchString;
     *
     *     public void open(Configuration parameters) {
     *         this.searchString = parameters.getString("foo");
     *     }
     *
     *     public boolean filter(String value) {
     *         return value.equals(searchString);
     *     }
     * }
     * }</pre>
     *
     * <p>By default, this method does nothing.
     *
     * @param parameters The configuration containing the parameters attached to the contract.
     * @throws Exception Implementations may forward exceptions, which are caught by the runtime.
     *     When the runtime catches an exception, it aborts the task and lets the fail-over logic
     *     decide whether to retry the task execution.
     * @see org.apache.flink.configuration.Configuration
     */
    void open(Configuration parameters) throws Exception;

onTimer 方法

在 KeyedProcessFunction 的方法定义时如下,

    /**
     * Called when a timer set using {@link TimerService} fires.
     *
     * @param timestamp The timestamp of the firing timer.
     * @param ctx An {@link OnTimerContext} that allows querying the timestamp, the {@link
     *     TimeDomain}, and the key of the firing timer and getting a {@link TimerService} for
     *     registering timers and querying the time. The context is only valid during the invocation
     *     of this method, do not store it.
     * @param out The collector for returning result values.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

针对监控程序代码具体的理解, 这其中有一个 "#" 井号的参数,这个是Scala的路径依赖类型(Path-dependent types)

  // 不是无期限监控一个key 当计时器到时间了 还没有检测到异常,我们就默认这个key的交易没啥问题, 清理掉之前的状态
  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
                       out: Collector[Alert]): Unit = {
    timerState.clear()
    flagState.clear()
  }

processElement 方法

  @throws[Exception]
  def processElement(
      transaction: Transaction,
      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
      collector: Collector[Alert]): Unit = {

    println(transaction.toString)
    // open 置前 通过运行时上下文拿到ValueStateDescriptor ,此时调用具体的值
    val lastTransactionWasSmall: lang.Boolean = flagState.value()
    // 如果值存在,则说明以前该key已经触发了最小值; 不存在则走后边的逻辑
    if(lastTransactionWasSmall !=null){
      if(transaction.getAmount > FraudDetector.LARGE_AMOUNT){
        val alert = new Alert
        alert.setId(transaction.getAccountId)

        collector.collect(alert)
      }
      // 检测到异常且正常警告后  清理掉之前的状态和计时器
      cleanUp(context)
    }

    // 只要交易额度小于最小值 则开始检测该key的后续值,同时新建 计时器
    // 计时器可以灵活运营于规则判断,有些交易时间间隔较长不一定是欺诈,所以我们假设一分钟内发生的大小值差异属于欺诈行为
    // 具体是不是真的一分钟,在现实数据中是需要多做校验的
    if(transaction.getAmount < FraudDetector.SMALL_AMOUNT){
      flagState.update(true)
      val timer: Long = context.timerService().currentProcessingTime() + FraudDetector.ONE_MINUTE
      context.timerService().registerProcessingTimeTimer(timer)

      timerState.update(timer)
    }

  }

  @throws[Exception]
  def cleanUp(context: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {

    val timer: lang.Long = timerState.value()
    context.timerService().deleteEventTimeTimer(timer)

    timerState.clear()
    flagState.clear()
  }