流——>表
方式一
方式二
方式一:写sql
DataStreamSource<String> source = env.socketTextStream("localhost", 8881);
// Register the stream as a temporary view: (view name, source stream, field name(s)).
// For a stream of atomic values such as String, $("word") names its single column.
tableEnv.createTemporaryView("t_1",source,$("word"));
方式二:使用dsl
// Read text lines from a local socket.
DataStreamSource<String> socketLines = env.socketTextStream("localhost", 8881);
// fromDataStream(stream, field names): wrap the stream in a Table whose only column is "word".
Table table = tableEnv.fromDataStream(socketLines, $("word"));
表——>流
Table table = tEnv.sqlQuery("select word,count(1) wordCount from t_1 group by word");
// Option 1: toAppendStream -- only accepts insert-only tables. The non-windowed GROUP BY
// aggregate above produces update changes, so this call fails with a TableException
// ("toAppendStream doesn't support consuming update changes which is produced by node
// GroupAggregate(groupBy=[word], ...)"). It is commented out so the snippet can run;
// for queries that update their results, use option 2.
// DataStream<Row> appendStream = tEnv.toAppendStream(table, Row.class);
// Option 2: toRetractStream -- emits Tuple2<Boolean, Row>; true marks an add message,
// false marks a retract message.
DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);
wordCount案例
方式一:使用sql
package com.bigdata.day07;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;import static org.apache.flink.table.api.Expressions.$;/*** @基本功能:* @program:flinkProject* @author: 堇年* @create:2024-11-28 14:42:27**/
public class _06_flink_wordcounnt {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 获取tableEnv对象// 通过env 获取一个table 环境StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);DataStreamSource<String> source = env.socketTextStream("localhost", 8881);SingleOutputStreamOperator<String> flatMap = source.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] split = value.split(",");for (String s : split) {out.collect(s);}}});//2. 创建表对象tEnv.createTemporaryView("t_1",flatMap,$("word"));//3. 编写sql语句Table table = tEnv.sqlQuery("select word,count(1) wordCount from t_1 group by word");//4. 将Table变为stream流//使用toAppendStream时会报错 因为有聚合操作//DataStream<Row> appendStream = tEnv.toAppendStream(table, Row.class);// toAppendStream doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[word], select=[word, SUM(num) AS sumNum])// 在这里可以映射为ROW对象,也可以映射为自己定义的实体类DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);retractStream.filter(new FilterFunction<Tuple2<Boolean, Row>>() {@Overridepublic boolean filter(Tuple2<Boolean, Row> value) throws Exception {return value.f0;}}).print();//5. execute-执行env.execute();}
}
方式二:使用dsl语句
package com.bigdata.day07;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;import static org.apache.flink.table.api.Expressions.$;public class _06_flink_wordcounnt_dsl {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 获取tableEnv对象// 通过env 获取一个table 环境StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);DataStreamSource<String> source = env.socketTextStream("localhost", 8881);SingleOutputStreamOperator<String> flatMap = source.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] split = value.split(",");for (String s : split) {out.collect(s);}}});//2. 创建表对象Table table = tEnv.fromDataStream(flatMap,$("word"));//3. 编写sql语句Table rsTable = table.groupBy($("word")).select($("word"),$("word").count().as("wordcount"));rsTable.printSchema();//4. 将Table变为stream流DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(rsTable, Row.class);retractStream.filter(new FilterFunction<Tuple2<Boolean, Row>>() {@Overridepublic boolean filter(Tuple2<Boolean, Row> value) throws Exception {return value.f0;}}).print();//5. execute-执行env.execute();}
}
结果展示
+I 表示有一条新数据进行了插入
+U 表示某个已存在的 word 又来了一条新数据,需要更新它的统计结果(+U 携带的是更新后的新值)
-U 出现在 +U 之前,表示先撤回(删除)原来的旧值,再由 +U 写入更新后的新值。注意:示例代码用 filter 只保留了 f0 为 true 的记录,而撤回消息的 f0 为 false,因此 -U 不会被打印出来。