1.安装flink,启动flink
文档地址:Apache Flink 1.3-SNAPSHOT 中文文档: Apache Flink 中文文档
代码:GitHub - apache/flink: Apache Flink
2. 打开端口 端口号, 启动jar
### 切换到flink 目录bin下
[root@localhost ~]# cd /home/flink/flink-1.14.4/bin/
### 运行
[root@localhost bin]# ./start-cluster.sh
### 开启端口9000
nc -l 9000
#### 运行jar
./bin/flink run /home/flink/flink-1.14.4/examples/streaming/SocketWindowWordCount.jar --port 9000
3.测试jar,输入字符
注:1. 部署启动遇到的jar缺失
注释:jar可以下载源码查看,方法如图所示,也可以根据错误信息搜索对应的包
附: mysql+mq+mybatis +spring 需要的包
2. 代码
package com.javaland.flink.mq;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

/**
 * Flink streaming job that reads strings from the RabbitMQ queue {@code task_queue}
 * and hands every record to a {@link SinkToMySQL} sink as well as to the print sink.
 */
public class Mq2Flink {

    /**
     * Monitors MQ data in real time and inserts it into the MySQL database.
     *
     * @throws Exception if building or executing the job fails
     */
    public static void mq2mysql() throws Exception {
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        runPipeline(environment);
    }

    /**
     * Job entry point: same pipeline as {@link #mq2mysql()}, but with restarts disabled.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRestartStrategy(RestartStrategies.noRestart());
        runPipeline(environment);
    }

    /**
     * Wires RabbitMQ source -> MySQL sink + print sink on the given environment and runs it.
     *
     * @param environment execution environment the job is attached to
     * @throws Exception if job execution fails
     */
    private static void runPipeline(StreamExecutionEnvironment environment) throws Exception {
        RMQConnectionConfig rabbitConfig = new RMQConnectionConfig.Builder()
                .setHost("localhost")
                .setPort(5672)
                .setUserName("test")
                .setPassword("test")
                .setVirtualHost("/")
                .build();

        // Third constructor argument is RMQSource's usesCorrelationId flag.
        RMQSource<String> queueSource =
                new RMQSource<>(rabbitConfig, "task_queue", true, new SimpleStringSchema());

        // Only the source is pinned to parallelism 1; the sinks keep the default.
        DataStream<String> messages = environment.addSource(queueSource).setParallelism(1);

        messages.addSink(new SinkToMySQL());
        messages.print();

        environment.execute("mq数据插入到mysql");
    }
}
package com.javaland.flink.mq;

import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import com.javaland.flink.mapper.MessageMapper;
import com.javaland.flink.po.MessagePO;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.ibatis.datasource.pooled.PooledDataSource;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import java.util.List;

/**
 * Flink sink that stores every incoming string as a {@link MessagePO} (in its
 * {@code username} property) through a MyBatis-Plus {@link MessageMapper}.
 *
 * <p>The {@link SqlSessionFactory} is built once per JVM in the static initializer.
 * Each sink instance opens its own {@link SqlSession} in {@link #open(Configuration)}
 * and releases it in {@link #close()}.
 */
public class SinkToMySQL extends RichSinkFunction<String> {

    static MybatisSqlSessionFactoryBean sqlSessionFactory;
    static SqlSessionFactory sessionFactory;

    /**
     * Session owned by this sink instance. It is per-instance (not static) so that
     * several sink subtasks running in the same JVM do not share one session, and
     * transient because it is created in {@link #open(Configuration)} rather than
     * shipped with the serialized function.
     */
    private transient SqlSession sqlSession;

