flink wordcount 一步一步学习 大数据

flink wordcount 初次看到这两个词的同学可能会有些不明所以,但是一般在大数据研发有过一定深耕或者了解的童鞋会瞬间get到在表达什么意思。大数据的代码研发不论是用什么语言什么工具,在最开始的时候都会有一个经典的代码示例,词数统计。

flink wordcount 初次看到这两个词的同学可能会有些不明所以,但是一般在大数据研发有过一定深耕或者了解的童鞋会瞬间get到在表达什么意思。大数据的代码研发不论是用什么语言什么工具,在最开始的时候都会有一个经典的代码示例,词数统计。

开发及运行环境

  1. MacBook Pro (13-inch, M1, 2020) ;macOS Monterey
  2. Java 8 和 Scala 2.12
  3. 集成开发环境(IDE)使用 IntelliJ IDEA
  4. Flink1.13.0

工具安装

安装netcat

一般Mac不自带netcat,我们的代码示例中有用到改工具,需要我们自行安装netcat

brew install netcat
==> Downloading https://ghcr.io/v2/homebrew/core/netcat/manifests/0.7.1-1
######################################################################## 100.0%
==> Downloading https://ghcr.io/v2/homebrew/core/netcat/blobs/sha256:7c33ed98a6c81011f5923240e11b87f07add5cea280f5e2754b2f3d7fc3d9eee
==> Downloading from https://pkg-containers.githubusercontent.com/ghcr1/blobs/sha256:7c33ed98a6c81011f5923240e11b87f07add5cea280f5e2754b2f3d7fc3d9eee?se=2022-09-17T10%3A40%3A00Z&sig=xlGiGyvIHi8peoADPzotAT
######################################################################## 100.0%
==> Pouring netcat--0.7.1.monterey.bottle.1.tar.gz
?  /usr/local/Cellar/netcat/0.7.1: 13 files, 135.7KB

IDE 和 Maven等工具

Mac m1 intellij idea gradle 通过brew快速安装配置及工具

代码示例

本地输入文件


hello world
hello flink
hello spark
hi hive
hi big data


flink早起版本批处理

基于DataSet[String] 的数据集处理

import org.apache.flink.api.scala._

object BatchWordCount {

  def main(args: Array[String]): Unit = {
    // 创建环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    // 读取文件
    val line_data_set = env.readTextFile("data/words.txt")
    // 处理数据 打散
    val words = line_data_set.flatMap(_.split(" ").map(x => (x, 1)))
    // 分组
    words.groupBy(0).sum(1).print
  }
}

flink 有界数据批处理

基于DataStream[String]的流数据处理

object BoundedStreamWordCount {
  def main(args: Array[String]): Unit = {

    // 创建流式处理环境
    val environment = StreamExecutionEnvironment.getExecutionEnvironment

    val value = environment.readTextFile("data/words.txt")
    // 处理数据 打散
    val words = value.flatMap(_.split(" ")).map(x => (x, 1))
    words.print()
    // 分组
    val value1 = words.keyBy(_._1)
    value1.sum(1).print

    environment.execute()
  }

  /**
   *  并行编号    结果
   *  4> (hello,1)
      8> (hi,1)
      1> (hi,1)
      3> (hello,1)
      6> (hello,1)
      6> (spark,1)
      3> (world,1)
      1> (big,1)
      4> (flink,1)
      1> (data,1)
      8> (hive,1)
      7> (flink,1)
      5> (world,1)
      2> (big,1)
      1> (hive,1)
      3> (hi,1)
      2> (data,1)
      3> (hi,2)
      3> (hello,1)
      3> (hello,2)
      1> (spark,1)
      3> (hello,3)
   */
}

flink 无界数据流式处理

启动netcat

nc -lk 5678
hello world
hello scala

运行如下代码

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // 创建流式处理环境
    val environment = StreamExecutionEnvironment.getExecutionEnvironment

    val value = environment.socketTextStream("localhost",5678)
    // 处理数据 打散
    val words = value.flatMap(_.split(" ")).map(x => (x, 1))
    words.print()
    // 分组
    val value1 = words.keyBy(_._1)
    value1.sum(1).print

    environment.execute()
  }

    3> (hello,1)
    3> (world,1)
    5> (world,1)
    3> (hello,1)
    4> (hello,1)
    4> (scala,1)
    1> (scala,1)
    3> (hello,2)