利用亚马逊云科技Bedrock和LangChain开发AI驱动数据分析平台

项目简介:

小李哥将继续每天介绍一个基于亚马逊云科技AWS云计算平台的全球前沿AI技术解决方案,帮助大家快速了解国际上最热门的云计算平台亚马逊云科技AWS AI最佳实践,并应用到自己的日常工作里。

本次介绍的是如何在亚马逊云科技上SageMaker上利用LangChain框架开发代码调用Amazon Titan大语言模型,基于业务数据库Schema、用户问题以及提示词模板生成LLM链,调用大模型托管服务Amazon Bedrock上的AI大语言模型,生成业务所需的SQL查询语句并利用SQL链查询数据并生成用户问题的答案。本架构设计全部采用了云原生Serverless架构,提供可扩展和安全的AI解决方案。本方案的解决方案架构图如下:

方案所需基础知识  

什么是 Amazon Bedrock?

Amazon Bedrock 是亚马逊云科技推出的一项生成式 AI 服务,旨在帮助开发者轻松访问和部署各种强大的基础模型(Foundation Models),如文本生成、对话生成、图像生成等。通过 Amazon Bedrock,开发者可以快速构建、定制和扩展 AI 应用程序,而无需从零开始训练模型。这种服务使得利用大规模预训练模型变得更加简便和高效,适用于各种行业的 AI 应用场景。

什么是 Amazon SageMaker?

Amazon SageMaker 是亚马逊云科技提供的一站式机器学习服务,帮助开发者和数据科学家轻松构建、训练和部署机器学习模型。SageMaker 提供了全面的工具,从数据准备、模型训练到部署和监控,覆盖了机器学习项目的全生命周期。通过 SageMaker,用户可以加速机器学习模型的开发和上线,并确保模型在生产环境中的稳定性和性能。

什么是 Amazon Glue?

Amazon Glue 是亚马逊云科技的一项完全托管的数据集成服务,用于发现、准备和组合数据,以实现更快的数据分析。Glue 提供了数据提取、转换和加载(ETL)的功能,支持自动数据爬网、元数据管理和数据清理。它使得企业能够轻松将数据从多个源系统整合到数据湖或数据仓库中,为后续的分析和报告提供干净、统一的数据集。

利用大模型生成 SQL 语句的应用场景

自助式数据查询

利用大模型生成 SQL 语句,使非技术用户能够通过自然语言输入查询需求,自动生成相应的 SQL 语句进行数据库查询。这大大降低了数据访问的技术门槛,帮助用户快速获取所需的数据。

复杂查询自动化

在需要构建复杂查询时,开发者可以通过描述性语言让大模型生成 SQL 语句,避免手动编写长而复杂的查询代码。这不仅提高了开发效率,还减少了语法错误的可能性。

数据分析支持

分析师可以利用大模型生成 SQL 语句,根据业务问题自动生成分析查询,快速获取洞察,支持数据驱动的决策过程。

动态报告生成

在报表生成系统中,用户可以通过自然语言描述需要的报告内容,大模型将生成对应的 SQL 语句并自动提取数据,生成实时报告。这种应用使得报告生成更加灵活和高效

本方案包括的内容

1.申请Amazon Bedrock上的Amazon Titan大模型访问权限。

2.运行Amazon Glue数据提取服务,将数据库的Schema导入到Amazon Glue Data Catalog

3.根据Data Catalog利用LangChain生成一个动态提示词模板

4. 通过LangChain API调用Amazon Titan大模型

5. 测试调用大模型生成业务数据库SQL查询语句

项目搭建具体步骤:

1. 登录亚马逊云科技控制台,在Amazon Bedrock中启用Amazon Titan Text G1 - Lite模型,该模型的ID为:”amazon.titan-text-lite-v1“。

2. 下面进入亚马逊云科技SageMkaer服务Stuidio界面,点击Open打开Studio。

3. 创建一个新的Jupyter Notebook文件,命名为mda_text_to_sql_langchain_bedrock.ipynb。首先安装必要依赖,并导入。

