1 分钟能做什么?集成 ChatGPT 到自己的公众号,小程序或者 APP?集成各种形式的 Stable Diffusion,让 AIGC 帮助自己的项目更有趣,更生动?本教程将会教大家如何 1 分钟高效集成 ChatGPT,Stable Diffusion 等 AIGC 模型到自己的项目中。(文末有视频和代码案例)
#1 分钟集成 ChatGPT
1. 注册并登录 https://aigcaas.cn网站
2. 点击左侧 API Explorer,并选择分类“聊天机器人”,选择应用 ChatGPT,并选择Chat Completion:
3. 在中间,按照提示的格式,输入对应的参数信息,例如:[{"role":"user","content":"你好"}],并点击右侧的在线调用→发送请求按钮:
稍等片刻,即可看到接口完成调用,已经获得请求结果。如果想要进行流式响应,可以在流式响应输入框中输入stream,例如:
4. 点击右侧的案例代码,根据自己需要获得对应的案例代码:
5. 此时,可以将代码复制到本地,对代码的第12行-13行进行修改:
点击页面左侧密钥列表→新建,创建密钥信息:
将密钥信息对应复制到代码12-13行:
6. 完成粘贴之后,即可进行代码的测试,例如执行代码(可能涉及到一些依赖的安装,不同语言安装依赖方式略有不同,可以自行安装依赖):
完成调试之后,即可和自己的项目做集成。当然也可以根据自己的需求对参数内容进行相对应的优化和调整。
#1 分钟获取 Stable Diffusion
Stable Diffusion 是目前非常火热的文生图工具,根据文字即可生成图片, AIGCaaS 平台拥有数十款 Stable Diffusion 应用,例如以哈士奇作为关键词,进行不同类型,不同风格的图像生成,效果如下:
接口获取的方式与 ChatGPT 基本一致:
1. 点击左侧 API Explorer,并选择分类“图像生成”,选择一个自己需要的场景,例如“Open Journey”:
2. 输入提示词,例如:1girl, white hair, golden eyes, beautiful eyes, detail, flower meadow, cumulonimbus clouds, lighting, detailed sky, garden,并点击发送请求按钮:
多次生成,超看效果:
当然,不同模型的表现可能不同,例如二次元生成的效果:
3. 完成图片的生成,可以点击右侧的案例代码,根据自己需要获得对应的案例代码:
4. 同样要对13-14行代码进行密钥信息的替换,替换完成即可在本地进行代码的运行:此时,通过返回的链接地址,下载数据即可:
#如何与微信公众号进行集成
以 Python 语言为例,通过 Flask 框架作为后端框架:
# coding:utf-8
import os
import time
import json
import redis
import random
import base64
import hashlib
import logging
import requests
import xmltodict
import urllib.parse
import urllib.request
from flask import Flask, request, abort, Response, render_template_string# 获取随机字符串
random_str = lambda count=100: "".join(random.sample('zyxwvutsrqponmlkjihgUIOPLKJHGFDSAZXCVBNM' * 10, count))# 日志配置
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)# 常量
WECHAT_TOKEN = "公众号 Token"
WECHAT_APPID = "公众号 APPID"
WECHAT_APPSECRET = "公众号 APP Secret"
WECHAT_ENCODINGAESKEY = '公众号 Encoding AES Key'
secret_id = 'AIGCaaS 平台密钥' # 密钥信息
secret_key = 'AIGCaaS 平台密钥' # 密钥信息db_config = {"redis": {"server": "192.168.211.1","port": 6379,}
}app = Flask(__name__)redis_conn = redis.Redis(host=db_config["redis"]["server"], port=db_config["redis"]["port"])# 签名对象
getSha256 = lambda content: hashlib.sha256(content.encode("utf-8")).hexdigest()getTextResponse = lambda xml_dict, response_data: {"xml":{"ToUserName": xml_dict.get("FromUserName"),"FromUserName": xml_dict.get("ToUserName"),"CreateTime": int(time.time()),"MsgType": "text","Content": response_data}
}def chatgpt(user, content):try:messages = redis_conn.get('user-%s' % (user))messages = json.loads(messages.decode("utf-8")) if messages else []messages.append({"role": "user", "content": content})redis_conn.setex('user-%s' % (user), 60 * 60 * 24 * 7, json.dumps(messages))url = "https://api.aigcaas.cn/product/%s/api/%s" % ('chatgpt_chat', 'chat_com')# 构建请求头nonce = str(random.randint(1, 10000))timestamp = str(int(time.time()))headers = {'SecretID': secret_id,'Nonce': nonce,'Token': getSha256(("%s%s%s" % (timestamp, secret_key, nonce))),'Timestamp': timestamp,'Content-Type': 'application/json'}# 构建请求 bodydata = {"messages": messages}# 获取响应response = requests.request("POST", url, headers=headers, data=json.dumps(data))result = json.loads(response.text)logger.debug("result: %s" % result)if result.get("status") == "Error":redis_conn.delete('user-%s' % (user))redis_conn.setex('user-last-text-message-%s' % user, 600, "系统检测到敏感信息,已经屏蔽,请重试")else:messages.append(result["choices"][0]["message"])messages = messages[-10:]redis_conn.setex('user-%s' % (user), 60 * 60 * 24, json.dumps(messages))return result["choices"][0]["message"]["content"]except Exception as e:logger.error(e)return e@app.route("/mp", methods=["GET", "POST"])
def wechat():# 接收微信服务器发送参数signature = request.args.get("signature")timestamp = request.args.get("timestamp")nonce = request.args.get("nonce")logger.debug([signature, timestamp, nonce])if not all([signature, timestamp, nonce]):abort(400)li = [WECHAT_TOKEN, timestamp, nonce]li.sort()tmp_str = "".join(li)sign = hashlib.sha1(tmp_str.encode("utf-8")).hexdigest() # 进行sha1加密, 得到正确的签名值# 将自己计算的签名值, 与请求的签名参数进行对比, 如果相同, 则证明请求来自微信if signature != sign:abort(403)if request.method == "GET":return request.args.get("echostr")xml_str = request.dataif not xml_str:abort(400)# 对xml字符串进行解析成字典xml_dict = xmltodict.parse(xml_str)xml_dict = xml_dict.get("xml")logger.debug('xml_str: %s' % xml_str)logger.debug('xml_dict: %s' % xml_dict)user = xml_dict.get("FromUserName")# 新的信息处理resp_dict = getTextResponse(xml_dict, "感谢您关注 AIGC Hub")if xml_dict.get("MsgType") == "text":response_data = chatgpt(user, xml_dict.get("Content"))resp_dict = getTextResponse(xml_dict, response_data)resp_xml_str = xmltodict.unparse(resp_dict)logger.debug(resp_xml_str)return resp_xml_strif __name__ == '__main__':app.run(host='0.0.0.0', port=8000, debug=True, threaded=True)
整个项目非常简单:服务端接收到请求,将文字信息提取出来xml_dict.get("Content")
,并通过chatgpt方法进行结果的获取,并将结果返回给用户。但是这里遇到一个比较尴尬的事情,因为 AIGC 模型往往运行时间会比较长,可能超过微信公众号的 5 秒钟要求,所以此处可以进行额外的处理:
1. 服务端接收到请求,将文字信息提取出来
xml_dict.get("Content")
2. 异步调用
chatgpt
方法,如果 5 秒钟没有响应,则返回一个友好的提示:系统正在进行处理,请稍后回复"继续"查看结果,如果获得到了结果,则进行结果的返回3. 如果没有拿到结果,用户稍后发送继续时,系统将结果返回给客户端
整体的实现逻辑为:
# coding:utf-8
import os
import time
import json
import redis
import random
import base64
import hashlib
import logging
import requests
import xmltodict
import urllib.parse
import urllib.request
from flask import Flask, request, abort, Response, render_template_string
from concurrent.futures import ThreadPoolExecutor# 创建线程池执行器
executor = ThreadPoolExecutor(2)# 获取随机字符串
random_str = lambda count=100: "".join(random.sample('zyxwvutsrqponmlkjihgUIOPLKJHGFDSAZXCVBNM' * 10, count))# 日志配置
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)# 常量
WECHAT_TOKEN = "公众号 Token"
WECHAT_APPID = "公众号 APPID"
WECHAT_APPSECRET = "公众号 APP Secret"
WECHAT_ENCODINGAESKEY = '公众号 Encoding AES Key'
secret_id = 'AIGCaaS 平台密钥' # 密钥信息
secret_key = 'AIGCaaS 平台密钥' # 密钥信息db_config = {"redis": {"server": "192.168.211.1","port": 6379,}
}app = Flask(__name__)redis_conn = redis.Redis(host=db_config["redis"]["server"], port=db_config["redis"]["port"])# 签名对象
getSha256 = lambda content: hashlib.sha256(content.encode("utf-8")).hexdigest()getTextResponse = lambda xml_dict, response_data: {"xml":{"ToUserName": xml_dict.get("FromUserName"),"FromUserName": xml_dict.get("ToUserName"),"CreateTime": int(time.time()),"MsgType": "text","Content": response_data}
}def chatgpt(user, content):try:messages = redis_conn.get('user-%s' % (user))messages = json.loads(messages.decode("utf-8")) if messages else []messages.append({"role": "user", "content": content})redis_conn.setex('user-%s' % (user), 60 * 60 * 24 * 7, json.dumps(messages))url = "https://api.aigcaas.cn/product/%s/api/%s" % ('chatgpt_chat', 'chat_com')# 构建请求头nonce = str(random.randint(1, 10000))timestamp = str(int(time.time()))headers = {'SecretID': secret_id,'Nonce': nonce,'Token': getSha256(("%s%s%s" % (timestamp, secret_key, nonce))),'Timestamp': timestamp,'Content-Type': 'application/json'}# 构建请求 bodydata = {"messages": messages}# 获取响应response = requests.request("POST", url, headers=headers, data=json.dumps(data))result = json.loads(response.text)logger.debug("result: %s" % result)if result.get("status") == "Error":redis_conn.delete('user-%s' % (user))redis_conn.setex('user-last-text-message-%s' % user, 600, "系统检测到敏感信息,已经屏蔽,请重试")else:messages.append(result["choices"][0]["message"])messages = messages[-10:]redis_conn.setex('user-%s' % (user), 60 * 60 * 24, json.dumps(messages))redis_conn.setex('user-last-text-message-%s' % user, 600, result["choices"][0]["message"]["content"])except Exception as e:logger.error(e)redis_conn.delete('user-%s' % (user))def get_access_token():url = 'https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid=%s&secret=%s' % (WECHAT_APPID, WECHAT_APPSECRET)response_data = urllib.request.urlopen(url).read().decode("utf-8")logger.debug("response_data: %s" % response_data)return json.loads(response_data)["access_token"]@app.route("/mp", methods=["GET", "POST"])
def wechat():# 接收微信服务器发送参数signature = request.args.get("signature")timestamp = request.args.get("timestamp")nonce = request.args.get("nonce")logger.debug([signature, timestamp, nonce])if not all([signature, timestamp, nonce]):abort(400)li = [WECHAT_TOKEN, timestamp, nonce]li.sort()tmp_str = "".join(li)sign = hashlib.sha1(tmp_str.encode("utf-8")).hexdigest() # 进行sha1加密, 得到正确的签名值# 将自己计算的签名值, 与请求的签名参数进行对比, 如果相同, 则证明请求来自微信if signature != sign:abort(403)if request.method == "GET":return request.args.get("echostr")xml_str = request.dataif not xml_str:abort(400)# 对xml字符串进行解析成字典xml_dict = xmltodict.parse(xml_str)xml_dict = xml_dict.get("xml")logger.debug('xml_str: %s' % xml_str)logger.debug('xml_dict: %s' % xml_dict)user = xml_dict.get("FromUserName")# 对超时信息进行处理resp_dict = Noneif xml_dict.get("MsgType") == "text" and '继续' == xml_dict.get("Content").strip():resp_dict = getTextResponse(xml_dict, '系统还在处理中,烦请再等一下发送"继续"查看结果')text_status = redis_conn.get('user-last-text-message-status-%s' % user)if text_status:response_data = redis_conn.get('user-last-text-message-%s' % user)if response_data:redis_conn.delete('user-last-text-message-status-%s' % user)redis_conn.delete('user-last-text-message-%s' % user)resp_dict = getTextResponse(xml_dict, response_data.decode("utf-8"))else:resp_dict = None# 新的信息处理if not resp_dict:redis_conn.delete('user-last-text-message-status-%s' % user)redis_conn.delete('user-last-text-message-%s' % user)# 配置默认信息resp_dict = getTextResponse(xml_dict, "感谢您关注 AIGC Hub")if xml_dict.get("MsgType") == "text":redis_conn.setex('user-last-text-message-status-%s' % user, 600, '1')executor.submit(chatgpt, user, xml_dict.get("Content"))response_data = '系统正在进行处理,请稍后回复"继续"查看结果'for i in range(1, 23):time.sleep(0.2)temp_response_data = redis_conn.get('user-last-text-message-%s' % user)if temp_response_data:response_data = temp_response_data.decode("utf-8")redis_conn.delete('user-last-text-message-status-%s' % user)breakresp_dict = getTextResponse(xml_dict, response_data)resp_xml_str = xmltodict.unparse(resp_dict)logger.debug(resp_xml_str)return resp_xml_strif __name__ == '__main__':app.run(host='0.0.0.0', port=8000, debug=True, threaded=True)
#总结
上面为大家介绍了如何用 1 分钟获取多种形式的 AIGC 应用,包括不限于 ChatGPT,Stable Diffusion 等,并通过与微信公众号集成的方式,为大家进行了简单的代码分享。当然,除了与公众号的集成还可以与小程序的集成,还可以与其他更多种形式的应用进行集成,AIGCaaS 平台系统为开发者提供开箱即用的 AIGC 应用 API,可以让开发者更简单,更方便,更快速的使用多种形式的 AIGC 应用。
常见问题:
1. 为什么我生成的 Stable Diffusion 图片是黑色的?在某些时候,系统进行了敏感检测,如果生成的图片存在敏感信息,系统会自动进行屏蔽;
2. 这个 AIGCaaS 项目收费么?AIGCaaS 为开发者提供了大量的低价 API,注册即赠送大量的试用与体验机会(以 ChatGPT 为例,可以体验大概2500次),即便超过免费体验此时,开发者也可以申请永久免费计划,进行长期的“薅羊毛”;
3. 使用过程中出现问题,如何联系技术支持?技术支持微信号:
envless
,有任何问题随时联系;
AIGC 爱好者交流群
------
我们创建了一个高质量的技术交流群,与优秀的人在一起,自己也会优秀起来,赶紧点击加群,享受一起成长的快乐。另外,如果你最近想跳槽的话,年前我花了2周时间收集了一波大厂面经,节后准备跳槽的可以点击这里领取!
推荐阅读
谷歌打响全面反击战:AI重构搜索、新模型比肩GPT-4
Angular 16 正式发布,抢先体验指南
关于并发编程与线程安全的思考与实践
··································
你好,我是程序猿DD,10年开发老司机、阿里云MVP、腾讯云TVP、出过书创过业、国企4年互联网6年。从普通开发到架构师、再到合伙人。一路过来,给我最深的感受就是一定要不断学习并关注前沿。只要你能坚持下来,多思考、少抱怨、勤动手,就很容易实现弯道超车!所以,不要问我现在干什么是否来得及。如果你看好一个事情,一定是坚持了才能看到希望,而不是看到希望才去坚持。相信我,只要坚持下来,你一定比现在更好!如果你还没什么方向,可以先关注我,这里会经常分享一些前沿资讯,帮你积累弯道超车的资本。