- 本工具设计的初衷是用来获取微信账号的相关信息并解析PC版微信的数据库。
- 程序以 Python 语言开发,可读取、解密、还原微信数据库并帮助用户查看聊天记录,还可以将其聊天记录导出为csv、html等格式用于AI训练,自动回复或备份等等作用。下面我们将深入探讨这个工具的各个方面及其工作原理。
- 本项目仅供学习交流使用,严禁用于商业用途或非法途径,任何违反法律法规、侵犯他人合法权益的行为,均与本项目及其开发者无关,后果由行为人自行承担。
- 创作不易,请动动您发财的小手点点赞并收藏,您的支持是咱敲键盘的动力。
【完整演示工具下载】
https://download.csdn.net/download/qq_39190622/89958183https://download.csdn.net/download/qq_39190622/89958183
我们接着上一篇文章《劫持微信聊天记录并分析还原 —— 数据库结构讲解(四)》继续演示与讲解如何访问微信合并后的数据库并利用程序自带的网页UI来查看PC端微信的聊天记录,包括实时消息的获取等。
详细命令:
dbshow -merge "C:\Users\admin\AppData\Local\Temp\wxdb_all.db" -wid "C:\Users\admin\Documents\WeChat Files\wxid_b*************1"
- -merge 为指定合并后的微信数据库路径
- - wid 为微信用户帐号文件目录(用于显示图片)
运行命令后程序会自动打开网页UI。
此时我们点击“聊天查看”功能是无法看到聊天数据的,首次使用我们需要先将程序初始化设置。
依次点击右下角“更多设置” - “初始化设置” - “自动解密已登录微信”
选择已登录的微信(支持多个微信查看)
并耐心等待提示成功后(加载的时间根据数据量的大小而定),我们再次点击“聊天查看”功能。
此时所有该帐号下最近的聊天数据即可一一查看,但这些数据并不会实时显示,如果需要获取实时聊天数据,我们还需要手动点击聊天记录框右上角“实时消息”。
等待提示成功后,我们刷新一下页面即可查看实时聊天记录。
部分现实代码:
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: __init__.py
# Description:
# Author: Rainbow
# Date: 2024/11/09
# -------------------------------------------------------------------------------
import os
import subprocess
import sys
import time
import uvicorn
import mimetypes
import logging
from logging.handlers import RotatingFileHandlerfrom uvicorn.config import LOGGING_CONFIG
from fastapi import FastAPI, Request, Path, Query
from fastapi.staticfiles import StaticFiles
from fastapi.exceptions import RequestValidationError
from starlette.middleware.cors import CORSMiddleware
from starlette.responses import RedirectResponse, FileResponsefrom .utils import gc, is_port_in_use, server_loger
from .rjson import ReJson
from .remote_server import rs_api
from .local_server import ls_apifrom pywxdump import __version__def gen_fastapi_app(handler):app = FastAPI(title="wxdump", description="微信工具", version=__version__,terms_of_service="https://www.chwm.vip",contact={"name": "Rainbow", "url": "https://www.chwm.vip"},license_info={"name": "MIT License","url": "https://www.chwm.vip"})web_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "ui", "web") # web文件夹路径# 跨域origins = ["http://localhost:5000","http://127.0.0.1:5000","http://localhost:8080", # 开发环境的客户端地址"# "http://0.0.0.0:5000",# "*"]app.add_middleware(CORSMiddleware,allow_origins=origins, # 允许所有源allow_credentials=True,allow_methods=["*"], # 允许所有方法allow_headers=["*"], # 允许所有头)@app.on_event("startup")async def startup_event():logger = logging.getLogger("uvicorn")logger.addHandler(handler)# 错误处理@app.exception_handler(RequestValidationError)async def request_validation_exception_handler(request: Request, exc: RequestValidationError):# print(request.body)return ReJson(1002, {"detail": exc.errors()})# 首页@app.get("/")@app.get("/index.html")async def index():response = RedirectResponse(url="/s/index.html", status_code=307)return response# 路由挂载app.include_router(rs_api, prefix='/api/rs', tags=['远程api'])app.include_router(ls_api, prefix='/api/ls', tags=['本地api'])# 根据文件类型,设置mime_type,返回文件@app.get("/s/{filename:path}")async def serve_file(filename: str):# 构建完整的文件路径file_path = os.path.join(web_path, filename)file_path = os.path.abspath(file_path)# 检查文件是否存在if os.path.isfile(file_path):# 获取文件 MIME 类型mime_type, _ = mimetypes.guess_type(file_path)# 如果 MIME 类型为空,则默认为 application/octet-streamif mime_type is None:mime_type = "application/octet-stream"server_loger.warning(f"[+] 无法获取文件 MIME 类型,使用默认值:{mime_type}")if file_path.endswith(".js"):mime_type = "text/javascript"server_loger.info(f"[+] 文件 {file_path} MIME 类型:{mime_type}")# 返回文件return FileResponse(file_path, media_type=mime_type)# 如果文件不存在,返回 404return {"detail": "Not Found"}, 404# 静态文件挂载# if os.path.exists(os.path.join(web_path, "index.html")):# app.mount("/s", StaticFiles(directory=web_path), name="static")return appdef start_server(port=5000, online=False, debug=False, isopenBrowser=True,merge_path="", wx_path="", my_wxid="", ):"""启动flask:param port: 端口号:param online: 是否在线查看(局域网查看):param debug: 是否开启debug模式:param isopenBrowser: 是否自动打开浏览器:return:"""work_path = os.path.join(os.getcwd(), "wxdump_work") # 临时文件夹,用于存放图片等 # 全局变量if not os.path.exists(work_path):os.makedirs(work_path, exist_ok=True)server_loger.info(f"[+] 创建临时文件夹:{work_path}")print(f"[+] 创建临时文件夹:{work_path}")# 日志处理,写入到文件log_format = '[{levelname[0]}] {asctime} [{name}:{levelno}] {pathname}:{lineno} {message}'log_datefmt = '%Y-%m-%d %H:%M:%S'log_file_path = os.path.join(work_path, "wxdump.log")file_handler = RotatingFileHandler(log_file_path, mode="a", maxBytes=10 * 1024 * 1024, backupCount=3)formatter = logging.Formatter(fmt=log_format, datefmt=log_datefmt, style='{')file_handler.setFormatter(formatter)wx_core_logger = logging.getLogger("wx_core")db_prepare = logging.getLogger("db_prepare")# 这几个日志处理器为本项目的日志处理器server_loger.addHandler(file_handler)wx_core_logger.addHandler(file_handler)db_prepare.addHandler(file_handler)conf_file = os.path.join(work_path, "conf_auto.json") # 用于存放各种基础信息auto_setting = "auto_setting"env_file = os.path.join(work_path, ".env") # 用于存放环境变量# set 环境变量os.environ["PYWXDUMP_WORK_PATH"] = work_pathos.environ["PYWXDUMP_CONF_FILE"] = conf_fileos.environ["PYWXDUMP_AUTO_SETTING"] = auto_settingwith open(env_file, "w", encoding="utf-8") as f:f.write(f"PYWXDUMP_WORK_PATH = '{work_path}'\n")f.write(f"PYWXDUMP_CONF_FILE = '{conf_file}'\n")f.write(f"PYWXDUMP_AUTO_SETTING = '{auto_setting}'\n")if merge_path and os.path.exists(merge_path):my_wxid = my_wxid if my_wxid else "wxid_dbshow"gc.set_conf(my_wxid, "wxid", my_wxid) # 初始化wxidgc.set_conf(my_wxid, "merge_path", merge_path) # 初始化merge_pathgc.set_conf(my_wxid, "wx_path", wx_path) # 初始化wx_pathdb_config = {"key": my_wxid, "type": "sqlite", "path": merge_path}gc.set_conf(my_wxid, "db_config", db_config) # 初始化db_configgc.set_conf(auto_setting, "last", my_wxid) # 初始化last# 检查端口是否被占用if online:host = '0.0.0.0'else:host = "127.0.0.1"if is_port_in_use(host, port):server_loger.error(f"Port {port} is already in use. Choose a different port.")print(f"Port {port} is already in use. Choose a different port.")input("Press Enter to exit...")return # 退出程序if isopenBrowser:try:# 自动打开浏览器url = f"http://127.0.0.1:{port}/"# 根据操作系统使用不同的命令打开默认浏览器if sys.platform.startswith('darwin'): # macOSsubprocess.call(['open', url])elif sys.platform.startswith('win'): # Windowssubprocess.call(['start', url], shell=True)elif sys.platform.startswith('linux'): # Linuxsubprocess.call(['xdg-open', url])else:server_loger.error(f"Unsupported platform, can't open browser automatically.", exc_info=True)print("Unsupported platform, can't open browser automatically.")except Exception as e:server_loger.error(f"自动打开浏览器失败:{e}", exc_info=True)time.sleep(1)server_loger.info(f"启动 Web 服务,host:port:{host}:{port}")print("[+] 请使用浏览器访问 http://127.0.0.1:5000/ 查看聊天记录")global appprint("[+] 如需查看api文档,请访问 http://127.0.0.1:5000/docs ")app = gen_fastapi_app(file_handler)LOGGING_CONFIG["formatters"]["default"]["fmt"] = "[%(asctime)s] %(levelprefix)s %(message)s"LOGGING_CONFIG["formatters"]["access"]["fmt"] = '[%(asctime)s] %(levelprefix)s %(client_addr)s - "%(request_line)s" %(status_code)s'uvicorn.run(app=app, host=host, port=port, reload=debug, log_level="info", workers=1, env_file=env_file)app = None__all__ = ["start_server", "gen_fastapi_app"]