Water benefits all things without contending, and dwells in the places people disdain; thus it comes close to the Tao 💦
Contents
1. Reading data from a Java collection
2. Reading data from a local file
3. Reading data from HDFS
4. Reading data from a socket
5. Reading data from Kafka
6. Custom Source
Official documentation - Flink 1.13
1. Reading data from a Java collection
fromCollection(waterSensors)
public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    List<WaterSensor> waterSensors = Arrays.asList(
            new WaterSensor("ws_001", 1577844001L, 45),
            new WaterSensor("ws_002", 1577844015L, 43),
            new WaterSensor("ws_003", 1577844020L, 42));

    env.fromCollection(waterSensors).print();

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Output:
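All of the examples below construct WaterSensor records, but the post never shows the class itself. A minimal sketch of a matching POJO (the field names id/ts/vc are an assumption, not taken from the original):

public class WaterSensor {
    private String id;   // sensor id, e.g. "ws_001" (assumed field name)
    private Long ts;     // event timestamp in milliseconds (assumed field name)
    private Integer vc;  // sensor reading (assumed field name)

    // Flink only treats a class as a POJO if it has a public no-arg constructor
    public WaterSensor() { }

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    // getters and setters omitted for brevity; Flink's POJO rules
    // require them for non-public fields

    @Override
    public String toString() {
        return "WaterSensor{id='" + id + "', ts=" + ts + ", vc=" + vc + "}";
    }
}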
2. Reading data from a local file
readTextFile("input/words.txt")
Both relative and absolute paths are supported.
public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    env.readTextFile("input/words.txt").print();

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Output:
3. Reading data from HDFS
readTextFile("hdfs://hadoop101:8020/flink/data/words.txt")
First add the hadoop-client dependency to the pom file:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.3</version>
</dependency>
public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    env.readTextFile("hdfs://hadoop101:8020/flink/data/words.txt").print();

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Output:
4. Reading data from a socket
socketTextStream("hadoop101", 9999)
This source does not support a parallelism greater than 1.
public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    // Read from the port; to open one: Windows `nc -lp 9999`, Linux `nc -lk 9999`
    env.socketTextStream("hadoop101", 9999).print();

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Output:
5. Reading data from Kafka
addSource(new FlinkKafkaConsumer<>("flink_source_kafka", new SimpleStringSchema(), properties))
The first argument is the topic.
The second argument is the deserialization schema, which converts data between Kafka and Flink. Official Javadoc: "The de-/serializer used to convert between Kafka's byte messages and Flink's objects."
The third argument is the Kafka configuration.
public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    Properties properties = new Properties();
    // Kafka cluster address
    properties.setProperty("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");
    // Consumer group this consumer belongs to
    properties.setProperty("group.id", "flink_consumer_group");

    env.addSource(new FlinkKafkaConsumer<>("flink_source_kafka", new SimpleStringSchema(), properties)).print();

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Output:
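To feed the job some test data, you can produce messages to the flink_source_kafka topic with any Kafka client, for example the console producer that ships with Kafka (the exact flag depends on the Kafka version; older releases use --broker-list, newer ones accept --bootstrap-server), e.g. kafka-console-producer.sh --broker-list hadoop101:9092 --topic flink_source_kafka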
6. Custom Source
addSource(new XXXX())
In most cases the built-in sources above are sufficient, but special cases inevitably come up, so Flink also provides a way to define a custom source.
public class Flink06_myDefDataSource {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        env.addSource(new RandomWatersensor()).print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
To define a custom source, create a class that implements the SourceFunction interface and implement its two methods, run and cancel. The run method contains the actual data-reading logic; when cancel is called, the reading logic in run should stop.
public class RandomWatersensor implements SourceFunction<WaterSensor> {
    // volatile so the change made in cancel() is visible to the thread running run()
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<WaterSensor> sourceContext) throws Exception {
        Random random = new Random();
        while (running) {
            sourceContext.collect(new WaterSensor(
                    "sensor" + random.nextInt(50),
                    Calendar.getInstance().getTimeInMillis(),
                    random.nextInt(100)));
            Thread.sleep(1000);
        }
    }

    /**
     * Most sources have a while loop inside run();
     * calling this method should make that loop exit.
     */
    @Override
    public void cancel() {
        running = false;
    }
}
Output:
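A SourceFunction source always runs with parallelism 1. To let a custom source run in parallel, implement the ParallelSourceFunction marker interface instead; Flink then runs one copy of run per subtask. A minimal sketch reusing the logic above (the class name is made up for illustration):

// Same body as RandomWatersensor; implementing ParallelSourceFunction is what
// allows the source's parallelism to be set above 1.
public class ParallelRandomWatersensor implements ParallelSourceFunction<WaterSensor> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<WaterSensor> ctx) throws Exception {
        Random random = new Random();
        while (running) {
            ctx.collect(new WaterSensor(
                    "sensor" + random.nextInt(50),
                    Calendar.getInstance().getTimeInMillis(),
                    random.nextInt(100)));
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

It would be used as env.addSource(new ParallelRandomWatersensor()).setParallelism(2), with each subtask producing its own stream of readings.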
Demo 2 - a custom source that reads from a socket
public class Flink04_Source_Custom {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new MySource("hadoop102", 9999)).print();

        env.execute();
    }

    public static class MySource implements SourceFunction<WaterSensor> {
        private String host;
        private int port;
        private volatile boolean isRunning = true;
        private Socket socket;

        public MySource(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public void run(SourceContext<WaterSensor> ctx) throws Exception {
            // Implement a source that reads data from a socket
            socket = new Socket(host, port);
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            String line = null;
            while (isRunning && (line = reader.readLine()) != null) {
                String[] split = line.split(",");
                ctx.collect(new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])));
            }
        }

        /**
         * Most sources have a while loop inside run();
         * calling this method should make that loop exit.
         */
        @Override
        public void cancel() {
            isRunning = false;
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
/*
Sample input, sent one line at a time via nc:
sensor_1,1607527992000,20
sensor_1,1607527993000,40
sensor_1,1607527994000,50
*/