import argparse
import logging
import sys
from pathlib import Path
import time
import os
import pandas as pd
import matplotlib.pyplot as plt
from concurrent.futures import ProcessPoolExecutor, as_completed
# 初始化日志记录
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.FileHandler('script.log'), logging.StreamHandler(sys.stdout)])
def process_file(file_path, input_directory, output_directory):
try:
# 使用 Pandas 读取 CSV 文件
data1 = pd.read_csv(file_path)
except Exception as e:
logging.error(f"读取文件 {file_path} 时发生错误: {e}")
return
# 获取数据的行数和列数
nRow, nCol = data1.shape
# 绘制图表
plt.figure(figsize=(10, 8))
index = 0
for i in range(nCol):
if data1.columns[i] == "datetime":
continue
plt.subplot(nCol - 1, 1, index + 1)
index += 1
plt.plot(data1.iloc[:, i])
plt.title(data1.columns[i])
# 获取当前时间并格式化
formatted_time = time.strftime('%Y_%m_%d_%H_%M', time.localtime())
# 修改文件名以包含当前时间,并替换特殊字符
file_stem = file_path.stem.replace('@', '_').replace(':', '_')
file_name = formatted_time + '_' + file_stem + '.png'
# 生成输出路径
output_path = Path(output_directory) / file_name
# 保存图像到 Fig 目录
try:
plt.savefig(str(output_path))
logging.info(f"存储图片中... {output_path}")
plt.close()
except Exception as e:
logging.error(f"保存图像 {output_path} 时发生错误: {e}")
return
return output_path
def animate_charts(output_directory):
# 获取最新生成的两个图表文件
paths = sorted(Path(output_directory).glob('*.png'), key=lambda p: p.stat().st_mtime, reverse=True)
latest_paths = paths[:2]
# 设置初始状态
paused = False
def on_key_press(event):
nonlocal paused
if event.key == ' ': # 空格键
paused = not paused
if paused:
print("Paused. Press space to continue.")
else:
print("Continuing...")
# 主循环
while True:
for path in latest_paths:
# 显示文件名
print(f"正在显示文件: {path}")
if paused:
plt.connect('key_press_event', on_key_press)
img = plt.imread(str(path))
fig, ax = plt.subplots()
ax.imshow(img)
ax.axis('off') # 关闭坐标轴
creation_time = time.strftime("%Y-%m-%d %H:%M", time.localtime(path.stat().st_ctime))
ax.set_title(f'File Created at: {creation_time}')
plt.show(block=False)
# 显示倒计时
duration = 55
text_obj = None
for remaining in range(duration, 0, -1):
if not paused:
break
time.sleep(1)
if text_obj is not None:
text_obj.remove() # 直接移除文本对象
text_obj = ax.text(0.5, 0.15, f"Timing: {remaining}s", transform=ax.transAxes, ha='center', va='center', color='red', fontsize=8)
fig.canvas.draw_idle()
plt.pause(0.001) # 更新图形界面
plt.close()
while paused:
time.sleep(0.1) # 等待恢复
else:
img = plt.imread(str(path))
fig, ax = plt.subplots()
ax.imshow(img)
ax.axis('off') # 关闭坐标轴
creation_time = time.strftime("%Y-%m-%d %H:%M", time.localtime(path.stat().st_ctime))
ax.set_title(f'File Created at: {creation_time}')
plt.show(block=False)
# 显示倒计时
duration = 60
text_obj = None
for remaining in range(duration, 0, -1):
time.sleep(1)
if text_obj is not None:
text_obj.remove() # 直接移除文本对象
text_obj = ax.text(0.5, 0.15, f"Timing: {remaining}s", transform=ax.transAxes, ha='center', va='center', color='red', fontsize=8)
fig.canvas.draw_idle()
plt.pause(0.001) # 更新图形界面
plt.close()
def main(input_directory, output_directory):
# 开始运行提示
logging.info("-----开始运行-----")
# 定义传感器数据所在的目录
sensor_dir = Path(input_directory)
# 获取传感器目录下的所有文件列表
files = list(sensor_dir.glob('*.csv'))
# 输出文件列表
logging.info("即将进行文件读取...")
for i, f in enumerate(files, start=1):
logging.info(f"文件读取中 {i}/{len(files)}")
logging.info(f.name)
# 定义输出图像的目录
fig_dir = Path(output_directory)
# 检查是否存在 Fig 目录,如果不存在则创建
if not fig_dir.exists():
try:
fig_dir.mkdir(parents=True, exist_ok=True)
except FileExistsError:
logging.info("目录已存在。")
# 打印当前时间
current_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
logging.info(f"读取完毕,读取时间: {current_time} ^_^ 存储图片进行中... ")
# 使用并行处理
with ProcessPoolExecutor() as executor:
futures = []
for file_path in files:
future = executor.submit(process_file, file_path, input_directory, output_directory)
futures.append(future)
# 等待所有任务完成
for future in as_completed(futures):
future.result()
# 结束提示
logging.info("存储完毕")
# 显示最近两张图片
animate_charts(output_directory)
if __name__ == "__main__":
# 定义默认配置值
input_directory = 'C:\\Users\\Administrator\\Desktop\\Sensor'
output_directory = 'C:\\Users\\Administrator\\Desktop\\Fig'
# 解析命令行参数
parser = argparse.ArgumentParser(description="Process CSV files and generate charts.")
parser.add_argument('--input_directory', type=str, default=input_directory, help='Input directory containing CSV files.')
parser.add_argument('--output_directory', type=str, default=output_directory, help='Output directory to save generated charts.')
args = parser.parse_args()
# 主函数调用
main(args.input_directory, args.output_directory)