Using Flink's Common APIs

This article walks through Flink's DataStream API in three parts: Source, Transformation (operators), and Sink.

Source:

  1. Creating a DataStream
  2. Local files
  3. Socket
  4. Kafka

Transformation (operators):

  1. map
  2. FlatMap
  3. Filter
  4. KeyBy
  5. Reduce
  6. Union and connect
  7. Side Outputs

Sink:

  1. print
  2. writeAsText: output as plain text
  3. writeAsCsv: output in CSV format
  4. Write to MySQL
  5. Write to Kafka
  6. Custom sink

First, prepare a class template to reuse in the examples that follow.


#if (${PACKAGE_NAME} && ${PACKAGE_NAME} != "")package ${PACKAGE_NAME};#end
#parse("File Header.java")
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Description:
 * @program: ${PROJECT_NAME}
 * @author: ${USER}
 * @create: ${YEAR}-${MONTH}-${DAY} ${HOUR}:${MINUTE}:${SECOND}
 **/
public class ${NAME} {
    public static void main(String[] args) throws Exception {
        //1. env - prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //2. source - load data
        //3. transformation - process/transform data
        //4. sink - output data
        //5. execute
        env.execute();
    }
}

Source:

Predefined sources

Creating a DataStream (four ways)

  • env.fromElements: all elements must have the same type
  • env.fromCollection: supports the various concrete Collection types
  • env.generateSequence(): creates a sequence-based DataStream -- deprecated
  • env.fromSequence(): creates a DataStream from a start and end value
package com.bigdata.source;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class _01YuDingYiSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Various predefined sources
        DataStreamSource<String> dataStreamSource = env.fromElements("hello world txt", "hello nihao kongniqiwa");
        dataStreamSource.print();

        // A wrong example: element types must be consistent
        //DataStreamSource<Object> dataStreamSource2 = env.fromElements("hello", 1, 3.0f);
        //dataStreamSource2.print();

        DataStreamSource<Tuple2<String, Integer>> elements = env.fromElements(
                Tuple2.of("张三", 18),
                Tuple2.of("lisi", 18),
                Tuple2.of("wangwu", 18));
        elements.print();

        // Arrays.asList turns an array into a List; a quick review of arrays, collections and common APIs
        String[] arr = {"hello", "world"};
        System.out.println(arr.length);
        System.out.println(Arrays.toString(arr));
        List<String> list = Arrays.asList(arr);
        System.out.println(list);

        env.fromElements(
                Arrays.asList(arr),
                Arrays.asList(arr),
                Arrays.asList(arr)
        ).print();

        // The second way to load data
        // The only sub-interfaces of Collection are Set and List
        ArrayList<String> list1 = new ArrayList<>();
        list1.add("python");
        list1.add("scala");
        list1.add("java");
        DataStreamSource<String> ds1 = env.fromCollection(list1);
        DataStreamSource<String> ds2 = env.fromCollection(Arrays.asList(arr));

        // The third way
        DataStreamSource<Long> ds3 = env.fromSequence(1, 100);
        ds3.print();

        // Code placed after execute() does not run, so this call must come last.
        env.execute("获取预定义的Source");
    }
}

Local files

        File file = new File("datas/wc.txt");
        File file2 = new File("./");
        System.out.println(file.getAbsoluteFile());
        System.out.println(file2.getAbsoluteFile());

        DataStreamSource<String> ds1 = env.readTextFile("datas/wc.txt");
        ds1.print();

        // Data can also be read from an HDFS path
        DataStreamSource<String> ds2 = env.readTextFile("hdfs://bigdata01:9820/home/a.txt");
        ds2.print();

Socket 

Linux (socket command)
Install: yum install -y nc
nc -lk 8888   -- listens on port 8888 for messages; run this command first. If the Java program is started first, it will throw an error!
If the port is already in use, switch to another port.
Local / Windows (socket command)
nc -lp 8888
or nc -l -p 8888
If nc is not started from its exe, use -l -p 8888
Java code
DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
Word Count example
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SourceDemo02_Socket {
    public static void main(String[] args) throws Exception {
        //TODO 1. env - prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 2. source - load data
        DataStream<String> socketDS = env.socketTextStream("bigdata01", 8889);

        //TODO 3. transformation - process/transform data
        //3.1 split each line and flatten the words
        DataStream<String> wordsDS = socketDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(word);
                }
            }
        });

        //3.2 map each word to <word, 1>
        DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });

        //3.3 group by the word
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordAndOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        //3.4 aggregate
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedDS.sum(1);

        //TODO 4. sink - output data
        result.print();

        //TODO 5. execute
        env.execute();
    }
}

 JDBC

        Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink", "root", "root");
        PreparedStatement preparedStatement = connection.prepareStatement(
                "select monitor_id, speed_limit from t_monitor_info group by monitor_id, speed_limit");
        ResultSet resultSet = preparedStatement.executeQuery();
        ArrayList<Tuple2<String, Double>> arr = new ArrayList<>();
        while (resultSet.next()) {
            String monitor_id = resultSet.getString("monitor_id");
            Double speed_limit = resultSet.getDouble("speed_limit");
            Tuple2 tuple2 = new Tuple2(monitor_id, speed_limit);
            arr.add(tuple2);
        }
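The list queried over plain JDBC can then be handed to Flink as a bounded stream. A minimal sketch, assuming the env and the arr list from the snippet above:

        // Turn the JDBC query result into a DataStream (sketch; assumes env and arr from above)
        DataStreamSource<Tuple2<String, Double>> monitorDS = env.fromCollection(arr);
        monitorDS.print();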

Kafka

Add the dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
package com.bigdata.day02;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata01:9092");
        properties.setProperty("group.id", "g1");
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("topic1", new SimpleStringSchema(), properties);
        DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);

        // The code below has nothing to do with consuming Kafka data; it only makes the example slightly more involved.
        // Return true to keep a record, false to drop it.
        dataStreamSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String word) throws Exception {
                // Keep only records that contain the word "success"
                return word.contains("success");
            }
        }).print();

        env.execute();
    }
}
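On newer Flink versions (1.14+), FlinkKafkaConsumer is deprecated in favor of the KafkaSource builder from the newer flink-connector-kafka artifact. A minimal sketch of the equivalent source, reusing the broker, topic and group id assumed above:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceNewApi {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Builder-style Kafka source (Flink 1.14+); broker/topic/group values assumed from the example above
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("bigdata01:9092")
                .setTopics("topic1")
                .setGroupId("g1")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStreamSource<String> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");
        ds.print();
        env.execute();
    }
}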

Custom sources

SourceFunction: non-parallel source (parallelism can only be 1) -- interface

RichSourceFunction: feature-rich non-parallel source (parallelism can only be 1) -- class

ParallelSourceFunction: parallel source (parallelism can be >= 1) -- interface

RichParallelSourceFunction: feature-rich parallel source (parallelism can be >= 1) -- class [recommended]

package com.bigdata.day02;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;
import java.util.UUID;

/**
 * Requirement: generate one random order record per second (order ID, user ID, amount, timestamp)
 * - order ID: random UUID
 * - user ID: random int in [0, 2]
 * - amount: random int in [0, 100]
 * - timestamp: current system time
 */
@Data  // getters, setters, toString
@AllArgsConstructor
@NoArgsConstructor
class OrderInfo {
    private String orderId;
    private int uid;
    private int money;
    private long timeStamp;
}

// class MySource extends RichSourceFunction<OrderInfo> {
// class MySource extends RichParallelSourceFunction<OrderInfo> {
class MySource implements SourceFunction<OrderInfo> {

    boolean flag = true;

    @Override
    public void run(SourceContext ctx) throws Exception {
        // Keep producing data until cancelled
        Random random = new Random();
        while (flag) {
            OrderInfo orderInfo = new OrderInfo();
            orderInfo.setOrderId(UUID.randomUUID().toString());
            orderInfo.setUid(random.nextInt(3));
            orderInfo.setMoney(random.nextInt(101));
            orderInfo.setTimeStamp(System.currentTimeMillis());
            ctx.collect(orderInfo);
            Thread.sleep(1000); // wait 1 second
        }
    }

    // Cleanup to run before the source stops
    @Override
    public void cancel() {
        flag = false;
    }
}

public class CustomSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // Register the custom source with the environment
        DataStreamSource dataStreamSource = env.addSource(new MySource())/*.setParallelism(1)*/;
        System.out.println(dataStreamSource.getParallelism());
        dataStreamSource.print();
        env.execute();
    }
}

If the source is changed to a ParallelSourceFunction, each tick produces as many records as the parallelism; with the default parallelism on a 12-core machine that is 12 records at a time (one per parallel instance, so as many records as there are cores).

Compared with the non-Rich variants, a Rich source additionally provides:
    - open(): called once when an instance is created; with multiple parallel instances it is called once per instance
    - close(): called once when an instance is destroyed; likewise once per parallel instance
    - getRuntimeContext(): returns the current runtime context (lower-level API)

Rich template
/**
 * A custom implementation of RichParallelSourceFunction
 */
public class CustomerRichSourceWithParallelDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> mySource = env.addSource(new MySource()).setParallelism(6);
        mySource.print();
        env.execute();
    }

    /*
     Compared with the non-Rich variants, a Rich source additionally provides:
     - open(): called once when an instance is created; with multiple parallel instances it is called once per instance
     - close(): called once when an instance is destroyed; likewise once per instance
     - getRuntimeContext(): returns the current runtime context (lower-level API)
     */
    public static class MySource extends RichParallelSourceFunction<String> {

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            System.out.println("open......");
        }

        @Override
        public void close() throws Exception {
            super.close();
            System.out.println("close......");
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            ctx.collect(UUID.randomUUID().toString());
        }

        @Override
        public void cancel() {
        }
    }
}

Transformation (operators):

map operator (one-to-one)

package com.bigdata.day02;

import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @program: FlinkDemo
 * @author: 闫哥
 * @create: 2024-05-13 11:40:37
 **/
@Data
@AllArgsConstructor
class LogBean {
    private String ip;      // client IP
    private int userId;     // user ID
    private long timestamp; // access timestamp
    private String method;  // HTTP method
    private String path;    // request path
}

public class Demo04 {
    // Convert each line of the log into a JavaBean
    public static void main(String[] args) throws Exception {
        //1. env - prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source - load data
        DataStreamSource<String> streamSource = env.readTextFile("datas/a.log");

        //3. transformation - process/transform data
        SingleOutputStreamOperator<LogBean> map = streamSource.map(new MapFunction<String, LogBean>() {
            @Override
            public LogBean map(String line) throws Exception {
                String[] arr = line.split("\\s+");
                // Parse the timestamp, e.g. 17/05/2015:10:06:53
                String time = arr[2];
                SimpleDateFormat format = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
                Date date = format.parse(time);
                long timeStamp = date.getTime();
                return new LogBean(arr[0], Integer.parseInt(arr[1]), timeStamp, arr[3], arr[4]);
            }
        });

        //4. sink - output data
        map.print();

        //5. execute
        env.execute();
    }
}

flatMap operator (one-to-many, similar to an explode function)

Data

张三,苹果手机,联想电脑,华为平板
李四,华为手机,苹果电脑,小米平板

package com.bigdata.day03;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @program: FlinkDemo
 * @author: 闫哥
 * @create: 2023-11-21 09:51:59
 **/
public class FlatMapDemo {
    public static void main(String[] args) throws Exception {
        //1. env - prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source - load data
        DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\flatmap.log");

        //3. transformation - process/transform data
        DataStream<String> flatMapStream = fileStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                // e.g. 张三,苹果手机,联想电脑,华为平板
                String[] arr = line.split(",");
                String name = arr[0];
                for (int i = 1; i < arr.length; i++) {
                    String goods = arr[i];
                    collector.collect(name + "有" + goods);
                }
            }
        });

        //4. sink - output data
        flatMapStream.print();

        //5. execute
        env.execute();
    }
}

Filter (filtering)

package com.bigdata.day03;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Date;

/**
 * @program: FlinkDemo
 * @author: zxx
 * @create: 2023-11-21 09:10:30
 **/
public class FilterDemo {

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    static class LogBean {
        String ip;      // client IP
        int userId;     // user ID
        long timestamp; // access timestamp
        String method;  // HTTP method
        String path;    // request path
    }

    public static void main(String[] args) throws Exception {
        //1. env - prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source - load data
        DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\a.log");

        //3. transformation - process/transform data
        // Read the access log from a.log and keep only entries whose IP is 83.149.9.216
        DataStream<String> filterStream = fileStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String line) throws Exception {
                String ip = line.split(" ")[0];
                return ip.equals("83.149.9.216");
            }
        });

        //4. sink - output data
        filterStream.print();

        //5. execute
        env.execute();
    }
}

keyBy (grouping)

Tuples

// By field position (deprecated in newer Flink versions)
wordAndOne.keyBy(0, 1);

// With a KeySelector
wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception {
        return Tuple2.of(value.f0, value.f1);
    }
});

POJO (Plain Old Java Object)

public class PeopleCount {
    private String province;
    private String city;
    private Integer counts;

    public PeopleCount() {
    }

    // other code omitted...
}
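Grouping on a single POJO field works the same way with a KeySelector. A minimal sketch, assuming PeopleCount exposes the usual getters (omitted in the class above):

source.keyBy(new KeySelector<PeopleCount, String>() {
    @Override
    public String getKey(PeopleCount value) throws Exception {
        // getProvince() is assumed to exist on the POJO
        return value.getProvince();
    }
});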

keyBy on multiple fields

source.keyBy(new KeySelector<PeopleCount, Tuple2<String, String>>() {
    @Override
    public Tuple2<String, String> getKey(PeopleCount value) throws Exception {
        return Tuple2.of(value.getProvince(), value.getCity());
    }
});

Reduce (aggregation) -- sum is implemented on top of reduce

// e.g. [ ("10.0.0.1",1), ("10.0.0.1",1), ("10.0.0.1",1) ]
keyByStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
        // t1 => ("10.0.0.1", 10)  -- the running aggregate so far
        // t2 => ("10.0.0.1", 1)   -- the newly arrived element
        return Tuple2.of(t1.f0, t1.f1 + t2.f1);
    }
}).print();

union and connect: merging and connecting streams

Union

union merges multiple streams of the same type.

It combines multiple DataStreams into a single DataStream.

connect

connect can connect two streams of different types (they must be processed into a common type before output).

DataStream, DataStream → ConnectedStreams: connects two data streams while preserving their types. After two streams are connected, they are merely placed inside one shared stream; internally each keeps its own data and form, and the two streams remain independent of each other. By contrast, after union the inputs really do become a single stream.

Similar to union, but connect can only join two streams, the two streams may have different data types, and different processing logic can be applied to each stream.

package com.bigdata.day03;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

/**
 * @program: FlinkDemo
 * @author: zxx
 * @create: 2023-11-21 11:40:12
 **/
public class UnionConnectDemo {
    public static void main(String[] args) throws Exception {
        //1. env - prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source - load data
        DataStreamSource<String> stream1 = env.fromElements("hello", "nihao", "吃甘蔗的人");
        DataStreamSource<String> stream2 = env.fromElements("hello", "kong ni qi wa", "看电子书的人");
        DataStream<String> unionStream = stream1.union(stream2);
        unionStream.print();

        DataStream<Long> stream3 = env.fromSequence(1, 10);
        // stream1.union(stream3); // compile error: the element types differ

        //3. transformation - process/transform data
        ConnectedStreams<String, Long> connectStream = stream1.connect(stream3);
        // To use the connected stream, each side must be processed separately,
        // and the results of both sides must have the same type.
        DataStream<String> mapStream = connectStream.map(new CoMapFunction<String, Long, String>() {

            // Handles the String-typed elements
            @Override
            public String map1(String value) throws Exception {
                return value;
            }

            // Handles the Long-typed elements
            @Override
            public String map2(Long value) throws Exception {
                return Long.toString(value);
            }
        });

        //4. sink - output data
        mapStream.print();

        //5. execute
        env.execute();
    }
}
package com.bigdata.transforma;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @program: FlinkDemo
 * @author: zxx
 * @create: 2024-11-22 10:50:13
 **/
public class _08_两个流join {
    public static void main(String[] args) throws Exception {
        //1. env - prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source - load data
        DataStreamSource<String> ds1 = env.fromElements("bigdata", "spark", "flink");
        DataStreamSource<String> ds2 = env.fromElements("python", "scala", "java");
        DataStream<String> ds3 = ds1.union(ds2);
        ds3.print();

        //3. transformation - next, demonstrate connect
        DataStreamSource<Long> ds4 = env.fromSequence(1, 10);
        ConnectedStreams<String, Long> ds5 = ds1.connect(ds4);
        ds5.process(new CoProcessFunction<String, Long, String>() {
            @Override
            public void processElement1(String value, CoProcessFunction<String, Long, String>.Context ctx, Collector<String> out) throws Exception {
                System.out.println("String流:" + value);
                out.collect(value);
            }

            @Override
            public void processElement2(Long value, CoProcessFunction<String, Long, String>.Context ctx, Collector<String> out) throws Exception {
                System.out.println("Long流:" + value);
                out.collect(String.valueOf(value));
            }
        }).print("合并后的打印:");

        //5. execute
        env.execute();
    }
}

Side Outputs (side output streams) -- used to split a stream

Example: split the elements of a stream into odd and even numbers, then retrieve each split stream.

package com.bigdata.day02;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * @program: FlinkDemo
 * @author: zxx
 * @create: 2024-05-13 16:19:56
 **/
public class Demo11 {
    public static void main(String[] args) throws Exception {
        //1. env - prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // Side output streams
        DataStreamSource<Long> streamSource = env.fromSequence(0, 100);

        // Define two output tags
        OutputTag<Long> tag_even = new OutputTag<Long>("偶数", TypeInformation.of(Long.class));
        OutputTag<Long> tag_odd = new OutputTag<Long>("奇数", TypeInformation.of(Long.class));

        //2. source / 3. transformation - route each element to a side output
        SingleOutputStreamOperator<Long> process = streamSource.process(new ProcessFunction<Long, Long>() {
            @Override
            public void processElement(Long value, ProcessFunction<Long, Long>.Context ctx, Collector<Long> out) throws Exception {
                // value is each incoming element
                if (value % 2 == 0) {
                    ctx.output(tag_even, value);
                } else {
                    ctx.output(tag_odd, value);
                }
            }
        });

        // Get all odd numbers from the processed stream
        DataStream<Long> sideOutput = process.getSideOutput(tag_odd);
        sideOutput.print("奇数:");

        // Get all even numbers
        DataStream<Long> sideOutput2 = process.getSideOutput(tag_even);
        sideOutput2.print("偶数:");

        //5. execute
        env.execute();
    }
}

sink:

print
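print takes an optional prefix, which makes it easier to tell several sinks apart in the console output. A minimal sketch, using result to stand for any DataStream (for example the WordCount result above):

        result.print();                // prints each record, prefixed by the subtask index when parallelism > 1
        result.print("wordcount");     // adds the prefix "wordcount" to each record
        result.printToErr();           // same, but writes to stderr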

writeAsText: write output as plain text

dataStreamSource.writeAsText("F:\\BD230801\\FlinkDemo\\datas\\result", FileSystem.WriteMode.OVERWRITE);
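In newer Flink versions writeAsText is deprecated and the FileSink connector is the recommended replacement. A minimal sketch, with the output path chosen just for illustration:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;

// Row-format file sink that writes each record as one line of text (sketch)
FileSink<String> fileSink = FileSink
        .forRowFormat(new Path("datas/result"), new SimpleStringEncoder<String>("UTF-8"))
        .build();
dataStreamSource.sinkTo(fileSink);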

writeAsCsv: write output in CSV format

        DataStreamSource<Tuple2<String, Integer>> streamSource = env.fromElements(
                Tuple2.of("篮球", 1),
                Tuple2.of("篮球", 2),
                Tuple2.of("篮球", 3),
                Tuple2.of("足球", 3),
                Tuple2.of("足球", 2),
                Tuple2.of("足球", 3));

        // writeAsCsv can only save Tuple-typed DataStreams; with a single column there would be no need for a delimiter
        streamSource.writeAsCsv("datas/csv", FileSystem.WriteMode.OVERWRITE).setParallelism(1);

Writing to MySQL

        JdbcConnectionOptions jdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withDriverName("com.mysql.cj.jdbc.Driver")
                .withUrl("jdbc:mysql://localhost:3306/zuoye")
                .withUsername("root")
                .withPassword("123456")
                .build();

        studentDataStreamSource.addSink(JdbcSink.sink(
                "insert into stu values(?,?,?)",
                new JdbcStatementBuilder<Student>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, Student student) throws SQLException {
                        preparedStatement.setInt(1, student.getId());
                        preparedStatement.setString(2, student.getName());
                        preparedStatement.setInt(3, student.getAge());
                    }
                },
                jdbcConnectionOptions));
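JdbcSink requires the JDBC connector and the MySQL driver on the classpath. A sketch of the Maven dependencies, following the same convention as the Kafka connector above; the exact artifact names and versions depend on your Flink and Scala versions and are assumptions here:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.33</version>
</dependency>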

Writing to Kafka

FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer<String>("topic2", new SimpleStringSchema(), properties);
filterStream.addSink(kafkaProducer);
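The snippet above assumes a properties object and a filterStream that are not shown. A more complete, self-contained sketch of the producer setup; the broker address is an assumption matching the earlier Kafka source example, and fromElements merely stands in for any String stream:

package com.bigdata.sink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class KafkaSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Any String stream works as input; fromElements is used here only for the sketch
        DataStream<String> filterStream = env.fromElements("success-1", "success-2", "failed-3");

        // Producer properties (broker address assumed)
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata01:9092");

        // Serialize each record as a String and write it to topic2
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("topic2", new SimpleStringSchema(), properties);
        filterStream.addSink(kafkaProducer);

        env.execute();
    }
}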

Custom sink -- imitating the implementation of JdbcSink

package com.bigdata.day03;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * @program: FlinkDemo
 * @author: zxx
 * @create: 2023-11-21 16:08:04
 **/
public class CustomJdbcSinkDemo {

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    static class Student {
        private int id;
        private String name;
        private int age;
    }

    static class MyJdbcSink extends RichSinkFunction<Student> {

        Connection conn = null;
        PreparedStatement ps = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Open the database connection here
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test1", "root", "123456");
            ps = conn.prepareStatement("INSERT INTO `student` (`id`, `name`, `age`) VALUES (null, ?, ?)");
        }

        @Override
        public void close() throws Exception {
            // Close the database resources here
            ps.close();
            conn.close();
        }

        @Override
        public void invoke(Student student, Context context) throws Exception {
            // Insert each record into the database
            ps.setString(1, student.getName());
            ps.setInt(2, student.getAge());
            ps.execute();
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Student> studentStream = env.fromElements(new Student(1, "马斯克", 51));
        studentStream.addSink(new MyJdbcSink());
        env.execute();
    }
}
