flink wordcount 初次看到这两个词的同学可能会有些不明所以,但是一般在大数据研发有过一定深耕或者了解的童鞋会瞬间get到在表达什么意思。大数据的代码研发不论是用什么语言什么工具,在最开始的时候都会有一个经典的代码示例,词数统计。
开发及运行环境
- MacBook Pro (13-inch, M1, 2020) ;macOS Monterey
- Java 8 和 Scala 2.12
- 集成开发环境(IDE)使用 IntelliJ IDEA
- Flink1.13.0
工具安装
安装netcat
一般Mac不自带netcat,我们的代码示例中有用到改工具,需要我们自行安装netcat
brew install netcat
==> Downloading https://ghcr.io/v2/homebrew/core/netcat/manifests/0.7.1-1
######################################################################## 100.0%
==> Downloading https://ghcr.io/v2/homebrew/core/netcat/blobs/sha256:7c33ed98a6c81011f5923240e11b87f07add5cea280f5e2754b2f3d7fc3d9eee
==> Downloading from https://pkg-containers.githubusercontent.com/ghcr1/blobs/sha256:7c33ed98a6c81011f5923240e11b87f07add5cea280f5e2754b2f3d7fc3d9eee?se=2022-09-17T10%3A40%3A00Z&sig=xlGiGyvIHi8peoADPzotAT
######################################################################## 100.0%
==> Pouring netcat--0.7.1.monterey.bottle.1.tar.gz
? /usr/local/Cellar/netcat/0.7.1: 13 files, 135.7KB
IDE 和 Maven等工具
Mac m1 intellij idea gradle 通过brew快速安装配置及工具
代码示例
本地输入文件
hello world
hello flink
hello spark
hi hive
hi big data
flink早起版本批处理
基于DataSet[String] 的数据集处理
import org.apache.flink.api.scala._
object BatchWordCount {
def main(args: Array[String]): Unit = {
// 创建环境
val env = ExecutionEnvironment.getExecutionEnvironment
// 读取文件
val line_data_set = env.readTextFile("data/words.txt")
// 处理数据 打散
val words = line_data_set.flatMap(_.split(" ").map(x => (x, 1)))
// 分组
words.groupBy(0).sum(1).print
}
}
flink 有界数据批处理
基于DataStream[String]的流数据处理
object BoundedStreamWordCount {
def main(args: Array[String]): Unit = {
// 创建流式处理环境
val environment = StreamExecutionEnvironment.getExecutionEnvironment
val value = environment.readTextFile("data/words.txt")
// 处理数据 打散
val words = value.flatMap(_.split(" ")).map(x => (x, 1))
words.print()
// 分组
val value1 = words.keyBy(_._1)
value1.sum(1).print
environment.execute()
}
/**
* 并行编号 结果
* 4> (hello,1)
8> (hi,1)
1> (hi,1)
3> (hello,1)
6> (hello,1)
6> (spark,1)
3> (world,1)
1> (big,1)
4> (flink,1)
1> (data,1)
8> (hive,1)
7> (flink,1)
5> (world,1)
2> (big,1)
1> (hive,1)
3> (hi,1)
2> (data,1)
3> (hi,2)
3> (hello,1)
3> (hello,2)
1> (spark,1)
3> (hello,3)
*/
}
flink 无界数据流式处理
启动netcat
nc -lk 5678
hello world
hello scala
运行如下代码
object StreamWordCount {
def main(args: Array[String]): Unit = {
// 创建流式处理环境
val environment = StreamExecutionEnvironment.getExecutionEnvironment
val value = environment.socketTextStream("localhost",5678)
// 处理数据 打散
val words = value.flatMap(_.split(" ")).map(x => (x, 1))
words.print()
// 分组
val value1 = words.keyBy(_._1)
value1.sum(1).print
environment.execute()
}
3> (hello,1)
3> (world,1)
5> (world,1)
3> (hello,1)
4> (hello,1)
4> (scala,1)
1> (scala,1)
3> (hello,2)