1915160313884a7315ce03f15440ec0c.png

数据源是Flink DataSet API希望从其中获取数据的地方。它可以以文件或Java集合的形式出现。DataSet API支持许多内置的数据源函数。它还支持编写自定义数据源函数,因此不支持的任何东西都可以轻松编程。

首先,让我们理解其内置的数据源函数。

基于文件的数据源

Flink支持从文件中读取数据。它逐行读取数据并将其作为字符串返回。以下是我们可以用来读取数据的内置函数:

  • readTextFile(String path)/TextInputFormat:
    • 从路径指定的文件中传输数据。默认情况下,它将读取TextInputFormat并逐行读取字符串。
  • readTextFileWithValue(String path)/TextValueInputFormat :
    • 从路径指定的文件中传输数据。它返回可变字符串。
  • readCsvFile(String path)/CsvInputFormat :
    • 从逗号分隔的文件中读取数据。它返回Java POJO或tuples或case class对象。
  • readFileofPremitives(path,class)/readFileofPremitives(path,delimiter)/PrimitiveInputFormat :
    • 这将把新行解析为基本数据类型,如字符串或整数。
  • readFileofPremitives(path,delimiter,class)/PrimitiveInputFormat
  • readHadoopFile(FileInputFormat, Key, Value, path):
    • 它使用给定的FileInputFormat、Key类和Value类从指定路径读取文件。它将解析后的值作为元组Tuple2<Key,Value>返回。
  • readSequenceFile(Key, Value, path)/SequenceFileInputFormat:
    • 创建一个JobConf并使用给定的SequenceFileInputFormat、Key类和Value类从指定路径读取文件。它将解析后的值作为元组Tuple2<Key,Value>返回。

【示例】读取文件数据源中的数据

Java代码:

package com.xueai8.ch04;
 
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
 
 
/**
 * Created by www.xueai8.com
 * 文件数据源
 */
public class FileSourceDemo01 {
    public static void main(String[] args) throws Exception {
        // 设置批处理执行环境
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
        // 首先从环境中获取一些数据,比如:
        String textPath = "src/wc.txt";
        DataSet<String> text = env.readTextFile(textPath).map(String::toLowerCase);
        text.print();
    }
}

执行程序,输出结果如下所示:

good good study
day day up

Scala代码:

package com.xueai8.ch04
 
import org.apache.flink.api.scala._
 
/**
  * Created by www.xueai8.com
  * 文件数据源
  */
object FileSourceDemo01 {
  def main(args: Array[String]): Unit = {
    // 设置批处理执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment
 
    // 得到输入数据
    val textPath = "src/wc.txt"
    val text = env.readTextFile(textPath)
 
    // 对数据进行转换
//    text.map( _.toLowerCase).print()
    text.flatMap { _.toLowerCase.split("W+") }
            .map { (_, 1) }
            .groupBy(0)
            .sum(1)
            .print()
  }
}

执行程序,输出结果如下所示:

(up,1)
(day,2)
(good,2)
(study,1)
Logo

加入社区!打开量化的大门,首批课程上线啦!

更多推荐