Flink DataStream API 官方文档的欺诈检测案例

一、背景介绍

flink官方文档中有一些具体的案例,其中有一个通过DataStream API检测欺诈的案例。在跟着它的教程一步步配置IDE的时候会有一些意想不到的问题出现。笔者将自己的踩坑过程做简单记录

二、学习过程

编译

教程中已经提醒必须的环境要求,不再赘述。笔者使用Scala语言做开发,所以跟着文档做的第一步maven编译如下; 编译完成后会有一个frauddetection的文件夹生成,之后就可以把得这个生成的项目导入到IDE中。

    mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-walkthrough-datastream-scala \
    -DarchetypeVersion=1.16.0 \
    -DgroupId=frauddetection \
    -DartifactId=frauddetection \
    -Dversion=0.1 \
    -Dpackage=spendreport \
    -DinteractiveMode=false

注意⚠️:上边的命令是在命令终端操作的,如果是windows系统是可以通过win+R 快捷键弹出窗口出入 cmd回车,调出dos窗口; 如果是Mac电脑,使用终端操作

配置idea

  1. maven配置 Preferences –> Build, Execution, Deployment –> Build Tools –> Maven

  2. Scala: Preferences –> Build, Execution, Deployment –> Compiler –> Scala Compiler –> Scala Compile Server –> JDK

    这里笔者遇到的问题是在试运行代码的时候出现了如下错误

    packages cannot be represented as URI
  3. JAVA : File –> Project Structure –> Platform Settings –> SDKs 中的jdk等配置,尤其是电脑有多个jdk环境的情况下,要仔细看清楚。

三、代码理解

open方法

Initialization method for the function. It is called before the actual working methods (like map or join) and thus suitable for one time setup work. For functions that are part of an iteration, this method will be invoked at the beginning of each iteration superstep.
The configuration object passed to the function can be used for configuration and initialization. The configuration contains all parameters that were configured on the function in the program composition.

classDiagram
direction BT
class AbstractRichFunction
class Function {
<<Interface>>

}
class KeyedProcessFunction~K, I, O~
class Public
class PublicEvolving
class RichFunction {
<<Interface>>

}
class Serializable {
<<Interface>>

}

Public  ..  AbstractRichFunction 
AbstractRichFunction  ..>  RichFunction 
AbstractRichFunction  ..>  Serializable 
Public  ..  Function 
Function  -->  Serializable 
KeyedProcessFunction~K, I, O~  -->  AbstractRichFunction 
PublicEvolving  ..  KeyedProcessFunction~K, I, O~ 
Public  ..  PublicEvolving 
RichFunction  -->  Function
Public  ..  RichFunction

简单理解代码的具体内容, open方法在逐条处理数据之前被执行,而且多数情况基本上是针对全局变量的

  // Description copied from class: RichFunction
  // 在迭代方法之前用于初始化配置或者公共变量
  // 也就是这个open方法会在processElement方法之前调用

  override def open(parameters: Configuration): Unit =
    {
      val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
      flagState = getRuntimeContext.getState(flagDescriptor)

      val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
      timerState = getRuntimeContext.getState(timerDescriptor)
    }

在 org.apache.flink.api.common.functions.AbstractRichFunction 中的方法定义时 有一句 ==Default life cycle methods== 这个注释有助于深刻理解open和close这两个方法的定位;

  // --------------------------------------------------------------------------------------------
    //  Default life cycle methods
    // --------------------------------------------------------------------------------------------

    @Override
    public void open(Configuration parameters) throws Exception {}

    @Override
    public void close() throws Exception {}

由于AbstractRichFunction implements 了 RichFunction,我们再次找到org.apache.flink.api.common.functions.RichFunction 这个接口中的open方法定义, 其中传入的