%%capture!pip install sqlalchemy==2.0.29
!pip install langchain==0.1.19
!pip install langchain-experimental==0.0.58
!pip install PyAthena[SQLAlchemy]==3.8.2
!pip install -U langchain-aws==0.1.3import os
import json
import boto3import sqlalchemy
from sqlalchemy import create_enginefrom langchain.docstore.document import Document
from langchain import PromptTemplate,SagemakerEndpoint,SQLDatabase,LLMChain
from langchain_experimental.sql import SQLDatabaseChain, SQLDatabaseSequentialChain
from langchain.llms.sagemaker_endpoint import LLMContentHandler
from langchain.chains.question_answering import load_qa_chain
from langchain.prompts.prompt import PromptTemplatefrom langchain.chains.api.prompt import API_RESPONSE_PROMPT
from langchain.chains import APIChainfrom typing import Dict
import time
from langchain_aws import BedrockLLM

4. 接下来我们把json中的业务数据导入到DataFrame中

import pandas as pd
library_df = pd.read_json('s3_library_data.json',lines=True)
library_df

5. 在开始该实验之前,我们通过CloudFormation基础设施即代码服务,创建了Amazon Glue和Catalog云资源。这时我们通过CloudFormation Stack的数据获取这些云资源的ID名字。

## Update with the correct Cloudformation Stack Name
cloudformation_stack_name = '<LabStack>'
cfn_client = boto3.client('cloudformation')def get_cfn_outputs(cloudformation_stack_name):outputs = {}for output in cfn_client.describe_stacks(StackName=cloudformation_stack_name)['Stacks'][0]['Outputs']:outputs[output['OutputKey']] = output['OutputValue']return outputsoutputs = get_cfn_outputs(cloudformation_stack_name)
json_formatted_str = json.dumps(outputs, indent=2)
print(json_formatted_str)

运行该代码得到的回复如下:

6. 我们继续在代码中指定我们的数据文件、Glue Crawler和Glue Database名字

# The following variables define the AWS Glue Data Catalog database and crawler to use.## DIY Note ##  You will be required to update some of the below variables as part of the DIY
data_file = 's3_library_data.json'
lab_files_folder = 'library-data'
glue_db_name = outputs['LibraryDatabaseName']
glue_crawler_name = outputs['LibraryCrawlerName']# The below variable does not need to be changed.
lab_files_bucket = outputs['LabfilesBucketName']print(lab_files_bucket)

7. 下一步我们通过亚马逊云科技Python SDK Boto3,开始运行一个Glue Crawler,在Glue中构建数据库的结构和元数据,供之后利用大模型查询使用。

client = boto3.client('glue')print("About to start running the crawler: ", glue_crawler_name)try:response = client.start_crawler(Name=glue_crawler_name )print("Successfully started crawler. The crawler may take 2-5 mins to detect the schema.")while True:# Get the crawler status.response = client.get_crawler(Name=glue_crawler_name)# Extract the crawler state.status = response['Crawler']['State']# Print the crawler status.print(f"Crawler '{glue_crawler_name}' status: {status}")if status == 'STOPPING':  # Replace 'READY' with the desired completed state.break  # Exit the loop if the desired state is reached.time.sleep(10)  # Sleep for 10 seconds before checking the status again.except:print("error in starting crawler. Check the logs for the error details.")

8. 接下来我们利用SQLAlchemy Python库与数据库建立连接,通过Python代码调用数据库进行数据操作。

# Define the Region.
region = boto3.session.Session().region_name## Athena variables
connathena=f"athena.{region}.amazonaws.com" 
portathena='443'                                         # Update, if port is different
schemaathena=glue_db_name                                # from user defined params
s3stagingathena=f's3://{lab_files_bucket}/athenaresults/' # from cfn params
wkgrpathena='primary'                                    # Update, if workgroup is different## Create the Athena connection string.
connection_string = f"awsathena+rest://@{connathena}:{portathena}/{schemaathena}?s3_staging_dir={s3stagingathena}/&work_group={wkgrpathena}"## Create the Athena SQLAlchemy engine.
engine_athena = create_engine(connection_string, echo=False)
dbathena = SQLDatabase(engine_athena)gdc = [schemaathena]
print("Connection to Athena database succeeded: ", gdc[0])

