The Fraud Detection Example in the Official Flink DataStream API Docs

1. Background

The official Flink documentation includes several hands-on examples, one of which detects fraud with the DataStream API. Following its tutorial to set up the IDE step by step, a few unexpected problems came up. Below is a short record of the pitfalls I hit.

2. Learning process

Compilation

The tutorial already spells out the required environment, so that is not repeated here. I develop in Scala, so the first step, following the docs, is the Maven build below; once it finishes, a frauddetection folder is generated, and that project can then be imported into the IDE.

    mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-walkthrough-datastream-scala \
    -DarchetypeVersion=1.16.0 \
    -DgroupId=frauddetection \
    -DartifactId=frauddetection \
    -Dversion=0.1 \
    -Dpackage=spendreport \
    -DinteractiveMode=false

Note ⚠️: the command above is run in a terminal. On Windows, press Win+R, type cmd, and hit Enter to open a command-prompt window; on a Mac, use Terminal.

Configuring IntelliJ IDEA

  1. Maven: Preferences –> Build, Execution, Deployment –> Build Tools –> Maven

  2. Scala: Preferences –> Build, Execution, Deployment –> Compiler –> Scala Compiler –> Scala Compile Server –> JDK

     The problem I ran into here was the following error when first trying to run the code:

         packages cannot be represented as URI

     In my case it came down to the JDK selected for the Scala Compile Server, which is why this setting is worth checking.

  3. Java: the JDK entries under File –> Project Structure –> Platform Settings –> SDKs. Look carefully, especially when the machine has several JDKs installed.

3. Understanding the code

The open method

Initialization method for the function. It is called before the actual working methods (like map or join) and thus suitable for one time setup work. For functions that are part of an iteration, this method will be invoked at the beginning of each iteration superstep.
The configuration object passed to the function can be used for configuration and initialization. The configuration contains all parameters that were configured on the function in the program composition.

The class hierarchy around KeyedProcessFunction, as a mermaid classDiagram:

    classDiagram
    direction BT
    class AbstractRichFunction
    class Function {
        <<Interface>>
    }
    class KeyedProcessFunction~K, I, O~
    class Public
    class PublicEvolving
    class RichFunction {
        <<Interface>>
    }
    class Serializable {
        <<Interface>>
    }

    Public .. AbstractRichFunction
    AbstractRichFunction ..> RichFunction
    AbstractRichFunction ..> Serializable
    Public .. Function
    Function --> Serializable
    KeyedProcessFunction~K, I, O~ --> AbstractRichFunction
    PublicEvolving .. KeyedProcessFunction~K, I, O~
    Public .. PublicEvolving
    RichFunction --> Function
    Public .. RichFunction

To put the code in plain terms: open runs before the elements are processed one by one, and in most cases it initializes state or other variables shared across the whole function instance.

  // Description copied from class: RichFunction
  // used to initialize configuration and shared variables before the
  // per-element methods run, i.e. open is called before processElement

  override def open(parameters: Configuration): Unit = {
    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
    flagState = getRuntimeContext.getState(flagDescriptor)

    val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
    timerState = getRuntimeContext.getState(timerDescriptor)
  }
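
For context, the two state handles assigned in open are declared as transient fields on the function, exactly as in the official walkthrough:

  @transient private var flagState: ValueState[java.lang.Boolean] = _
  @transient private var timerState: ValueState[java.lang.Long] = _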

In org.apache.flink.api.common.functions.AbstractRichFunction the method definitions sit under the comment ==Default life cycle methods==, which helps pin down the roles of open and close:

    // --------------------------------------------------------------------------------------------
    //  Default life cycle methods
    // --------------------------------------------------------------------------------------------

    @Override
    public void open(Configuration parameters) throws Exception {}

    @Override
    public void close() throws Exception {}
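
To make the life cycle concrete, here is a minimal Scala sketch of a rich function that acquires a resource in open and releases it in close. This is not part of the walkthrough, and the JDBC URL is only a placeholder:

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.configuration.Configuration

    class EnrichFunction extends RichMapFunction[String, String] {
      // non-serializable resources belong in open, not in the constructor,
      // because the function instance is serialized and shipped to the cluster
      @transient private var conn: java.sql.Connection = _

      override def open(parameters: Configuration): Unit = {
        // one-time setup: runs once per parallel instance, before any map() call
        conn = java.sql.DriverManager.getConnection("jdbc:h2:mem:demo") // placeholder URL
      }

      override def map(value: String): String = value.toUpperCase

      override def close(): Unit = {
        // teardown: runs once when the task finishes or fails over
        if (conn != null) conn.close()
      }
    }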

Since AbstractRichFunction implements RichFunction, we can go one step further and look at the open method declared on the org.apache.flink.api.common.functions.RichFunction interface. The Configuration passed in is org.apache.flink.configuration.Configuration, which underlines that this method is mainly about Flink-level parameters; framework parameters like these are usually set from the global perspective of the system or the running program.

    /**
     * Initialization method for the function. It is called before the actual working methods (like
     * <i>map</i> or <i>join</i>) and thus suitable for one time setup work. For functions that are
     * part of an iteration, this method will be invoked at the beginning of each iteration
     * superstep.
     *
     * <p>The configuration object passed to the function can be used for configuration and
     * initialization. The configuration contains all parameters that were configured on the
     * function in the program composition.
     *
     * <pre>{@code
     * public class MyFilter extends RichFilterFunction<String> {
     *
     *     private String searchString;
     *
     *     public void open(Configuration parameters) {
     *         this.searchString = parameters.getString("foo");
     *     }
     *
     *     public boolean filter(String value) {
     *         return value.equals(searchString);
     *     }
     * }
     * }</pre>
     *
     * <p>By default, this method does nothing.
     *
     * @param parameters The configuration containing the parameters attached to the contract.
     * @throws Exception Implementations may forward exceptions, which are caught by the runtime.
     *     When the runtime catches an exception, it aborts the task and lets the fail-over logic
     *     decide whether to retry the task execution.
     * @see org.apache.flink.configuration.Configuration
     */
    void open(Configuration parameters) throws Exception;

The onTimer method

It is defined on KeyedProcessFunction as follows:

    /**
     * Called when a timer set using {@link TimerService} fires.
     *
     * @param timestamp The timestamp of the firing timer.
     * @param ctx An {@link OnTimerContext} that allows querying the timestamp, the {@link
     *     TimeDomain}, and the key of the firing timer and getting a {@link TimerService} for
     *     registering timers and querying the time. The context is only valid during the invocation
     *     of this method, do not store it.
     * @param out The collector for returning result values.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

Turning to the monitoring code itself: the "#" in the parameter type is Scala's type projection syntax, closely related to path-dependent types. Outer#Inner denotes the Inner type of any Outer instance, whereas a path-dependent type o.Inner is tied to one specific instance o.
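
A minimal, self-contained sketch of the difference (the names are illustrative, not from Flink):

  class Outer {
    class Inner
  }

  object PathDependentDemo extends App {
    val a = new Outer
    val b = new Outer

    // path-dependent type: a.Inner is tied to the specific instance `a`
    val x: a.Inner = new a.Inner
    // val y: a.Inner = new b.Inner  // does not compile: wrong outer instance

    // type projection: Outer#Inner accepts an Inner from any Outer instance
    val z: Outer#Inner = new b.Inner
  }

With that distinction in mind, the onTimer override reads as follows: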

  // don't monitor a key indefinitely: if the timer fires and no anomaly has
  // been detected, we assume this key's transactions are fine and clear the
  // previously recorded state
  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
                       out: Collector[Alert]): Unit = {
    timerState.clear()
    flagState.clear()
  }

The processElement method

  @throws[Exception]
  def processElement(
      transaction: Transaction,
      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
      collector: Collector[Alert]): Unit = {

    println(transaction.toString)
    // open has already run, so flagState was fetched from the runtime context;
    // here we read its current value for this key
    val lastTransactionWasSmall: lang.Boolean = flagState.value()
    // if a value is present, a previous transaction for this key was already
    // below the small-amount threshold; otherwise fall through to the check below
    if (lastTransactionWasSmall != null) {
      if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
        val alert = new Alert
        alert.setId(transaction.getAccountId)

        collector.collect(alert)
      }
      // whether or not an alert was emitted, clear the previous state and timer
      cleanUp(context)
    }

    // as soon as a transaction falls below the small-amount threshold, start
    // watching this key's subsequent values and register a timer.
    // The timer keeps the rule flexible: a long gap between transactions is not
    // necessarily fraud, so we assume a small-then-large pattern within one
    // minute is fraudulent. Whether one minute is the right window is something
    // to validate against real data.
    if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
      flagState.update(true)
      val timer: Long = context.timerService().currentProcessingTime() + FraudDetector.ONE_MINUTE
      context.timerService().registerProcessingTimeTimer(timer)

      timerState.update(timer)
    }

  }

  @throws[Exception]
  def cleanUp(context: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {

    // the timer was registered as a processing-time timer, so it must also be
    // deleted as a processing-time timer
    val timer: lang.Long = timerState.value()
    context.timerService().deleteProcessingTimeTimer(timer)

    timerState.clear()
    flagState.clear()
  }
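
The thresholds and the one-minute window used above live on the companion object; these are the values from the official walkthrough:

  object FraudDetector {
    val SMALL_AMOUNT: Double = 1.00
    val LARGE_AMOUNT: Double = 500.00
    val ONE_MINUTE: Long     = 60 * 1000L
  }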

Flink WordCount: learning big data step by step

Seeing the two words "flink wordcount" for the first time may leave some readers puzzled, but anyone who has spent time in or around big-data development will instantly get what they mean. Big-data development, whatever the language or tool, traditionally starts with one classic example: counting words.

Development and runtime environment

  1. MacBook Pro (13-inch, M1, 2020); macOS Monterey
  2. Java 8 and Scala 2.12
  3. IntelliJ IDEA as the IDE
  4. Flink 1.13.0

Installing the tools

Installing netcat

macOS generally does not ship with netcat, and the code examples below rely on it, so install it manually:

brew install netcat
==> Downloading https://ghcr.io/v2/homebrew/core/netcat/manifests/0.7.1-1
######################################################################## 100.0%
==> Downloading https://ghcr.io/v2/homebrew/core/netcat/blobs/sha256:7c33ed98a6c81011f5923240e11b87f07add5cea280f5e2754b2f3d7fc3d9eee
==> Downloading from https://pkg-containers.githubusercontent.com/ghcr1/blobs/sha256:7c33ed98a6c81011f5923240e11b87f07add5cea280f5e2754b2f3d7fc3d9eee?se=2022-09-17T10%3A40%3A00Z&sig=xlGiGyvIHi8peoADPzotAT
######################################################################## 100.0%
==> Pouring netcat--0.7.1.monterey.bottle.1.tar.gz
🍺  /usr/local/Cellar/netcat/0.7.1: 13 files, 135.7KB

IDE, Maven, and other tools

Mac M1: installing and configuring IntelliJ IDEA, Gradle, and related tools quickly via brew

Code examples

Local input file (data/words.txt):

hello world
hello flink
hello spark
hi hive
hi big data


Batch processing in early Flink versions

Batch processing based on a DataSet[String]

import org.apache.flink.api.scala._

object BatchWordCount {

  def main(args: Array[String]): Unit = {
    // create the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // read the input file
    val line_data_set = env.readTextFile("data/words.txt")
    // split each line into (word, 1) pairs
    val words = line_data_set.flatMap(_.split(" ").map(x => (x, 1)))
    // group by the word (field 0) and sum the counts (field 1)
    words.groupBy(0).sum(1).print()
  }
}
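
For reference, with the five-line input file above the job should print exactly these aggregated counts (the order may differ between runs):

(hi,2)
(hello,3)
(world,1)
(flink,1)
(spark,1)
(hive,1)
(big,1)
(data,1)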

Processing bounded data as a stream in Flink

Stream processing based on a DataStream[String]

import org.apache.flink.streaming.api.scala._

object BoundedStreamWordCount {
  def main(args: Array[String]): Unit = {

    // create the streaming execution environment
    val environment = StreamExecutionEnvironment.getExecutionEnvironment

    // read a bounded input: the same local file as before
    val value = environment.readTextFile("data/words.txt")
    // split each line into (word, 1) pairs
    val words = value.flatMap(_.split(" ")).map(x => (x, 1))
    words.print()
    // key by the word, then sum the counts per key
    val value1 = words.keyBy(_._1)
    value1.sum(1).print()

    environment.execute()
  }

  /**
   * subtask> result
   * 4> (hello,1)
   * 8> (hi,1)
   * 1> (hi,1)
   * 3> (hello,1)
   * 6> (hello,1)
   * 6> (spark,1)
   * 3> (world,1)
   * 1> (big,1)
   * 4> (flink,1)
   * 1> (data,1)
   * 8> (hive,1)
   * 7> (flink,1)
   * 5> (world,1)
   * 2> (big,1)
   * 1> (hive,1)
   * 3> (hi,1)
   * 2> (data,1)
   * 3> (hi,2)
   * 3> (hello,1)
   * 3> (hello,2)
   * 1> (spark,1)
   * 3> (hello,3)
   */
}
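
The "N>" prefixes in the output above are the indices of the parallel subtasks that produced each record; by default Flink uses one subtask per CPU core. To get a single, ordered output while experimenting, you can pin the parallelism to 1 (a small sketch, not part of the original example):

// with a single subtask, print() emits no "N>" prefix
val environment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setParallelism(1)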

Unbounded stream processing in Flink

Start netcat

nc -lk 5678
hello world
hello scala

Then run the following code:

import org.apache.flink.streaming.api.scala._

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // create the streaming execution environment
    val environment = StreamExecutionEnvironment.getExecutionEnvironment

    // read an unbounded input from the netcat socket
    val value = environment.socketTextStream("localhost", 5678)
    // split each line into (word, 1) pairs
    val words = value.flatMap(_.split(" ")).map(x => (x, 1))
    words.print()
    // key by the word, then sum the counts per key
    val value1 = words.keyBy(_._1)
    value1.sum(1).print()

    environment.execute()
  }

  /**
   * Output for the two lines typed into netcat above:
   * 3> (hello,1)
   * 3> (world,1)
   * 5> (world,1)
   * 3> (hello,1)
   * 4> (hello,1)
   * 4> (scala,1)
   * 1> (scala,1)
   * 3> (hello,2)
   */
}