Chainlit: Quickly Build an AI Chat Application and Persist Chat History to a MySQL Relational Database

Overview

By default, a Chainlit application does not persist the chats and elements it generates: as soon as the page is refreshed, all of the chat history on the page disappears. The ability to store and make use of this data, however, can be an important part of your project or organization.

An earlier article,《Chainlit快速实现AI对话应用并将聊天数据的持久化到sqllite本地数据库中》(persisting chat data to a local SQLite database), covered a solution whose advantage is that you do not have to install a database or create table structures yourself; its drawback is that it only suits a small number of users. Using a MySQL database handles chat-history access for a user base of moderate scale.

Tutorial

1. Install the Chainlit dependencies

pip install chainlit aiomysql pymysql cryptography sqlalchemy
  • aiomysql: asynchronous MySQL driver
  • pymysql: synchronous MySQL driver
  • sqlalchemy: SQL toolkit and Object-Relational Mapper (ORM)
  • cryptography: an open-source Python package providing easy-to-use cryptographic tools and algorithms (PyMySQL relies on it for MySQL 8's caching_sha2_password authentication)
  • chainlit: an open-source framework for quickly building and deploying conversational applications such as chatbots and virtual assistants

2. Configure environment variables

In the project root, create a .env file with the following content:

OPENAI_BASE_URL="https://dashscope.aliyuncs.com/compatible-mode/v1"
OPENAI_API_KEY="your api_key"
  • OpenAI's ChatGPT endpoints cannot be reached directly from mainland China, so OPENAI_BASE_URL must point at a proxy or compatible address. If you use a domestic LLM provider, any OpenAI-compatible endpoint works; the example above uses Alibaba Cloud DashScope's compatible-mode URL.

Install the MySQL database

You can follow the article《MySQL 安装和配置教程 | MySQL入门》, or look up an installation guide yourself.
After installing MySQL, use a database management tool such as Navicat to create a database, for example one named chain_lit (any other name is fine), and then run the following SQL to create the table structures (a command-line alternative is sketched after the SQL):

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for elements
-- ----------------------------
DROP TABLE IF EXISTS `elements`;
CREATE TABLE `elements`  (`id` char(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,`threadId` char(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,`type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,`url` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL,`chainlitKey` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL,`name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,`display` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL,`objectKey` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL,`size` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,`page` int NULL DEFAULT NULL,`language` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,`forId` char(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,`mime` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;

-- ----------------------------
-- Table structure for feedbacks
-- ----------------------------
DROP TABLE IF EXISTS `feedbacks`;
CREATE TABLE `feedbacks`  (`id` char(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,`forId` char(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,`threadId` char(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,`value` int NOT NULL,`comment` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL,PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;

-- ----------------------------
-- Table structure for steps
-- ----------------------------
DROP TABLE IF EXISTS `steps`;
CREATE TABLE `steps`  (`id` char(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,`name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,`type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,`threadId` char(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,`parentId` char(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,`disableFeedback` tinyint(1) NOT NULL DEFAULT 1,`streaming` tinyint(1) NOT NULL,`waitForAnswer` tinyint(1) NULL DEFAULT NULL,`isError` tinyint(1) NULL DEFAULT NULL,`metadata` json NULL,`tags` json NULL,`input` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL,`output` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL,`createdAt` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,`start` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,`end` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,`generation` json NULL,`showInput` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL,`language` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,`indent` int NULL DEFAULT NULL,PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;

-- ----------------------------
-- Table structure for threads
-- ----------------------------
DROP TABLE IF EXISTS `threads`;
CREATE TABLE `threads`  (`id` char(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,`createdAt` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,`name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,`userId` char(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,`userIdentifier` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,`tags` json NULL,`metadata` json NULL,PRIMARY KEY (`id`) USING BTREE,INDEX `userId`(`userId` ASC) USING BTREE,CONSTRAINT `threads_ibfk_1` FOREIGN KEY (`userId`) REFERENCES `users` (`id`) ON DELETE CASCADE ON UPDATE RESTRICT
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;

-- ----------------------------
-- Table structure for users
-- ----------------------------
DROP TABLE IF EXISTS `users`;
CREATE TABLE `users`  (`id` char(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,`identifier` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,`metadata` json NOT NULL,`createdAt` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,PRIMARY KEY (`id`) USING BTREE,UNIQUE INDEX `identifier`(`identifier` ASC) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;
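If you prefer the command line to Navicat, the same setup can be done with the mysql client. This is only a sketch; it assumes the table-creation SQL above has been saved to a file named schema.sql (the file name is an arbitrary choice):

mysql -u root -p -e "CREATE DATABASE chain_lit CHARACTER SET utf8mb4 COLLATE utf8mb4_bin;"
mysql -u root -p chain_lit < schema.sql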

3. Create the code

In the project root, create a file named mysql_client.py with the following code:
import pymysql
from pymysql.connections import Connection
from chainlit.data import BaseStorageClient
from chainlit.logger import logger


class MysqlStorageClient(BaseStorageClient):
    """Enables storage in a MySQL database.

    Args:
        host: Hostname or IP address of the MySQL server.
        dbname: Name of the database to connect to.
        user: User name used to authenticate.
        password: Password used to authenticate.
        port: Port number to connect to (default: 3306).
    """

    def __init__(self, host: str, dbname: str, user: str, password: str, port: int = 3306):
        try:
            self.conn: Connection = pymysql.connect(
                host=host,
                port=port,
                user=user,
                password=password,
                database=dbname,
                charset="utf8mb4",  # matches the utf8mb4 tables created above
            )
            logger.info("MysqlStorageClient initialized")
        except Exception as e:
            logger.warn(f"MysqlStorageClient initialization error: {e}")
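Note that mysql_data.py (the next file) calls self.storage_provider.upload_file(object_key=..., data=..., mime=..., overwrite=True) when it persists file elements, and recent Chainlit releases declare such a method on BaseStorageClient; the class above does not implement it, so element (file/image) persistence will not work out of the box. If you need it, a minimal sketch along the following lines could be added to MysqlStorageClient (the local ./uploads/ directory is purely an assumption for illustration; any blob store would do):

# Sketch only: goes inside MysqlStorageClient; requires "import os" and
# "from typing import Any, Dict, Union" at the top of mysql_client.py.
async def upload_file(
    self,
    object_key: str,
    data: Union[bytes, str],
    mime: str = "application/octet-stream",
    overwrite: bool = True,
) -> Dict[str, Any]:
    # Hypothetical local-disk storage: write the element under ./uploads/<object_key>
    path = os.path.join("uploads", object_key)
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "wb" if isinstance(data, bytes) else "w") as f:
        f.write(data)
    # create_element in mysql_data.py reads "url" and "object_key" from this dict
    return {"object_key": object_key, "url": f"/uploads/{object_key}"}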
In the project root, create a file named mysql_data.py. This data layer closely follows Chainlit's SQLAlchemyDataLayer (its log messages still say "SQLAlchemy"), with the upserts written as MySQL's ON DUPLICATE KEY UPDATE. The code is as follows:
import json
import ssl
import uuid
from dataclasses import asdict
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

import aiofiles
import aiohttp
from chainlit.context import context
from chainlit.data import BaseDataLayer, BaseStorageClient, queue_until_user_message
from chainlit.element import ElementDict  # constructed at runtime in get_all_user_threads
from chainlit.logger import logger
from chainlit.step import StepDict
from chainlit.types import (
    Feedback,
    FeedbackDict,
    PageInfo,
    PaginatedResponse,
    Pagination,
    ThreadDict,
    ThreadFilter,
)
from chainlit.user import PersistedUser, User
from literalai.helper import utc_now
from sqlalchemy import text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

if TYPE_CHECKING:
    from chainlit.element import Element


class MysqlDataLayer(BaseDataLayer):
    def __init__(
        self,
        conninfo: str,
        ssl_require: bool = False,
        storage_provider: Optional[BaseStorageClient] = None,
        user_thread_limit: Optional[int] = 1000,
        show_logger: Optional[bool] = False,
    ):
        self._conninfo = conninfo
        self.user_thread_limit = user_thread_limit
        self.show_logger = show_logger
        ssl_args = {}
        if ssl_require:
            # Create an SSL context to require an SSL connection
            ssl_context = ssl.create_default_context()
            ssl_context.check_hostname = False
            ssl_context.verify_mode = ssl.CERT_NONE
            ssl_args["ssl"] = ssl_context
        self.engine: AsyncEngine = create_async_engine(self._conninfo, connect_args=ssl_args)
        self.async_session = sessionmaker(bind=self.engine, expire_on_commit=False, class_=AsyncSession)  # type: ignore
        if storage_provider:
            self.storage_provider: Optional[BaseStorageClient] = storage_provider
            if self.show_logger:
                logger.info("SQLAlchemyDataLayer storage client initialized")
        else:
            self.storage_provider = None
            logger.warn("SQLAlchemyDataLayer storage client is not initialized and elements will not be persisted!")

    async def build_debug_url(self) -> str:
        return ""

    ###### SQL Helpers ######
    async def execute_sql(self, query: str, parameters: dict) -> Union[List[Dict[str, Any]], int, None]:
        parameterized_query = text(query)
        async with self.async_session() as session:
            try:
                await session.begin()
                result = await session.execute(parameterized_query, parameters)
                await session.commit()
                if result.returns_rows:
                    json_result = [dict(row._mapping) for row in result.fetchall()]
                    clean_json_result = self.clean_result(json_result)
                    return clean_json_result
                else:
                    return result.rowcount
            except SQLAlchemyError as e:
                await session.rollback()
                logger.warn(f"An error occurred: {e}")
                return None
            except Exception as e:
                await session.rollback()
                logger.warn(f"An unexpected error occurred: {e}")
                return None

    async def get_current_timestamp(self) -> str:
        return utc_now()

    def clean_result(self, obj):
        """Recursively change UUID -> str and serialize dictionaries"""
        if isinstance(obj, dict):
            return {k: self.clean_result(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [self.clean_result(item) for item in obj]
        elif isinstance(obj, uuid.UUID):
            return str(obj)
        return obj

    ###### User ######
    async def get_user(self, identifier: str) -> Optional[PersistedUser]:
        if self.show_logger:
            logger.info(f"SQLAlchemy: get_user, identifier={identifier}")
        query = "SELECT * FROM users WHERE identifier = :identifier"
        parameters = {"identifier": identifier}
        result = await self.execute_sql(query=query, parameters=parameters)
        if result and isinstance(result, list):
            user_data = result[0]
            if isinstance(user_data['metadata'], str):
                user_data['metadata'] = json.loads(user_data['metadata'])
            print('get_user over')
            return PersistedUser(**user_data)
        return None

    async def create_user(self, user: User) -> Optional[PersistedUser]:
        if self.show_logger:
            logger.info(f"SQLAlchemy: create_user, user_identifier={user.identifier}")
        existing_user: Optional["PersistedUser"] = await self.get_user(user.identifier)
        user_dict: Dict[str, Any] = {
            "identifier": str(user.identifier),
            "metadata": json.dumps(user.metadata) or {},
        }
        if not existing_user:  # create the user
            if self.show_logger:
                logger.info("SQLAlchemy: create_user, creating the user")
            user_dict["id"] = str(uuid.uuid4())
            user_dict["createdAt"] = await self.get_current_timestamp()
            query = "INSERT INTO users (`id`, `identifier`, `createdAt`, `metadata`) VALUES (:id, :identifier, :createdAt, :metadata)"
            await self.execute_sql(query=query, parameters=user_dict)
        else:  # update the user
            if self.show_logger:
                logger.info("SQLAlchemy: update user metadata")
            query = "UPDATE users SET metadata = :metadata WHERE identifier = :identifier"
            await self.execute_sql(query=query, parameters=user_dict)  # We want to update the metadata
        return await self.get_user(user.identifier)

    ###### Threads ######
    async def get_thread_author(self, thread_id: str) -> str:
        print('get_thread_author', thread_id)
        if self.show_logger:
            logger.info(f"SQLAlchemy: get_thread_author, thread_id={thread_id}")
        query = """SELECT userIdentifier FROM threads WHERE id = :id"""
        parameters = {"id": thread_id}
        result = await self.execute_sql(query=query, parameters=parameters)
        print('result', result)
        if isinstance(result, list) and result:
            author_identifier = result[0].get("userIdentifier")
            if author_identifier is not None:
                return author_identifier
        raise ValueError(f"Author not found for thread_id {thread_id}")

    async def get_thread(self, thread_id: str) -> Optional[ThreadDict]:
        print('get_thread', thread_id)
        if self.show_logger:
            logger.info(f"SQLAlchemy: get_thread, thread_id={thread_id}")
        user_threads: Optional[List[ThreadDict]] = await self.get_all_user_threads(thread_id=thread_id)
        if user_threads:
            return user_threads[0]
        else:
            return None

    async def update_thread(
        self,
        thread_id: str,
        name: Optional[str] = None,
        user_id: Optional[str] = None,
        metadata: Optional[Dict] = None,
        tags: Optional[List[str]] = None,
    ):
        if self.show_logger:
            logger.info(f"SQLAlchemy: update_thread, thread_id={thread_id}")
        if context.session.user is not None:
            user_identifier = context.session.user.identifier
        else:
            raise ValueError("User not found in session context")
        data = {
            "id": thread_id,
            "createdAt": (await self.get_current_timestamp() if metadata is None else None),
            "name": (
                name
                if name is not None
                else (metadata.get("name") if metadata and "name" in metadata else None)
            ),
            "userId": user_id,
            "userIdentifier": user_identifier,
            "tags": tags,
            "metadata": json.dumps(metadata) if metadata else None,
        }
        parameters = {key: value for key, value in data.items() if value is not None}  # Remove keys with None values
        columns = ", ".join(f'{key}' for key in parameters.keys())
        values = ", ".join(f":{key}" for key in parameters.keys())
        updates = ", ".join(f'{key} = VALUES({key})' for key in parameters.keys() if key != "id")
        query = f"""
            INSERT INTO threads ({columns})
            VALUES ({values})
            ON DUPLICATE KEY UPDATE
                {updates};
        """
        await self.execute_sql(query=query, parameters=parameters)

    async def delete_thread(self, thread_id: str):
        if self.show_logger:
            logger.info(f"SQLAlchemy: delete_thread, thread_id={thread_id}")
        # Delete feedbacks/elements/steps/thread
        feedbacks_query = "DELETE FROM feedbacks WHERE forId IN (SELECT id FROM steps WHERE threadId = :id)"
        elements_query = "DELETE FROM elements WHERE threadId = :id"
        steps_query = "DELETE FROM steps WHERE threadId = :id"
        thread_query = "DELETE FROM threads WHERE id = :id"
        parameters = {"id": thread_id}
        await self.execute_sql(query=feedbacks_query, parameters=parameters)
        await self.execute_sql(query=elements_query, parameters=parameters)
        await self.execute_sql(query=steps_query, parameters=parameters)
        await self.execute_sql(query=thread_query, parameters=parameters)

    async def list_threads(self, pagination: Pagination, filters: ThreadFilter) -> PaginatedResponse:
        if self.show_logger:
            logger.info(f"SQLAlchemy: list_threads, pagination={pagination}, filters={filters}")
        if not filters.userId:
            raise ValueError("userId is required")
        all_user_threads: List[ThreadDict] = (
            await self.get_all_user_threads(user_id=filters.userId) or []
        )
        search_keyword = filters.search.lower() if filters.search else None
        feedback_value = int(filters.feedback) if filters.feedback else None
        filtered_threads = []
        for thread in all_user_threads:
            keyword_match = True
            feedback_match = True
            if search_keyword or feedback_value is not None:
                if search_keyword:
                    keyword_match = any(
                        search_keyword in step["output"].lower()
                        for step in thread["steps"]
                        if "output" in step
                    )
                if feedback_value is not None:
                    feedback_match = False  # Assume no match until found
                    for step in thread["steps"]:
                        feedback = step.get("feedback")
                        if feedback and feedback.get("value") == feedback_value:
                            feedback_match = True
                            break
            if keyword_match and feedback_match:
                filtered_threads.append(thread)
        start = 0
        if pagination.cursor:
            for i, thread in enumerate(filtered_threads):
                if thread["id"] == pagination.cursor:  # Find the start index using pagination.cursor
                    start = i + 1
                    break
        end = start + pagination.first
        paginated_threads = filtered_threads[start:end] or []
        has_next_page = len(filtered_threads) > end
        start_cursor = paginated_threads[0]["id"] if paginated_threads else None
        end_cursor = paginated_threads[-1]["id"] if paginated_threads else None
        return PaginatedResponse(
            pageInfo=PageInfo(
                hasNextPage=has_next_page,
                startCursor=start_cursor,
                endCursor=end_cursor,
            ),
            data=paginated_threads,
        )

    ###### Steps ######
    @queue_until_user_message()
    async def create_step(self, step_dict: "StepDict"):
        if self.show_logger:
            logger.info(f"SQLAlchemy: create_step, step_id={step_dict.get('id')}")
        if not getattr(context.session.user, "id", None):
            raise ValueError("No authenticated user in context")
        step_dict["showInput"] = (
            str(step_dict.get("showInput", "")).lower()
            if "showInput" in step_dict
            else None
        )
        parameters = {
            key: value
            for key, value in step_dict.items()
            if value is not None and not (isinstance(value, dict) and not value)
        }
        parameters["metadata"] = json.dumps(step_dict.get("metadata", {}))
        parameters["generation"] = json.dumps(step_dict.get("generation", {}))
        columns = ", ".join(f'{key}' for key in parameters.keys())
        values = ", ".join(f":{key}" for key in parameters.keys())
        updates = ", ".join(f'{key} = :{key}' for key in parameters.keys() if key != "id")
        query = f"""
            INSERT INTO steps ({columns})
            VALUES ({values})
            ON DUPLICATE KEY UPDATE
                {updates};
        """
        await self.execute_sql(query=query, parameters=parameters)

    @queue_until_user_message()
    async def update_step(self, step_dict: "StepDict"):
        if self.show_logger:
            logger.info(f"SQLAlchemy: update_step, step_id={step_dict.get('id')}")
        await self.create_step(step_dict)

    @queue_until_user_message()
    async def delete_step(self, step_id: str):
        if self.show_logger:
            logger.info(f"SQLAlchemy: delete_step, step_id={step_id}")
        # Delete feedbacks/elements/steps
        feedbacks_query = "DELETE FROM feedbacks WHERE forId = :id"
        elements_query = "DELETE FROM elements WHERE forId = :id"
        steps_query = "DELETE FROM steps WHERE id = :id"
        parameters = {"id": step_id}
        await self.execute_sql(query=feedbacks_query, parameters=parameters)
        await self.execute_sql(query=elements_query, parameters=parameters)
        await self.execute_sql(query=steps_query, parameters=parameters)

    ###### Feedback ######
    async def upsert_feedback(self, feedback: Feedback) -> str:
        if self.show_logger:
            logger.info(f"SQLAlchemy: upsert_feedback, feedback_id={feedback.id}")
        feedback.id = feedback.id or str(uuid.uuid4())
        feedback_dict = asdict(feedback)
        parameters = {key: value for key, value in feedback_dict.items() if value is not None}
        columns = ", ".join(f'{key}' for key in parameters.keys())
        values = ", ".join(f":{key}" for key in parameters.keys())
        updates = ", ".join(f'{key} = :{key}' for key in parameters.keys() if key != "id")
        query = f"""
            INSERT INTO feedbacks ({columns})
            VALUES ({values})
            ON DUPLICATE KEY UPDATE
                {updates};
        """
        await self.execute_sql(query=query, parameters=parameters)
        return feedback.id

    async def delete_feedback(self, feedback_id: str) -> bool:
        if self.show_logger:
            logger.info(f"SQLAlchemy: delete_feedback, feedback_id={feedback_id}")
        query = "DELETE FROM feedbacks WHERE id = :feedback_id"
        parameters = {"feedback_id": feedback_id}
        await self.execute_sql(query=query, parameters=parameters)
        return True

    ###### Elements ######
    @queue_until_user_message()
    async def create_element(self, element: "Element"):
        if self.show_logger:
            logger.info(f"SQLAlchemy: create_element, element_id = {element.id}")
        if not getattr(context.session.user, "id", None):
            raise ValueError("No authenticated user in context")
        if not self.storage_provider:
            logger.warn(f"SQLAlchemy: create_element error. No blob_storage_client is configured!")
            return
        if not element.for_id:
            return
        content: Optional[Union[bytes, str]] = None
        if element.path:
            async with aiofiles.open(element.path, "rb") as f:
                content = await f.read()
        elif element.url:
            async with aiohttp.ClientSession() as session:
                async with session.get(element.url) as response:
                    if response.status == 200:
                        content = await response.read()
                    else:
                        content = None
        elif element.content:
            content = element.content
        else:
            raise ValueError("Element url, path or content must be provided")
        if content is None:
            raise ValueError("Content is None, cannot upload file")
        context_user = context.session.user
        user_folder = getattr(context_user, "id", "unknown")
        file_object_key = f"{user_folder}/{element.id}" + (f"/{element.name}" if element.name else "")
        if not element.mime:
            element.mime = "application/octet-stream"
        uploaded_file = await self.storage_provider.upload_file(
            object_key=file_object_key, data=content, mime=element.mime, overwrite=True
        )
        if not uploaded_file:
            raise ValueError("SQLAlchemy Error: create_element, Failed to persist data in storage_provider")
        element_dict: ElementDict = element.to_dict()
        element_dict["url"] = uploaded_file.get("url")
        element_dict["objectKey"] = uploaded_file.get("object_key")
        element_dict_cleaned = {k: v for k, v in element_dict.items() if v is not None}
        columns = ", ".join(f'{column}' for column in element_dict_cleaned.keys())
        placeholders = ", ".join(f":{column}" for column in element_dict_cleaned.keys())
        query = f"INSERT INTO elements ({columns}) VALUES ({placeholders})"
        await self.execute_sql(query=query, parameters=element_dict_cleaned)

    @queue_until_user_message()
    async def delete_element(self, element_id: str, thread_id: Optional[str] = None):
        if self.show_logger:
            logger.info(f"SQLAlchemy: delete_element, element_id={element_id}")
        query = "DELETE FROM elements WHERE id = :id"
        parameters = {"id": element_id}
        await self.execute_sql(query=query, parameters=parameters)

    async def delete_user_session(self, id: str) -> bool:
        return False  # Not sure why documentation wants this

    async def get_all_user_threads(
        self, user_id: Optional[str] = None, thread_id: Optional[str] = None
    ) -> Optional[List[ThreadDict]]:
        """Fetch all user threads up to self.user_thread_limit, or one thread by id if thread_id is provided."""
        if self.show_logger:
            logger.info(f"SQLAlchemy: get_all_user_threads")
        user_threads_query = """
            SELECT
                id AS thread_id,
                createdAt AS thread_createdat,
                name AS thread_name,
                userId AS user_id,
                userIdentifier AS user_identifier,
                tags AS thread_tags,
                metadata AS thread_metadata
            FROM threads
            WHERE userId = :user_id OR id = :thread_id
            ORDER BY createdAt DESC
            LIMIT :limit
        """
        user_threads = await self.execute_sql(
            query=user_threads_query,
            parameters={
                "user_id": user_id,
                "limit": self.user_thread_limit,
                "thread_id": thread_id,
            },
        )
        if not isinstance(user_threads, list):
            return None
        if not user_threads:
            return []
        else:
            thread_ids = (
                "('"
                + "','".join(map(str, [thread["thread_id"] for thread in user_threads]))
                + "')"
            )
        steps_feedbacks_query = f"""
            SELECT
                s.id AS step_id,
                s.name AS step_name,
                s.type AS step_type,
                s.threadId AS step_threadid,
                s.parentId AS step_parentid,
                s.streaming AS step_streaming,
                s.waitForAnswer AS step_waitforanswer,
                s.isError AS step_iserror,
                s.metadata AS step_metadata,
                s.tags AS step_tags,
                s.input AS step_input,
                s.output AS step_output,
                s.createdAt AS step_createdat,
                s.start AS step_start,
                s.end AS step_end,
                s.generation AS step_generation,
                s.showInput AS step_showinput,
                s.language AS step_language,
                s.indent AS step_indent,
                f.value AS feedback_value,
                f.comment AS feedback_comment
            FROM steps s LEFT JOIN feedbacks f ON s.id = f.forId
            WHERE s.threadId IN {thread_ids}
            ORDER BY s.createdAt ASC
        """
        steps_feedbacks = await self.execute_sql(query=steps_feedbacks_query, parameters={})
        elements_query = f"""
            SELECT
                e.id AS element_id,
                e.threadId as element_threadid,
                e.type AS element_type,
                e.chainlitKey AS element_chainlitkey,
                e.url AS element_url,
                e.objectKey as element_objectkey,
                e.name AS element_name,
                e.display AS element_display,
                e.size AS element_size,
                e.language AS element_language,
                e.page AS element_page,
                e.forId AS element_forid,
                e.mime AS element_mime
            FROM elements e
            WHERE e.threadId IN {thread_ids}
        """
        elements = await self.execute_sql(query=elements_query, parameters={})
        thread_dicts = {}
        for thread in user_threads:
            thread_id = thread["thread_id"]
            if thread_id is not None:
                if isinstance(thread['thread_metadata'], str):
                    thread['thread_metadata'] = json.loads(thread['thread_metadata'])
                thread_dicts[thread_id] = ThreadDict(
                    id=thread_id,
                    createdAt=thread["thread_createdat"],
                    name=thread["thread_name"],
                    userId=thread["user_id"],
                    userIdentifier=thread["user_identifier"],
                    tags=thread["thread_tags"],
                    metadata=thread["thread_metadata"],
                    steps=[],
                    elements=[],
                )
        # Process steps_feedbacks to populate the steps in the corresponding ThreadDict
        if isinstance(steps_feedbacks, list):
            for step_feedback in steps_feedbacks:
                thread_id = step_feedback["step_threadid"]
                if thread_id is not None:
                    feedback = None
                    if step_feedback["feedback_value"] is not None:
                        feedback = FeedbackDict(
                            forId=step_feedback["step_id"],
                            id=step_feedback.get("feedback_id"),
                            value=step_feedback["feedback_value"],
                            comment=step_feedback.get("feedback_comment"),
                        )
                    step_dict = StepDict(
                        id=step_feedback["step_id"],
                        name=step_feedback["step_name"],
                        type=step_feedback["step_type"],
                        threadId=thread_id,
                        parentId=step_feedback.get("step_parentid"),
                        streaming=step_feedback.get("step_streaming", False),
                        waitForAnswer=step_feedback.get("step_waitforanswer"),
                        isError=step_feedback.get("step_iserror"),
                        metadata=(
                            step_feedback["step_metadata"]
                            if step_feedback.get("step_metadata") is not None
                            else {}
                        ),
                        tags=step_feedback.get("step_tags"),
                        input=(
                            step_feedback.get("step_input", "")
                            if step_feedback["step_showinput"] == "true"
                            else None
                        ),
                        output=step_feedback.get("step_output", ""),
                        createdAt=step_feedback.get("step_createdat"),
                        start=step_feedback.get("step_start"),
                        end=step_feedback.get("step_end"),
                        generation=step_feedback.get("step_generation"),
                        showInput=step_feedback.get("step_showinput"),
                        language=step_feedback.get("step_language"),
                        indent=step_feedback.get("step_indent"),
                        feedback=feedback,
                    )
                    # Append the step to the steps list of the corresponding ThreadDict
                    thread_dicts[thread_id]["steps"].append(step_dict)
        if isinstance(elements, list):
            for element in elements:
                thread_id = element["element_threadid"]
                if thread_id is not None:
                    element_dict = ElementDict(
                        id=element["element_id"],
                        threadId=thread_id,
                        type=element["element_type"],
                        chainlitKey=element.get("element_chainlitkey"),
                        url=element.get("element_url"),
                        objectKey=element.get("element_objectkey"),
                        name=element["element_name"],
                        display=element["element_display"],
                        size=element.get("element_size"),
                        language=element.get("element_language"),
                        autoPlay=element.get("element_autoPlay"),
                        playerConfig=element.get("element_playerconfig"),
                        page=element.get("element_page"),
                        forId=element.get("element_forid"),
                        mime=element.get("element_mime"),
                    )
                    thread_dicts[thread_id]["elements"].append(element_dict)  # type: ignore
        return list(thread_dicts.values())
In the project root, create a file named app.py with the following code:
from typing import List, Optional

import chainlit as cl
import chainlit.data as cl_data
from openai import AsyncOpenAI

from mysql_client import MysqlStorageClient
from mysql_data import MysqlDataLayer

client = AsyncOpenAI()

thread_history = []  # type: List[cl_data.ThreadDict]
deleted_thread_ids = []  # type: List[str]

storage_client = MysqlStorageClient(host="127.0.0.1", dbname="chain_lit", port=3306, user="root", password="123456")
cl_data._data_layer = MysqlDataLayer(
    conninfo="mysql+aiomysql://root:123456@127.0.0.1:3306/chain_lit",
    storage_provider=storage_client,
)


@cl.on_chat_start
async def main():
    # Greeting shown when a chat starts ("Hello, I'm the Taishan AI assistant. How can I help you?")
    content = "你好,我是泰山AI智能客服,有什么可以帮助您吗?"
    await cl.Message(content).send()


@cl.on_message
async def handle_message():
    # Wait for queue to be flushed
    await cl.sleep(1)
    msg = cl.Message(content="")
    await msg.send()
    stream = await client.chat.completions.create(
        model="qwen-turbo", messages=cl.chat_context.to_openai(), stream=True
    )
    async for part in stream:
        if token := part.choices[0].delta.content or "":
            await msg.stream_token(token)
    await msg.update()


@cl.password_auth_callback
def auth_callback(username: str, password: str) -> Optional[cl.User]:
    if (username, password) == ("admin", "admin"):
        return cl.User(identifier="admin")
    else:
        return None


@cl.on_chat_resume
async def on_chat_resume():
    pass
  • Change the MySQL connection details in the code (host, port, user, password, database name) to your own; a sketch for reading them from environment variables instead of hardcoding them follows below.
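As a small improvement over hardcoding, the connection details can also be read from environment variables kept in the same .env file. The variable names below (MYSQL_HOST, MYSQL_PORT, and so on) are just an assumption for illustration:

# Sketch: replace the hardcoded values in app.py with environment variables.
import os

MYSQL_HOST = os.getenv("MYSQL_HOST", "127.0.0.1")
MYSQL_PORT = int(os.getenv("MYSQL_PORT", "3306"))
MYSQL_USER = os.getenv("MYSQL_USER", "root")
MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD", "123456")
MYSQL_DB = os.getenv("MYSQL_DB", "chain_lit")

storage_client = MysqlStorageClient(
    host=MYSQL_HOST, dbname=MYSQL_DB, port=MYSQL_PORT, user=MYSQL_USER, password=MYSQL_PASSWORD
)
cl_data._data_layer = MysqlDataLayer(
    conninfo=f"mysql+aiomysql://{MYSQL_USER}:{MYSQL_PASSWORD}@{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DB}",
    storage_provider=storage_client,
)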

4. Run the command to create an AUTH_SECRET for authentication

chainlit create-secret 

(Screenshot: output of the chainlit create-secret command)
Copy the last line of the output into the .env configuration file:

CHAINLIT_AUTH_SECRET="$b?/v0NeJlAU~I5As1WSCa,j8wJ3w%agTyIFlUt4408?mfC*,/wovlfA%3O/751U"
OPENAI_BASE_URL="https://dashscope.aliyuncs.com/compatible-mode/v1"
OPENAI_API_KEY=""

5. Run the command to start the service

chainlit run app.py -w
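The -w flag watches the source files and reloads the app on changes, which is handy during development. chainlit run also accepts host and port options if you need to expose the app on another interface or port, for example:

chainlit run app.py -w --host 0.0.0.0 --port 8000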

6. What it looks like after startup

(Screenshot: the running chat application after startup)

  • Chat history is now stored in the server's MySQL database, so it is no longer lost when the page is refreshed or the service is restarted; a quick verification query is sketched below.
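To double-check that persistence is working, log in, send a few messages, and then query the database directly. For example, against the chain_lit schema created above:

-- Recent conversations and how many steps (messages) each one holds
SELECT t.id, t.name, t.userIdentifier, t.createdAt, COUNT(s.id) AS step_count
FROM threads t LEFT JOIN steps s ON s.threadId = t.id
GROUP BY t.id, t.name, t.userIdentifier, t.createdAt
ORDER BY t.createdAt DESC
LIMIT 10;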