9. 处理Glue Catalog的信息,将数据源数据库的库名、数据库每个表命、每个列命存入字符串变量”columns_str“中。

# Generate dynamic prompts to populate the AWS Glue Data Catalog.
# Harvest AWS Glue crawler metadata.def parse_catalog():# Connect to the Data Catalog.columns_str = 'database|table|column_name'# Define the AWS Glue cient.glue_client = boto3.client('glue')for db in gdc:response = glue_client.get_tables(DatabaseName =db)for tables in response['TableList']:# Classification in the response for S3 and other databases is different. Set classification based on the response location.if tables['StorageDescriptor']['Location'].startswith('s3'):  classification='s3' else:  classification = tables['Parameters']['classification']for columns in tables['StorageDescriptor']['Columns']:dbname,tblname,colname=tables['DatabaseName'],tables['Name'],columns['Name']columns_str = columns_str+f'\n{dbname}|{tblname}|{colname}'return columns_strglue_catalog = parse_catalog()print(glue_catalog)

基于我们测试的数据,输出的”columns_str“变量如下。

10. 我们定义函数”identify_channel“,使用LangChain框架的LLMChain方法,通过定义提示词模板调用Amazon Bedrock大模型,获取基于用户问题的生成的SQL数据查询语句,并返回语句以及最佳数据查询方法(利用Athena查询S3中的数据,或利用API查询数据)。

# Define the Bedrock LLM model to use.bedrock_model = 'amazon.titan-text-lite-v1'BEDROCK_CLIENT = boto3.client("bedrock-runtime", 'us-east-1')# Amazon Titan does not require any inference modifiers.
if bedrock_model == 'amazon.titan-text-lite-v1':inference_modifier = {}
else:inference_modifier = {"temperature":0.0, "max_tokens":50}llm = BedrockLLM(model_id=bedrock_model, client=BEDROCK_CLIENT, model_kwargs = inference_modifier)def identify_channel(query):prompt_template_titan = """You are a SQL expert. Convert the below natural language question into a valid SQL statement. The schema has the structure below:\n"""+glue_catalog+""" \nHere is the question to be answered:\n{query}\nProvide the SQL query that would retrieve the data based on the natural language request.\n"""prompt_template = prompt_template_titan# print(prompt_template)## Define prompt 1.PROMPT_channel = PromptTemplate(template=prompt_template, input_variables=["query"])# Define the LLM chain.llm_chain = LLMChain(prompt=PROMPT_channel, llm=llm)# Run the query and save to generated texts.generated_texts = llm_chain.run(query)# Set the channel from where the query can be answered.if 's3' in generated_texts: channel='db'db=dbathenaprint("SET database to athena")elif 'api' in generated_texts: channel='api'print("SET database to weather api")        else: raise Exception("User question cannot be answered by any of the channels mentioned in the catalog")# print("Step complete. Channel is: ", channel)return channel, db

11. 下面我们定义函数”run_query“,通过最优的数据查询方法,利用调用LangChain框架"SQLDatabaseChain.from_llm"方法,访问数据库执行我们刚生成的SQL语句。

def run_query(query):channel, db = identify_channel(query) # Call the identify channel function first._DEFAULT_TEMPLATE = """Here is a schema of a table:<schema>{table_info}</schema>       Run a SQL query to answer the question. Follow this format:SQLQuery: the correct SQL query. For example: select count ( * )  from s3_library_data where genre = 'Novel'SQLResult: the result of the SQL query.Answer: convert the SQLResult to a grammatically correct sentence.Here is question: {input}"""PROMPT_sql = PromptTemplate(input_variables=["table_info","input"], template=_DEFAULT_TEMPLATE)if channel=='db':db_chain = SQLDatabaseChain.from_llm(llm, db, prompt=PROMPT_sql, verbose=True, return_intermediate_steps=False)response=db_chain.run(query)else: raise Exception("Unlisted channel. Check your unified catalog")return response

12. 我们通过测试问题,调用”run_query“函数进行测试生成回复。

# query = """How many books with a genre of Fantasy are in the library?""" 
# query = """Find 3 books in the library with Tarzan in the title?""" 
# query = """How many books by author Stephen King are in the library?""" 
query = """How many total books are there in the library?""" response =  run_query(query)
print("----------------------------------------------------------------------")
print(f'SQL and response from user query {query}  \n  {response}')