    static {
        sqlSessionFactory = new MybatisSqlSessionFactoryBean();
        // Configure the pooled data source.
        PooledDataSource pooledDataSource = new PooledDataSource();
        pooledDataSource.setDriver("com.mysql.cj.jdbc.Driver");
        pooledDataSource.setUsername("root");
        pooledDataSource.setPassword("root");
        pooledDataSource.setUrl("jdbc:mysql://localhost:3306/test?serverTimezone=Asia/Shanghai");
        sqlSessionFactory.setDataSource(pooledDataSource);
        try {
            sqlSessionFactory.setMapperLocations(
                    new PathMatchingResourcePatternResolver().getResources("classpath*:mapper/*.xml"));
            sqlSessionFactory.setTypeAliasesPackage("com.javaland.flink.po");
            sessionFactory = sqlSessionFactory.getObject();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Opens the MyBatis session used by this sink instance.
     *
     * @param parameters configuration passed in by Flink
     * @throws Exception if the parent initialisation or opening the session fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        sqlSession = sessionFactory.openSession();
    }

    /**
     * Closes the session opened in {@link #open(Configuration)}, if any, and then
     * delegates to the parent. Safe to call when {@code open} never ran or failed.
     *
     * @throws Exception if closing the parent fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (sqlSession != null) {
                sqlSession.close();
                sqlSession = null;
            }
        } finally {
            super.close();
        }
    }

    /**
     * Inserts {@code value} as a new message row, then queries the mapper with a
     * {@code null} wrapper and prints the returned rows in reverse list order.
     *
     * @param value   record received from upstream; stored via {@code setUsername}
     * @param context sink context (not used)
     */
    @Override
    public void invoke(String value, Context context) {
        MessageMapper messageMapper = sqlSession.getMapper(MessageMapper.class);
        MessagePO messagePO = new MessagePO();
        messagePO.setUsername(value);
        messageMapper.insert(messagePO);

        // NOTE(review): MessageMapper is not shown here; selectList(null) presumably
        // returns every row, so this read-back grows with the table. Confirm it is
        // only wanted for demo output.
        List<MessagePO> all = messageMapper.selectList(null);
        if (all != null) {
            for (int i = all.size() - 1; i >= 0; i--) {
                System.out.println("查询的message:" + all.get(i));
            }
        }
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Build descriptor for the javaland-flink module.
  The maven-jar-plugin section produces one classified jar per Flink job, each with
  its own "program-class" manifest entry naming that job's entry point.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.javaland</groupId>
        <artifactId>javaland</artifactId>
        <version>0.0.1</version>
    </parent>

    <packaging>jar</packaging>
    <groupId>org.javaland</groupId>
    <artifactId>javaland-flink</artifactId>

    <properties>
        <flink.version>1.14.4</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- NOTE(review): pinned to 1.11.1 while the rest of Flink is ${flink.version};
             it sits next to flink-table-planner below. Confirm it is still needed. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>1.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- NOTE(review): pinned to 1.10.1 while the rest of Flink is ${flink.version};
             confirm it is still needed. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
        <!-- Scala suffix aligned with the other Flink artifacts (_2.11); mixing _2.11
             and _2.12 puts two Scala builds of Flink on the same classpath. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-rabbitmq_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-force-shading</artifactId>
            <version>14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- NOTE(review): log4j-core 2.11.1 predates the Log4Shell (CVE-2021-44228)
             fixes; consider upgrading. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.11.1</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.31</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.5.3.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.2.18</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <executions>
                    <execution>
                        <id>mq2flink</id>
                        <phase>package</phase>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                        <configuration>
                            <classifier>Mq2Flink</classifier>
                            <archive>
                                <manifestEntries>
                                    <!-- Must be the fully qualified name; Mq2Flink is
                                         declared in package com.javaland.flink.mq. -->
                                    <program-class>com.javaland.flink.mq.Mq2Flink</program-class>
                                </manifestEntries>
                            </archive>
                        </configuration>
                    </execution>
                    <execution>
                        <id>Flink2Mq</id>
                        <phase>package</phase>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                        <configuration>
                            <classifier>Flink2Mq</classifier>
                            <archive>
                                <manifestEntries>
                                    <!-- NOTE(review): Flink2Mq is not shown here; verify this
                                         matches its actual package. -->
                                    <program-class>com.javaland.flink.Flink2Mq</program-class>
                                </manifestEntries>
                            </archive>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
3. 最后打包的好处就是可以部署多个job