Configuration 是org.apache.flink.configuration.Configuration, 更进一步说明这个方法主要是处理flink相关参数的, 一般框架的参数设置多数处于系统或者程序执行的全局观来定位。

    /**
     * Initialization method for the function. It is called before the actual working methods (like
     * <i>map</i> or <i>join</i>) and thus suitable for one time setup work. For functions that are
     * part of an iteration, this method will be invoked at the beginning of each iteration
     * superstep.
     *
     * <p>The configuration object passed to the function can be used for configuration and
     * initialization. The configuration contains all parameters that were configured on the
     * function in the program composition.
     *
     * <pre>{@code
     * public class MyFilter extends RichFilterFunction<String> {
     *
     *     private String searchString;
     *
     *     public void open(Configuration parameters) {
     *         this.searchString = parameters.getString("foo");
     *     }
     *
     *     public boolean filter(String value) {
     *         return value.equals(searchString);
     *     }
     * }
     * }</pre>
     *
     * <p>By default, this method does nothing.
     *
     * @param parameters The configuration containing the parameters attached to the contract.
     * @throws Exception Implementations may forward exceptions, which are caught by the runtime.
     *     When the runtime catches an exception, it aborts the task and lets the fail-over logic
     *     decide whether to retry the task execution.
     * @see org.apache.flink.configuration.Configuration
     */
    void open(Configuration parameters) throws Exception;

onTimer 方法

