通达信 day文件 解析_Flink内置数据源:文件数据源
数据源是Flink DataSet API希望从其中获取数据的地方。它可以以文件或Java集合的形式出现。DataSet API支持许多内置的数据源函数。它还支持编写自定义数据源函数,因此不支持的任何东西都可以轻松编程。首先,让我们理解其内置的数据源函数。基于文件的数据源Flink支持从文件中读取数据。它逐行读取数据并将其作为字符串返回。以下是我们可以用来读取数据的内置函数:readTextFil
·
数据源是Flink DataSet API希望从其中获取数据的地方。它可以以文件或Java集合的形式出现。DataSet API支持许多内置的数据源函数。它还支持编写自定义数据源函数,因此不支持的任何东西都可以轻松编程。
首先,让我们理解其内置的数据源函数。
基于文件的数据源
Flink支持从文件中读取数据。它逐行读取数据并将其作为字符串返回。以下是我们可以用来读取数据的内置函数:
- readTextFile(String path)/TextInputFormat:
- 从路径指定的文件中传输数据。默认情况下,它将读取TextInputFormat并逐行读取字符串。
- readTextFileWithValue(String path)/TextValueInputFormat :
- 从路径指定的文件中传输数据。它返回可变字符串。
- readCsvFile(String path)/CsvInputFormat :
- 从逗号分隔的文件中读取数据。它返回Java POJO或tuples或case class对象。
- readFileofPremitives(path,class)/readFileofPremitives(path,delimiter)/PrimitiveInputFormat :
- 这将把新行解析为基本数据类型,如字符串或整数。
- readFileofPremitives(path,delimiter,class)/PrimitiveInputFormat
- readHadoopFile(FileInputFormat, Key, Value, path):
- 它使用给定的FileInputFormat、Key类和Value类从指定路径读取文件。它将解析后的值作为元组Tuple2<Key,Value>返回。
- readSequenceFile(Key, Value, path)/SequenceFileInputFormat:
- 创建一个JobConf并使用给定的SequenceFileInputFormat、Key类和Value类从指定路径读取文件。它将解析后的值作为元组Tuple2<Key,Value>返回。
【示例】读取文件数据源中的数据
Java代码:
package com.xueai8.ch04;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
/**
* Created by www.xueai8.com
* 文件数据源
*/
public class FileSourceDemo01 {
public static void main(String[] args) throws Exception {
// 设置批处理执行环境
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 首先从环境中获取一些数据,比如:
String textPath = "src/wc.txt";
DataSet<String> text = env.readTextFile(textPath).map(String::toLowerCase);
text.print();
}
}
执行程序,输出结果如下所示:
good good study
day day up
Scala代码:
package com.xueai8.ch04
import org.apache.flink.api.scala._
/**
* Created by www.xueai8.com
* 文件数据源
*/
object FileSourceDemo01 {
def main(args: Array[String]): Unit = {
// 设置批处理执行环境
val env = ExecutionEnvironment.getExecutionEnvironment
// 得到输入数据
val textPath = "src/wc.txt"
val text = env.readTextFile(textPath)
// 对数据进行转换
// text.map( _.toLowerCase).print()
text.flatMap { _.toLowerCase.split("W+") }
.map { (_, 1) }
.groupBy(0)
.sum(1)
.print()
}
}
执行程序,输出结果如下所示:
(up,1)
(day,2)
(good,2)
(study,1)
更多推荐


所有评论(0)