我们得到大模型通过LangChain调用数据库后生成的SQL查询语句回复。 

以上就是在亚马逊云科技上利用亚马逊云科技上利用Amazon Bedrock上的大模型和LangChain开发AI驱动的数据平台的全部步骤。欢迎大家未来与我一起,未来获取更多国际前沿的生成式AI开发方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/403289.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一次现网redis CPU使用率异常定位

背景 618大促前&#xff0c;运维对系统做巡检时发现redis cpu利用率白天基本保持在72%左右&#xff0c;夜里也在60%以上。担心618流量比平时大&#xff0c;导致redis超负荷&#xff0c;因此找开发进行优化&#xff0c;降低redis的负载。 定位思路 其实资源使用率过高定位都…

Taro+Vue 创建微信小程序

TaroVue 创建微信小程序 一、全局安装 tarojs/cli二、项目初始化三、现在去启动项目吧 一、全局安装 tarojs/cli npm install -g tarojs/cli //安装 npm info tarojs/cli //查看安装信息 如果正常显示版本说明成功了&#xff0c;就直接跳到第二步吧官网说&#xff1a;…

Unity引擎基础知识

目录 Unity基础知识概要 1. 创建工程 2. 工程目录介绍 3. Unity界面和五大面板 4. 游戏物体创建与操作 5. 场景和层管理 6. 组件系统 7. 脚本语言C# 8. 物理引擎和UI系统 学习资源推荐 Unity引擎中如何优化大型游戏项目的性能&#xff1f; Unity C#脚本语言的高级编…

【ML】Image Augmentation)的作用、使用方法及其分类

图像增强&#xff08;Image Augmentation&#xff09;的作用、使用方法及其分类 1. 图像增强的定义2. 图像增强的作用3. 什么时候使用图像增强&#xff1f;4. 图像增强详细方法分类梳理4.1 图像增强方法列表4.2 边界框增强方法5. 参考资料 yolov3&#xff08;一&#xff1a;模型…

K8S资源之PVPVC

概念 类似于Docker的数据卷挂载&#xff0c;将Pod中重要的文件挂载到宿主机上&#xff0c;如果Pod发生崩溃等情况自愈时&#xff0c;保证之前存储的数据没有丢失。 如上图中&#xff0c;将各个Pod中的目录挂载到存储层&#xff0c;如果Pod宕机后自愈均从存储层获取之前的数据…

00_remipi_软件评估记录

1.CPU 1.1 查看CPU信息命令 cat /proc/cpuinfo * processor: 系统中逻辑处理核心的编号&#xff0c;对于多核处理器则可以是物理核&#xff0c;或者使用超线程技术虚拟的逻辑核。 BogoMIPS: 在系统内核启动时粗略测算的CPU每秒运行百万条指令数&#xff08;Million Instruct…

Selenium 自动化测试平台

1.介绍 Selenium 是一套 Web网站 的程序自动化操作 解决方案。 通过它&#xff0c;我们可以写出自动化程序&#xff0c;像人一样在浏览器里操作web界面。 比如点击界面按钮&#xff0c;在文本框中输入文字 等操作。 而且还能从web界面获取信息。 比如获取 火车、汽车票务信息…

网络编程,网络协议,UDP协议

网络&#xff1a; 1.协议&#xff1a;通信双方约定的一套标准 2.国际网络通信协议标准&#xff1a; 1.OSI协议&#xff1a; 应用层 发送的数据内容 表示层 数据是否加密 会话层 是否建立会话连接 传输层 …

mpls静态lsp实验

实验需求 R1、R2和R3之间已经部署了IGP协议&#xff0c;故192.168.10.0/24与192.168.20.0/24网络之间已经能够互访。现要求通过配置 静态LSP&#xff0c;使得这两个网络之间能基于MPLS进行互访&#xff0c;标签分配如图 组网图 实验思路 1、R1、R2和R3之间已经部署了IGP协议…

泰坦尼克号 - 从灾难中学习机器学习/Titanic - Machine Learning from Disaster(kaggle竞赛)第二集(加载数据)