在 KeyedProcessFunction 的方法定义时如下,

    /**
     * Called when a timer set using {@link TimerService} fires.
     *
     * @param timestamp The timestamp of the firing timer.
     * @param ctx An {@link OnTimerContext} that allows querying the timestamp, the {@link
     *     TimeDomain}, and the key of the firing timer and getting a {@link TimerService} for
     *     registering timers and querying the time. The context is only valid during the invocation
     *     of this method, do not store it.
     * @param out The collector for returning result values.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

针对监控程序代码具体的理解, 这其中有一个 "#" 井号的参数,这个是Scala的路径依赖类型(Path-dependent types)

  // 不是无期限监控一个key 当计时器到时间了 还没有检测到异常,我们就默认这个key的交易没啥问题, 清理掉之前的状态
  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
                       out: Collector[Alert]): Unit = {
    timerState.clear()
    flagState.clear()
  }

processElement 方法

  @throws[Exception]
  def processElement(
      transaction: Transaction,
      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
      collector: Collector[Alert]): Unit = {

    println(transaction.toString)
    // open 置前 通过运行时上下文拿到ValueStateDescriptor ,此时调用具体的值
    val lastTransactionWasSmall: lang.Boolean = flagState.value()
    // 如果值存在,则说明以前该key已经触发了最小值; 不存在则走后边的逻辑
    if(lastTransactionWasSmall !=null){
      if(transaction.getAmount > FraudDetector.LARGE_AMOUNT){
        val alert = new Alert
        alert.setId(transaction.getAccountId)

        collector.collect(alert)
      }
      // 检测到异常且正常警告后  清理掉之前的状态和计时器
      cleanUp(context)
    }

    // 只要交易额度小于最小值 则开始检测该key的后续值,同时新建 计时器
    // 计时器可以灵活运营于规则判断,有些交易时间间隔较长不一定是欺诈,所以我们假设一分钟内发生的大小值差异属于欺诈行为
    // 具体是不是真的一分钟,在现实数据中是需要多做校验的
    if(transaction.getAmount < FraudDetector.SMALL_AMOUNT){
      flagState.update(true)
      val timer: Long = context.timerService().currentProcessingTime() + FraudDetector.ONE_MINUTE
      context.timerService().registerProcessingTimeTimer(timer)

      timerState.update(timer)
    }

  }

  @throws[Exception]
  def cleanUp(context: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {

    val timer: lang.Long = timerState.value()
    context.timerService().deleteEventTimeTimer(timer)

    timerState.clear()
    flagState.clear()
  }

Scala 抽象类 ( abstract classe ) 特质 (trait)

一、世界观

1. Scala 是纯面向对象的语言,所以在 Scala 中没有接口。

抽象类( abstract classe )可以说是面向对象的,而接口( interface )的本质是对继承机制的一个补充,从面向对象来看,接口并不属于面向对象的范畴。

理解 trait 等价于 Java 中接口加上抽象类(interface + abstract class),也就是说,trait 既可以有抽象方法,有可以有非抽象方法,普通方法,有多个类具有相同的特征(特征)时,就可以将这个特质(特征)独立出来,采用关键字 trait 声明。

1. Java中接口的使用,是通过类来实现接口,使用的关键字是implements。

2. Scala中类不是实现特质,而是动态混入[mixin]特质,使用的关键字是extends或with。

3. override关键字。如果重写的是普通方法,则必须加上override关键字;如果重写的是抽象方法,可以省略override关键字。

Scala中的术语mixin是指若干trait,这些trait可以用于合成一个类。

    // 抽象类A,包含一个字符串类型的成员属性message
    abstract class A {
      val message: String
    }
    // 类B继承A,并定义了具体定义了属性message的值
    class B extends A {
      val message = "I'm an instance of class B"
    }
    // trait C继承A,添加了一个loudMessage方法
    trait C extends A {
      def loudMessage = message.toUpperCase()
    }
    // 类D继承B并实现C
    class D extends B with C
    // 实例化一个D
    val d = new D
    d.message  // I'm an instance of class B
    d.loudMessage  // I'M AN INSTANCE OF CLASS B

类D有一个父类B和一个mixin C。Scala不支持多继承,也就是说一个类只能有一个父类。但可以有多个mixin。继承一个类使用extends 关键词,扩展一个mixin使用with 关键词。

动态混入可以在不影响原有继承关系地基础上,给指定的类扩展功能。Scala 加入了动态混入[mixin],就像接口解耦


    object MixINDemo {
      def main(args: Array[String]): Unit = {
        //普通类动态混入
        val oraclDB = new OracleDB with Opreater
        oraclDB.insert(100)
        //抽象类的动态混入
        val mySQL = new MySQL with Opreater
        mySQL.insert(200)
        //如果一个抽象类有抽象方法,动态混入的方法
        val mysql = new MySQL_ with Opreater {
          override def say(): Unit = {
            println("我是overwrite 抽象类的方法,")
          }
        }
        mysql.insert(90)
        mysql.say()
      }

    }
    trait Opreater{//特质
      def insert(id:Int):Unit={
        println("插入数据" + id)
      }

    }
    class OracleDB{}
    abstract class MySQL{}
    abstract class MySQL_{
      def say()
    }

2. Scala和Java,二者的抽象类的本质是一样的

- 抽象类中可以有抽象方法和普通方法。

- 有抽象方法的类则一定要定义成抽象类

- 抽象方法前面不需加abstract关键字,没有方法体的方法的便是抽象方法了,这一点跟Java中的不太一样。
    // 抽象类
    abstract class Animal(val age: Int) {

        // 抽象方法

        def eat()

        // 普通的方法

        def sleep(sound: String) = {

            println(s"sleep with $sound")

        }

    }

二、Scala 什么时候应该使用特质而不是抽象类?

  • 优先使用特质。一个类扩展多个特质是很方便的,但却只能扩展一个抽象类。
  • 如果你需要构造函数参数,使用抽象类。因为抽象类可以定义带参数的构造函数,而特质不行。
  • Whenever you implement a reusable collection of behavior, you will have to decide whether you want to use a trait or an abstract class. There is no firm rule, but this section contains a few guidelines to consider.
  • If the behavior will not be reused, then make it a concrete class. It is not reusable behavior after all.
  • If it might be reused in multiple, unrelated classes, make it a trait. Only traits can be mixed into different parts of the class hierarchy.
  • If you want to inherit from it in Java code, use an abstract class. Since traits with code do not have a close Java analog, it tends to be awkward to inherit from a trait in a Java class. Inheriting from a Scala class, meanwhile, is exactly like inheriting from a Java class. As one exception, a Scala trait with only abstract members translates directly to a Java interface, so you should feel free to define such traits even if you expect Java code to inherit from it. See Chapter 29 for more information on working with Java and Scala together.
  • If you plan to distribute it in compiled form, and you expect outside groups to write classes inheriting from it, you might lean towards using an abstract class. The issue is that when a trait gains or loses a member, any classes that inherit from it must be recompiled, even if they have not changed. If outside clients will only call into the behavior, instead of inheriting from it, then using a trait is fine.
  • If efficiency is very important, lean towards using a class. Most Java runtimes make a virtual method invocation of a class member a faster operation than an interface method invocation. Traits get compiled to interfaces and therefore may pay a slight performance overhead. However, you should make this choice only if you know that the trait in question constitutes a performance bottleneck and have evidence that using a class instead actually solves the problem.
  • If you still do not know, after considering the above, then start by making it as a trait. You can always change it later, and in general using a trait keeps more options open.

trait 和 abstract classe 主要区别

  • Abstract classes can have constructor parameters as well as type parameters. Traits can have only type parameters. There was some discussion that in future even traits can have constructor parameters
  • Abstract classes are fully interoperable with Java. You can call them from Java code without any wrappers. Traits are fully interoperable only if they do not contain any implementation code
  • Very important addendum: A class can inherit from multiple traits but only one abstract class. I think this should be the first question a developer asks when considering which to use in almost all cases.

三、Java抽象类和接口的对比

参数 抽象类 接口
默认的方法实现 它可以有默认的方法实现 接口完全是抽象的。它根本不存在方法的实现
实现 子类使用extends关键字来继承抽象类。如果子类不是抽象类的话,它需要提供抽象类中所有声明的方法的实现。 子类使用关键字implements来实现接口。它需要提供接口中所有声明的方法的实现
构造器 抽象类可以有构造器 接口不能有构造器
与正常Java类的区别 除了你不能实例化抽象类之外,它和普通Java类没有任何区别 接口是完全不同的类型
访问修饰符 抽象方法可以有public、protected和default这些修饰符 接口方法默认修饰符是public。你不可以使用其它修饰符。
main方法 抽象方法可以有main方法并且我们可以运行它 接口没有main方法,因此我们不能运行它。(java8以后接口可以有default和static方法,所以可以运行main方法)
多继承 抽象方法可以继承一个类和实现多个接口 接口只可以继承一个或多个其它接口
速度 它比接口速度要快 接口是稍微有点慢的,因为它需要时间去寻找在类中实现的方法。
添加新方法 如果你往抽象类中添加新的方法,你可以给它提供默认的实现。因此你不需要改变你现在的代码。 如果你往接口中添加方法,那么你必须改变实现该接口的类。

Java 中什么时候使用抽象类和接口

  • 如果你拥有一些方法并且想让它们中的一些有默认实现,那么使用抽象类吧。

    例如:Spring的依赖注入就使得代码实现了集合框架中的接口原则和抽象实现。

  • 如果你想实现多重继承,那么你必须使用接口。由于Java不支持多继承,子类不能够继承多个类,但可以实现多个接口。因此你就可以使用接口来解决它。

  • 如果基本功能在不断改变,那么就需要使用抽象类。如果不断改变基本功能并且使用接口,那么就需要改变所有实现了该接口的类。

什么时候该用抽象类,什么时候该用接口呢?

  1. 要解决上面的问题,我们先从弄清楚抽象类和接口之间的关系。首先,我们都知道类对事物的抽象,定义了事物的属性和行为。而抽象类是不完全的类,具有抽象方法。接口则比类的抽象层次更高。所以,我们可以这样理解它们之间的关系:类是对事物的抽象,抽象类是对类的抽象,接口是对抽象类的抽象。

  2. 从这个角度来看 java 容器类,你会发现,它的设计正体现了这种关系。不是吗?从 Iterable 接口,到 AbstractList 抽象类,再到 ArrayList 类。

  3. 在设计类的时候,首先考虑用接口抽象出类的特性,当你发现某些方法可以复用的时候,可以使用抽象类来复用代码。简单说,接口用于抽象事物的特性,抽象类用于代码复用

  4. 模式和语法是死的,人是活的

ArrayList 的类继承关系。可以看到,ArrayList 的继承关系中既使用了抽象类,也使用了接口。

  • 最顶层的接口是 Iterable,表示这是可迭代的类型。所有容器类都是可迭代的,这是一个极高的抽象。
  • 第二层的接口是 Collection,这是单一元素容器的接口。集合,列表都属于此类。
  • 第三层的接口是 List,这是所有列表的接口。

Scala Path-dependent types 路径依赖 inner class 内部类

Scala Path-dependent types 路径依赖,它使用”井号” 运算符 #, 具体的代码形式是 代码R#X[R#Next]。类似于 Java 的Outer.Inner.一般常用在定义参数类型

In Scala it is possible to let classes have other classes as members. As opposed to Java-like languages where such inner classes are members of the enclosing class, in Scala such inner classes are bound to the outer object. Suppose we want the compiler to prevent us, at compile time, from mixing up which nodes belong to what graph. Path-dependent types provide a solution.

Scala Path-dependent types是其一种特殊的语法表现形式,它使用”井号” 运算符 ==#== , 具体的代码形式是 代码R#X[R#Next]。类似于 Java 的Outer.Inner(Outer和Inner都是类,一个外部类,一个内部类);
一般常用在定义参数类型,使用的是内部类,则用这个方式来定义类型。

Java 的语言中,内部类是外部类的成员,而 Scala 正好相反,内部类是绑定到外部对象的。

talk is cheap, show me the code

 /**
   * 路径依赖 Path-dependent types
   *
   */
 // 定义外部类,内部类
 class Outer(val width: Int, val height : Int){

   class Inner{
     private val space = width * height

     def show(): Unit ={
       println(" the space is equal : " + space)
     }
   }
 }

 object PathDependence {
   def main(args: Array[String]) {
     val outer = new Outer(6, 8)
     val inner = new outer.Inner
     inner.show()
     // 特定类型的内部类
     val inner_belong_outer : outer.Inner = new outer.Inner
     inner_belong_outer.show()

     val another_other = new Outer(3, 4)
     val use_inner : Outer#Inner = new another_other.Inner // # 表示Inner是Outer的内部类
     use_inner.show()

   }
 }

 // println --> 48, 48 , 12

