Java通过calcite实时读取kafka中的数据

引入maven依赖

        <dependency>

            <groupId>org.apache.calcite</groupId>

            <artifactId>calcite-kafka</artifactId>

            <version>1.28.0</version>

        </dependency>

测试代码

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.ResultSetMetaData;

import java.sql.SQLException;

import java.util.Properties;

public class CalciteDemo {

    public static void main(String[] args) throws SQLException {

        String model = "inline:" +

                "{\n" +

                "  \"version\": \"1.0\",\n" +

                "  \"defaultSchema\": \"KAFKA\",\n" +

                "  \"schemas\": [\n" +

                "    {\n" +

                "    \"name\": \"KAFKA\",\n" +

                "    \"tables\": [\n" +

                "      {\n" +

                "        \"name\": \"TEST_TABLE\",\n" +

                "        \"factory\": \"org.apache.calcite.adapter.kafka.KafkaTableFactory\",\n" +

                "        \"stream\": { \"stream\": true },\n" +

                "        \"operand\": {\n" +

                "          \"bootstrap.servers\": \"192.168.x.xx:9092\",\n" +

                "          \"topic.name\": \"my-cloud-events\",\n" +

                "          \"consumer.params\": {\n" +

                "            \"group.id\": \"calcite-ut-consumer\",\n" +

                "            \"key.deserializer\": \"org.apache.kafka.common.serialization.ByteArrayDeserializer\",\n" +

                "            \"value.deserializer\": \"org.apache.kafka.common.serialization.ByteArrayDeserializer\"\n" +

                "          }\n" +

                "        }\n" +

                "      }\n" +

                "    ]\n" +

                "    }\n" +

                "  ]\n" +

                "}";

        Properties info = new Properties();

        info.put("model", model);

        Connection connection = DriverManager.getConnection("jdbc:calcite:", info);

        final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);

        final String sql7 = "SELECT STREAM * FROM \"KAFKA\".\"TEST_TABLE\"";

        print(calciteConnection,sql7);

        connection.close();

        calciteConnection.close();

    }

    public static void print(CalciteConnection calciteConnection, String sql7) throws SQLException {

        final PreparedStatement statement = calciteConnection.prepareStatement(sql7);

        final ResultSet resultSet = statement.executeQuery();

        ResultSetMetaData metadata = resultSet.getMetaData();

        while (resultSet.next()) {

            for (int i = 1; i <= metadata.getColumnCount(); i++) {

                System.out.print(metadata.getColumnLabel(i) + "=" + resultSet.getString(i) + ",");

            }

            System.out.println();

        }

    }

}

发送测试数据

运行结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/474265.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【时间之外】IT人求职和创业应知【36】-肖申克的救赎

目录 新闻一&#xff1a;信息技术应用创新产业大会在深圳开幕 新闻二&#xff1a;人工智能与大数据融合应用成为创业新热点 新闻三&#xff1a;云计算与边缘计算协同发展推动IT行业创新 认知和思考决定了你的赚钱能力。以下是今天可能引起你思考的热点新闻&#xff1a; 新闻…

python高级之简单爬虫实现

一、前言 场景1&#xff1a;一个网络爬虫&#xff0c;顺序爬取一个网页花了一个小时&#xff0c;采用并发下载就减少到了20分钟。 场景2&#xff1a;一个应用软件优化前每次打开网页需要3秒&#xff0c;采用异步并发提升到了200毫秒。 假设一个工程的工作量为100&#xff0c…

01_MinIO部署(Windows单节点部署/Docker化部署)

单节点-Windows环境安装部署 在Windows环境安装MinIO&#xff0c;主要包含两个东西&#xff1a; MinIO Server&#xff08;minio.exe&#xff09;&#xff1a;应用服务本身MinIO Client&#xff08;mc.exe&#xff09;&#xff1a;MinIO客户端工具&#xff08;mc&#xff09;…

数据分析24.11.13

Excel 函数 求和 函数 sum() sumif() SUMIF(range, criteria, [sum_range]) sumifs() average() count() max() min() 逻辑 函数 if() iferror() 查询函数 VLOOKUP()

已有docker增加端口号,不用重新创建Docker

已有docker增加端口号&#xff0c;不用重新创建Docker 1. 整体描述2. 具体实现2.1 查看容器id2.2 停止docker服务2.3 修改docker配置文件2.4 重启docker服务 3. 总结 1. 整体描述 docker目前使用的非常多&#xff0c;但是每次更新都需要重新创建docker&#xff0c;也不太方便&…

java itext后端生成pdf导出