此次目的&#xff1a; hello大家好&#xff0c;俺是没事爱瞎捣鼓又分享欲爆棚的叶同学&#xff01;&#xff01;&#xff01;准备出几期博客来记录我学习kaggle数据科学入门竞赛的过程&#xff0c;顺便也将其中所学习到的知识分享出来。这是第一集&#xff08;了解赛题&#x…

宝塔部署Django项目(华为云)

1、登录华为云&#xff1a; 2、点击远程登录&#xff1a; 3、打开宝塔网址&#xff08;华为云选的是centos&#xff09; 4、在华为终端复制指令点击运行&#xff1a; 会显示安装完成&#xff0c;出现一个页面记录一下&#xff0c;方便以后登录&#xff1a; 5、复制外网面板地…

【Linux线程】线程的深度解析(线程是什么?线程与进程区别是什么?)

目录 一、前言 二、 什么是线程 &#x1f4a7;线程的引入&#x1f4a7; &#x1f4a7;线程的基本概念 &#x1f4a7; &#x1f4a7;线程的理解 &#x1f4a7; &#x1f4a7;进程与线程的关系&#x1f4a7; &#x1f4a7;程序如何划分&#xff08;重拾页表、见一下LWP&…

基于springboot养老院管理系统pf

TOC springboot332基于springboot养老院管理系统pf 第1章 绪论 1.1选题动因 当前的网络技术&#xff0c;软件技术等都具备成熟的理论基础&#xff0c;市场上也出现各种技术开发的软件&#xff0c;这些软件都被用于各个领域&#xff0c;包括生活和工作的领域。随着电脑和笔记…

Python实战项目:天气数据爬取+数据可视化(完整代码)

一、选题的背景 随着人们对天气的关注逐渐增加&#xff0c;天气预报数据的获取与可视化成为了当今的热门话题&#xff0c;天气预报我们每天都会关注&#xff0c;天气情况会影响到我们日常的增减衣物、出行安排等。每天的气温、相对湿度、降水量以及风向风速是关注的焦点。通过…

实战OpenCV之图像显示

基础入门 OpenCV提供的功能非常多&#xff0c;图像显示是最基础也是最直观的一部分。它让我们能够直观地看到算法处理后的效果&#xff0c;对于调试和验证都至关重要。在OpenCV中&#xff0c;图像显示主要依赖于以下四个关键的数据结构和函数。 1、Mat类。这是OpenCV中最基本的…

LeetCode - LCR 146- 螺旋遍历二维数组

LCR 146题 题目描述&#xff1a; 给定一个二维数组 array&#xff0c;请返回「螺旋遍历」该数组的结果。 螺旋遍历&#xff1a;从左上角开始&#xff0c;按照 向右、向下、向左、向上 的顺序 依次 提取元素&#xff0c;然后再进入内部一层重复相同的步骤&#xff0c;直到提取完…

MySQL数据库入门,pycharm连接数据库—详细讲解

一.安装MySQL 1.常用MySQL5.7&#xff0c;首先安装MySQL&#xff0c; &#xff08;一&#xff09; &#xff08;二&#xff09; &#xff08;三&#xff09; &#xff08;四&#xff09; &#xff08;五&#xff09; 2.配置环境变量 打开MySQL安装路径&#xff0c;在其中找到…

ArcGis在线地图插件Maponline(好用版)

ArcGis加载插件&#xff0c;可在线浏览谷歌地图、天地图、高德地图、必应地图等多种&#xff0c;包含街道、影像、标注地图等信息&#xff08;谷歌地图需自备上网手段&#xff09;&#xff0c;免费注册账号即可使用&#xff0c;可加载无水印底图。 与大地2000坐标无需配准直接使…

洛杉物理服务器怎么样?

洛杉矶作为美国科技和互联网的重要中心&#xff0c;物理服务器的质量通常非常高&#xff0c;可以提供卓越的性能、强大的安全性、多样的配置选项和专业的服务支持。以下是对洛杉物理服务器的详细介绍。 1. 优质的性能 稳定的网络连接&#xff1a;洛杉矶物理服务器位于先进的数据…