所谓路径,指的就是参考外部类所建立的实例的名称。 就outer.Inner这个类型来说,路径为outer。更重要的是,不同路径代表不同的类型。

路径依赖的是路径的名称,与路径所引用的实例无关

val o1 = new Outer
val o2 = o1
val i1: o1.Inner = new o1.Inner
val i2: o2.Inner = new o1.Inner   // 编译错误。
// 尽管o2和o1引用同一个实例,但o1.Inner和o2.Inner是不同的,Scala编译器只根据路径名进行区分

事实上, o1.Inner和o2.Inner都是Outer#Inner的子类型


    val o1 = new Outer
    val o2 = new Outer
    val i1: Outer#Inner = new o1.Inner
    val i2: Outer#Inner = new o2.Inner

路径依赖类型可以被继承

    val o = new Outer
    class Some extends o.Inner
    val oi: Outer#Inner = new Some

Outer#Inner是不能被继承的; 此外,路径依赖类型所在的路径必须是不可变的值。

    var o1 = new Outer
    val i = new o1.Inner   // 编译错误。因为o1是var

这也很好理解,如果o1是var,那么他的类型将是不确定的,从而导致其路径依赖类型也不确定。

一般地,x0.x1.x2…xn。T是路径一类类型,只要满足:

a) x0 是不可变值(即x0是val所声明的变量)

b) x1, x2, ..., xn 都是不可变的属性

c) T 是xn内部类 
    class Food

    class Fish extends Food {
        override def toString = "魚"
    }

    abstract class Animal {
        type F <: Food
        def eat(f: F)
    }

    class Cat extends Animal {
        type F = Fish
        def eat(fish: Fish) {
            println("吃" + fish)
        }
    }

    val cat1 = new Cat
    val cat2 = new Cat
    cat1.eat(new cat1.F)  // 吃魚
    cat2.eat(new cat2.F)  // 吃魚

对于Cat类而言,F是其类型成员之一, 但这里 cat1.F和cat2.F代表相同的类型,也就是说你可以把一个cat2.F类型的实例传递给一个声明为cat1.F类型的变量. 实际上,上述代码的最后四行等价于以下的代码: 当然new Cat#F也是合法的

    type F1 = Fish
    type F2 = Fish
    cat1.eat(new F1)   // 吃魚
    cat1.eat(new F2)   // 吃魚