public CustomApiResult<String> exportPdf(HttpServletRequest request, HttpServletResponse response) throws IOException {// 防止日志记录获取session异常request.getSession();// 设置编码格式response.setContentType("application/pdf;charsetUTF-8")…

计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-11-04

计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-11-04 目录 文章目录 计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-11-04目录1. Alopex: A Computational Framework for Enabling On-Device Function Calls with LLMs摘要&#xff1a;研究背景&…

NLP论文速读(谷歌出品)|缩放LLM推理的自动化过程验证器

论文速读|Rewarding Progress: Scaling Automated Process Verifiers for LLM Reasoning 论文信息&#xff1a; 简介&#xff1a; 这篇论文探讨了如何提升大型语言模型&#xff08;LLM&#xff09;在多步推理任务中的性能。具体来说&#xff0c;它试图解决的问题是现有的基于结…

k-近邻算法(K-Nearest Neighbors, KNN)详解:机器学习中的经典算法

✅作者简介&#xff1a;2022年博客新星 第八。热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏…

Debezium-MySqlConnectorTask

文章目录 概要整体架构流程技术名词解释技术细节小结 概要 MySqlConnectorTask&#xff0c;用于读取MySQL的二进制日志并生成对应的数据变更事件 整体架构流程 技术名词解释 数据库模式&#xff08;Database Schema&#xff09; 数据库模式是指数据库中数据的组织结构和定义&…

SDF,一个从1978年运行至今的公共Unix Shell

关于SDF 最近发现了一个很古老的公共Unix Shell服务器&#xff0c;这个项目从1978年运行至今&#xff0c;如果对操作系统&#xff0c;对Unix感兴趣&#xff0c;可以进去玩一玩体验一下 SDF Public Access UNIX System - Free Shell Account and Shell Access 注册方式 我一…

逆向攻防世界CTF系列41-EASYHOOK

逆向攻防世界CTF系列41-EASYHOOK 看题目是一个Hook类型的&#xff0c;第一次接触&#xff0c;虽然学过相关理论&#xff0c;可以看我的文章 Hook入门(逆向)-CSDN博客 题解参考&#xff1a;https://www.cnblogs.com/c10udlnk/p/14214057.html和攻防世界逆向高手题之EASYHOOK-…

C# 面向对象

C# 面向对象编程 面向过程&#xff1a;一件事情分成多个步骤来完成。 把大象装进冰箱 (面向过程化设计思想)。走一步看一步。 1、打开冰箱门 2、把大象放进冰箱 3、关闭冰箱门 面向对象&#xff1a;以对象作为主体 把大象装进冰箱 1、抽取对象 大象 冰箱 门 &#xff0…

【AI图像生成网站Golang】项目架构

AI图像生成网站 目录 一、项目介绍 二、雪花算法 三、JWT认证与令牌桶算法 四、项目架构 五、图床上传与图像生成API搭建 六、项目测试与调试(等待更新) 四、项目架构 本项目的后端基于Golang和Gin框架开发&#xff0c;主要包括的模块有&#xff1a; backend/ ├── …

Acme PHP - Let‘s Encrypt

Lets Encrypt是一个于2015年三季度推出的数字证书认证机构&#xff0c;旨在以自动化流程消除手动创建和安装证书的复杂流程&#xff0c;并推广使万维网服务器的加密连接无所不在&#xff0c;为安全网站提供免费的SSL/TLS证书。 使用PHP来更新证书&#xff1a; Acme PHP | Rob…

前后端交互之动态列

一. 情景 在做项目时&#xff0c;有时候后会遇到后端使用了聚合函数&#xff0c;导致生成的对象的属性数量或数量不固定&#xff0c;因此无法建立一个与之对应的对象来向前端传递数据&#xff0c;这时可以采用NameDataListVO向前端传递数据。 Data Builder AllArgsConstructo…

【LeetCode 题】只出现一次的数字--其余数字都出现3次

&#x1f536;力扣上一道有意思的题&#xff0c;参考了评论区的解法&#xff0c;一起来学习 &#x1f354;思路说明&#xff1a; &#x1f31f;举例说明 &#xff1a; nums [2,2,3,2] 我们需要把其中的数字 ‘3’ 找出来 1️⃣把每个数都想成32位的二进制数&#xff08;这里举…

如何在 Ubuntu 上安装 Jupyter Notebook

本篇文章将教你在 Ubuntu 服务器上安装 Jupyter Notebook&#xff0c;并使用 Nginx 和 SSL 证书进行安全配置。 我将带你一步步在云服务器上搭建 Jupyter Notebook 服务器。Jupyter Notebook 在数据科学和机器学习领域被广泛用于交互式编码、可视化和实验。在远程服务器上运行…

一文了解Android的核心系统服务

在 Android 系统中&#xff0c;核心系统服务&#xff08;Core System Services&#xff09;是应用和系统功能正常运行的基石。它们负责提供系统级的资源和操作支持&#xff0c;包含了从启动设备、管理进程到提供应用基础组件的方方面面。以下是 Android 中一些重要的核心系统服…

学者观察 | 元计算、人工智能和Web 3.0——山东大学教授成秀珍

导语 成秀珍教授提出元计算是在开放的零信任环境下整合算力资源打通数据壁垒构建自进化智能的新质生产力技术&#xff0c;是一种新计算范式&#xff1b;区块链是Web3.0的核心技术之一&#xff0c;有助于保障开放零信任环境下&#xff0c;用户、设备和服务间去中心化数据流通的…