环境准备
经过1个月的摸索,最终选择在腾讯云上搭建一个学习环境。当时选择原因还是新用户有优惠(150左右3年),但现在看1核2g的配置勉强够用,建议后续小伙伴选择时最好是2核4g配置。
由于是单节点安装,需要准备如下资源:
1、jdk1.8
2、zookeeper3.5.9
3、kafka_2.12-3.0.0
链接: 资源都整合在这里.
提取码:pbtw
JDK安装
-
找到jdk进行解压:
-
配置环境变量
vi /etc/profileexport JAVA_HOME=/usr/local/soft/jdk1.8.0_171export PATH=.:$JAVA_HOME/bin:$PATH
配置完后执行:source /etc/profile
zookeeper安装
- 找到zookeeper进行解压:
- 配置环境变量(如上)
- 解压后进入zookeeper的conf目录
- 执行mv zoo_sample.cfg zoo.cfg
- 修改以下路径
安装kafka
- 找到kafka进行解压:
- 配置环境变量(如上)
- 解压后进入kafka的config目录:vi server.properties,修改如下几处:
如上,所需要的资源已经配置完毕,下面进行启动
常用命令
启动(kafka目录下执行):
zkServer.sh start
bin/kafka-server-start.sh -daemon config/server.properties停止(kafka目录下执行):
bin/kafka-server-stop.sh config/server.properties
zkServer.sh stop创建topic(kafka bin目录下执行):
kafka-topics.sh --create --topic test --bootstrap-server 10.0.4.2:9092 --partitions 1 --replication-factor 1查看topic(kafka bin目录下执行):
kafka-topics.sh --describe --topic test --bootstrap-server 10.0.4.2:9092创建生产者(kafka bin目录下执行):
kafka-console-producer.sh --topic test --bootstrap-server 10.0.4.2:9092创建消费者(kafka bin目录下执行):
kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server 10.0.4.2:9092
启动zookeeper和kafka后可以看到如下进程:
启动生产和消费者命令消费数据:
以上服务器上安装成功,接下来通过flink 读取kafka数据
防火墙端口号开通
这一步是非常重要的,因为云服务器默认是开启防火墙限制的,如果从本地访问特定ip端口号需要在防火墙规则进行配置。
1、进入云服务器首页管理页面
2、点击防火墙进行添加规则
3、分别添加9092和2181端口
4、添加完毕如下
添加依赖
我这里用的是flink1.12版本
```xml
<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.12.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.12.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.12.5</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency>
代码示例
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class kafkaTest {public static void main(String[] args) throws Exception{//创建上下文StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();test1(env);env.execute("kafkaTest");}public static void test1(StreamExecutionEnvironment env) {Properties prop = new Properties();prop.setProperty("bootstrap.servers","云服务器外网ip地址:9092");prop.setProperty("group.id","test");DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<String>("topicname",new SimpleStringSchema(),prop));stream.print();}}
服务器上开启一个生产者客户端并往里面打数据
运行结果如下: