1. Background
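The official Flink documentation includes several hands-on examples. One of them detects fraud with the DataStream API. When I followed its tutorial step by step to set up the IDE, a few unexpected problems came up, so this post is a short record of the pitfalls I hit.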
2. Learning Process
Generating the project
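The tutorial already lists the required environment, so I won't repeat it here. I develop in Scala, so following the docs, my first step was to generate the project from the Maven archetype as shown below. When it finishes, a frauddetection folder is created, and you can then import this generated project into the IDE.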
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-walkthrough-datastream-scala \
-DarchetypeVersion=1.16.0 \
-DgroupId=frauddetection \
-DartifactId=frauddetection \
-Dversion=0.1 \
-Dpackage=spendreport \
-DinteractiveMode=false
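Note ⚠️: the command above is run in a terminal. On Windows, press Win+R, type cmd and press Enter to open a command prompt; on a Mac, use Terminal. The trailing "\" line continuations only work in a Unix-style shell; in Windows cmd, either put the whole command on one line or replace each "\" with "^".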
Configuring IntelliJ IDEA
- Maven: Preferences -> Build, Execution, Deployment -> Build Tools -> Maven
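- Scala: Preferences -> Build, Execution, Deployment -> Compiler -> Scala Compiler -> Scala Compile Server -> JDK
  This is where I ran into trouble: on a trial run, the code failed with the following error, which typically shows up when the compile server's JDK does not match the JDK the project compiles against:
  packages cannot be represented as URI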
- Java: File -> Project Structure -> Platform Settings -> SDKs. Check the JDK configured here carefully, especially if several JDKs are installed on your machine.
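Since both problems above come down to the IDE silently picking a different JDK than you expect, a quick sanity check can help. The sketch below is a hypothetical helper (EnvCheck is not part of the walkthrough) that prints the JVM and Scala library the program actually runs with. Note that it reports the JDK of the run configuration, not the one used by the Scala compile server.

```scala
object EnvCheck {
  // Version of the JVM this program is running on
  def javaVersion: String = System.getProperty("java.version")

  // Installation directory of that JVM, useful when several JDKs are installed
  def javaHome: String = System.getProperty("java.home")

  // Version of the scala-library found on the runtime classpath
  def scalaVersion: String = scala.util.Properties.versionNumberString

  def main(args: Array[String]): Unit = {
    println(s"java.version = $javaVersion")
    println(s"java.home    = $javaHome")
    println(s"scala        = $scalaVersion")
  }
}
```

Run it with the same run configuration you use for the job and compare the output with the SDK selected under Project Structure.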
3. Understanding the Code
open method
Initialization method for the function. It is called before the actual working methods (like map or join) and thus suitable for one time setup work. For functions that are part of an iteration, this method will be invoked at the beginning of each iteration superstep.
The configuration object passed to the function can be used for configuration and initialization. The configuration contains all parameters that were configured on the function in the program composition.
classDiagram
direction BT
class AbstractRichFunction
class Function {
<<Interface>>
}
class KeyedProcessFunction~K, I, O~
class Public
class PublicEvolving
class RichFunction {
<<Interface>>
}
class Serializable {
<<Interface>>
}
Public .. AbstractRichFunction
AbstractRichFunction ..> RichFunction
AbstractRichFunction ..> Serializable
Public .. Function
Function --> Serializable
KeyedProcessFunction~K, I, O~ --> AbstractRichFunction
PublicEvolving .. KeyedProcessFunction~K, I, O~
Public .. PublicEvolving
RichFunction --> Function
Public .. RichFunction
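In short, open is executed before the records are processed one by one, and it is mostly used to prepare things that all later calls share. Here it creates the two state handles. They are created only once, but they are not plain global variables: every value() or update() call reads or writes the entry for the key currently being processed.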
// Description copied from class: RichFunction
// Initializes configuration or shared state before the per-record method runs,
// i.e. open is called before processElement
override def open(parameters: Configuration): Unit = {
  // Create the state handles once; at runtime their values are scoped to the current key
  val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
  flagState = getRuntimeContext.getState(flagDescriptor)
  val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
  timerState = getRuntimeContext.getState(timerDescriptor)
}
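To see why handles created once in open still behave per key, here is a Flink-free toy model. ToyKeyedBackend and ToyValueState are hypothetical stand-ins, not Flink classes: the handle is created once, but value, update and clear only touch the entry for the current key, which is roughly what Flink's keyed state does while processElement runs for a given key. Unlike Flink's ValueState.value(), which returns null when nothing is set, the toy returns an Option.

```scala
import scala.collection.mutable

object KeyedStateSketch {
  // Hypothetical stand-in for a keyed state backend: it tracks which key is "current"
  final class ToyKeyedBackend[K] {
    var currentKey: Option[K] = None
    // state name -> (key -> value)
    val tables: mutable.Map[String, mutable.Map[K, Any]] = mutable.Map.empty

    // Loosely like getRuntimeContext.getState(descriptor): returns a handle, not a value
    def getState[V](name: String): ToyValueState[K, V] = new ToyValueState[K, V](this, name)
  }

  // Hypothetical handle similar in spirit to ValueState: every call is scoped to the current key
  final class ToyValueState[K, V](backend: ToyKeyedBackend[K], name: String) {
    private def table: mutable.Map[K, Any] =
      backend.tables.getOrElseUpdate(name, mutable.Map.empty[K, Any])

    private def key: K =
      backend.currentKey.getOrElse(throw new IllegalStateException("no current key"))

    def value(): Option[V] = table.get(key).map(_.asInstanceOf[V])
    def update(v: V): Unit = table.update(key, v)
    def clear(): Unit = { table.remove(key); () }
  }
}
```

The same flag handle returns different values depending on the current key, which is why one field is enough to keep a separate flag per account.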
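In org.apache.flink.api.common.functions.AbstractRichFunction, the definitions of open and close are grouped under the comment "Default life cycle methods". That comment is a good hint about their role: they are lifecycle hooks, and the default implementations are empty.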
// --------------------------------------------------------------------------------------------
// Default life cycle methods
// --------------------------------------------------------------------------------------------
@Override
public void open(Configuration parameters) throws Exception {}
@Override
public void close() throws Exception {}
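Because AbstractRichFunction implements RichFunction, we can go one level up and look at how open is defined in the org.apache.flink.api.common.functions.RichFunction interface. The Configuration passed in is org.apache.flink.configuration.Configuration, which suggests the method is meant for Flink-level parameters; framework parameters are usually set from the global perspective of the system or of the program's execution. Note, however, that in the DataStream API (1.16) AbstractUdfStreamOperator opens the user function with an empty Configuration, so in practice open is used for one-time setup, such as creating state handles, rather than for reading parameters.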
/**
* Initialization method for the function. It is called before the actual working methods (like
* <i>map</i> or <i>join</i>) and thus suitable for one time setup work. For functions that are
* part of an iteration, this method will be invoked at the beginning of each iteration
* superstep.
*
* <p>The configuration object passed to the function can be used for configuration and
* initialization. The configuration contains all parameters that were configured on the
* function in the program composition.
*
* <pre>{@code
* public class MyFilter extends RichFilterFunction<String> {
*
*     private String searchString;
*
*     public void open(Configuration parameters) {
*         this.searchString = parameters.getString("foo");
*     }
*
*     public boolean filter(String value) {
*         return value.equals(searchString);
*     }
* }
* }</pre>
*
* <p>By default, this method does nothing.
*
* @param parameters The configuration containing the parameters attached to the contract.
* @throws Exception Implementations may forward exceptions, which are caught by the runtime.
* When the runtime catches an exception, it aborts the task and lets the fail-over logic
* decide whether to retry the task execution.
* @see org.apache.flink.configuration.Configuration
*/
void open(Configuration parameters) throws Exception;
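The lifecycle this Javadoc describes (open once for one-time setup, then the working method for every record, then close) can be modelled in a few lines. MiniRichFunction and runTask below are hypothetical and far simpler than Flink's runtime, which also deals with checkpoints, timers and failover; they only illustrate the call order.

```scala
import scala.collection.mutable.ListBuffer

object LifecycleSketch {
  // Hypothetical, heavily simplified version of a rich function's lifecycle
  trait MiniRichFunction[I] {
    def open(): Unit = {}          // one-time setup, like RichFunction.open
    def process(value: I): Unit    // the "actual working method", called per record
    def close(): Unit = {}         // teardown, like RichFunction.close
  }

  // Drives one function instance: open once, process every record, then close.
  // This sketch calls close in a finally block so it also runs if processing throws.
  def runTask[I](f: MiniRichFunction[I], input: Seq[I]): Unit = {
    f.open()
    try input.foreach(f.process)
    finally f.close()
  }

  // Records the order in which the hooks are called
  final class Recording extends MiniRichFunction[String] {
    val calls: ListBuffer[String] = ListBuffer.empty
    override def open(): Unit = calls += "open"
    override def process(value: String): Unit = calls += s"process:$value"
    override def close(): Unit = calls += "close"
  }
}
```

Running a Recording over Seq("a", "b") yields the calls open, process:a, process:b, close, which is the ordering that lets the FraudDetector create its state handles in open and use them in processElement.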
onTimer method
KeyedProcessFunction defines it as follows:
/**
* Called when a timer set using {@link TimerService} fires.
*
* @param timestamp The timestamp of the firing timer.
* @param ctx An {@link OnTimerContext} that allows querying the timestamp, the {@link
* TimeDomain}, and the key of the firing timer and getting a {@link TimerService} for
* registering timers and querying the time. The context is only valid during the invocation
* of this method, do not store it.
* @param out The collector for returning result values.
* @throws Exception This method may throw exceptions. Throwing an exception will cause the
* operation to fail and may trigger recovery.
*/
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
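The firing behaviour behind onTimer can be sketched with a toy processing-time timer service. ToyTimerService is hypothetical, not Flink's TimerService: timers are stored per (timestamp, key), so registering the same pair twice leaves a single timer (Flink's documentation describes the same per-key, per-timestamp deduplication), and advancing the clock fires every timer whose timestamp is less than or equal to the new time, earliest first. Each fired entry corresponds to one onTimer call.

```scala
import scala.collection.mutable

// Hypothetical toy stand-in for a processing-time timer service (not Flink's API)
final class ToyTimerService[K: Ordering] {
  // Sorted by (timestamp, key); being a Set, duplicate registrations collapse into one timer
  private val timers = mutable.TreeSet.empty[(Long, K)]
  private var now: Long = 0L

  def currentProcessingTime(): Long = now

  def registerProcessingTimeTimer(key: K, time: Long): Unit = timers += ((time, key))

  def deleteProcessingTimeTimer(key: K, time: Long): Unit = timers -= ((time, key))

  /** Moves the clock to `time` and returns the fired timers as (key, timestamp), earliest first. */
  def advanceTo(time: Long): List[(K, Long)] = {
    now = time
    // The set is sorted by timestamp, so the due timers form a prefix
    val due = timers.takeWhile(_._1 <= time).toList
    timers --= due
    due.map { case (ts, k) => (k, ts) }
  }
}
```

Deleting a timer before its time means it never fires, which is exactly what cleanUp relies on.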
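As for the monitoring code itself: the parameter types contain a "#" (hash). This is Scala's type projection, not a path-dependent type. KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext refers to the inner class OnTimerContext of any KeyedProcessFunction[Long, Transaction, Alert], whereas a path-dependent type such as this.OnTimerContext would be tied to one specific instance. Scala needs this form here because OnTimerContext (like Context) is a non-static inner class in Java.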
// We don't watch a key forever: if the timer fires and no fraud pattern was detected,
// assume this key's transactions are fine and clear the previous state
override def onTimer(
    timestamp: Long,
    ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
    out: Collector[Alert]): Unit = {
  timerState.clear()
  flagState.clear()
}
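The "#" in KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext is a type projection. The small, Flink-free example below contrasts it with a path-dependent type; Outer and Inner are made-up names used only for illustration.

```scala
object TypeProjectionDemo {
  class Outer(val name: String) {
    // Each Outer instance has its own Inner type, like a non-static Java inner class
    class Inner { def owner: String = name }
    def newInner(): Inner = new Inner
  }

  // Path-dependent type: only accepts an Inner that was created by this particular `o`
  def describePathDependent(o: Outer)(i: o.Inner): String = s"path-dependent: ${i.owner}"

  // Type projection: accepts an Inner created by any Outer
  def describeProjection(i: Outer#Inner): String = s"projection: ${i.owner}"

  def main(args: Array[String]): Unit = {
    val a = new Outer("a")
    val b = new Outer("b")
    println(describePathDependent(a)(a.newInner()))
    println(describeProjection(b.newInner()))
    // describePathDependent(a)(b.newInner()) would not compile: that is a b.Inner, not an a.Inner
  }
}
```

Flink's Java signature does not depend on any particular instance, so the Scala override has to use the projection form.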
processElement method
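@throws[Exception]
override def processElement(
    transaction: Transaction,
    context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
    collector: Collector[Alert]): Unit = {
  // Debug output: print every incoming transaction
  println(transaction.toString)
  // The state handle was obtained from the runtime context in open(); read its value for the current key
  val lastTransactionWasSmall: java.lang.Boolean = flagState.value()
  // A non-null value means this key's previous transaction was a small one; otherwise skip to the check below
  if (lastTransactionWasSmall != null) {
    if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
      val alert = new Alert
      alert.setId(transaction.getAccountId)
      collector.collect(alert)
    }
    // Alert or not, the pattern for this key ends here: clear the state and the timer
    cleanUp(context)
  }
  // Whenever a transaction is below the small threshold, start watching this key's next transaction
  // and register a timer. The timer makes the rule flexible: a long gap between transactions is not
  // necessarily fraud, so we assume a small-then-large pair within one minute is fraudulent.
  // Whether one minute is the right window has to be validated against real data.
  if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
    flagState.update(true)
    val timer: Long = context.timerService().currentProcessingTime() + FraudDetector.ONE_MINUTE
    context.timerService().registerProcessingTimeTimer(timer)
    timerState.update(timer)
  }
}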
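The rule implemented by processElement and onTimer can be replayed without a Flink cluster. The following is a simplified simulation, not the walkthrough's code: FraudRuleSimulation and Txn are hypothetical, the thresholds (1.00, 500.00 and one minute) are assumed to mirror the FraudDetector constants, and time comes from each transaction instead of the machine clock. A pending entry per account stands in for flagState plus timerState, and an expired timer is handled lazily the next time that account is seen, which has the same effect as onTimer clearing the state.

```scala
import scala.collection.mutable

object FraudRuleSimulation {
  // Assumed to mirror FraudDetector.SMALL_AMOUNT, LARGE_AMOUNT and ONE_MINUTE
  val SmallAmount: Double = 1.00
  val LargeAmount: Double = 500.00
  val OneMinute: Long = 60 * 1000L

  // Hypothetical transaction record; `time` plays the role of processing time
  final case class Txn(accountId: Long, amount: Double, time: Long)

  /** Returns the account ids that raise an alert, in the order the alerts are emitted. */
  def detect(txns: Seq[Txn]): List[Long] = {
    // accountId -> timer timestamp; an entry means "the last transaction was small"
    val pending = mutable.Map.empty[Long, Long]
    val alerts = mutable.ListBuffer.empty[Long]

    for (t <- txns.sortBy(_.time)) {
      // The timer fired before this transaction arrived: like onTimer, just drop the state
      if (pending.get(t.accountId).exists(_ <= t.time)) pending.remove(t.accountId)

      // Previous transaction was small: alert on a large amount, then clean up either way
      if (pending.contains(t.accountId)) {
        if (t.amount > LargeAmount) alerts += t.accountId
        pending.remove(t.accountId)
      }

      // A small transaction (re)arms the rule for one minute
      if (t.amount < SmallAmount) pending.update(t.accountId, t.time + OneMinute)
    }
    alerts.toList
  }
}
```

Account 2 in a sequence like (2, 0.5 at 0 ms) followed by (2, 600.0 at 70000 ms) shows why the timer matters: without it, the large transaction 70 seconds later would still trigger an alert.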
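@throws[Exception]
def cleanUp(context: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
  // The timer was registered with registerProcessingTimeTimer, so delete it in the processing-time domain
  val timer: java.lang.Long = timerState.value()
  context.timerService().deleteProcessingTimeTimer(timer)
  // Clear both pieces of per-key state
  timerState.clear()
  flagState.clear()
}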
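cleanUp has to delete the timer in the same time domain it was registered in, and processElement registers it with registerProcessingTimeTimer. Flink's internal timer service keeps processing-time and event-time timers in separate queues, so deleting in the other domain silently leaves the original timer in place; it would still fire later and run onTimer, which could clear state set by a newer small transaction. The toy model below (TwoDomainTimers is hypothetical) shows the effect.

```scala
import scala.collection.mutable

object TimerDomainSketch {
  // Hypothetical model: one queue per time domain, as in Flink's internal timer service
  final class TwoDomainTimers {
    val processingTime: mutable.Set[Long] = mutable.Set.empty
    val eventTime: mutable.Set[Long] = mutable.Set.empty

    def registerProcessingTimeTimer(t: Long): Unit = processingTime += t
    def deleteProcessingTimeTimer(t: Long): Unit = processingTime -= t
    // Only looks at the event-time queue, so it cannot remove a processing-time timer
    def deleteEventTimeTimer(t: Long): Unit = eventTime -= t
  }
}
```

Deleting with deleteEventTimeTimer leaves the processing-time timer registered; only deleteProcessingTimeTimer removes it.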