import pandas as pd
import matplotlib.pyplot as plt
import time
from pathlib import Path
import logging
from concurrent.futures import ProcessPoolExecutor, as_completed
import argparse
def 设置日志():
    """Configure the root logger for the script.

    Messages at INFO level and above are emitted, each formatted as
    ``<timestamp> - <level name> - <message>``.
    """
    log_format = '%(asctime)s - %(levelname)s - %(message)s'
    logging.basicConfig(format=log_format, level=logging.INFO)
def 处理文件(file_path, input_directory, output_directory):
    """Plot every non-``datetime`` column of one CSV file and save a PNG.

    Each plotted column gets its own subplot, stacked vertically and titled
    with the column name.  The image is written into ``output_directory`` as
    ``<YYYY_mm_dd_HH_MM>_<stem>_3000.png``, where ``<stem>`` is the CSV file
    name without its extension and with ``@`` and ``:`` replaced by ``_``.
    A file with no plottable column still produces an (empty) image.

    Args:
        file_path: CSV file to read, as ``str`` or ``Path``.
        input_directory: Not used by this function; kept so existing callers
            that pass it keep working.
        output_directory: Directory that receives the PNG.  It is not created
            here.

    Returns:
        The ``Path`` of the saved image, or ``None`` if any step failed (the
        error is logged).
    """
    fig = None
    try:
        source = Path(file_path)
        data1 = pd.read_csv(source)

        # Iterate over *columns* (the original looped over the row count) and
        # skip the time axis column.
        plot_positions = [
            position
            for position, name in enumerate(data1.columns)
            if name != 'datetime'
        ]

        fig = plt.figure()
        # Subplot indices are 1-based in matplotlib; an index of 0 raises.
        for subplot_index, position in enumerate(plot_positions, start=1):
            ax = fig.add_subplot(len(plot_positions), 1, subplot_index)
            ax.plot(data1.iloc[:, position])
            ax.set_title(data1.columns[position])

        formatted_time = time.strftime('%Y_%m_%d_%H_%M', time.localtime())
        # '@' and ':' are replaced so the stem is safe to use in a file name.
        file_stem = source.stem.replace('@', '_').replace(':', '_')
        output_path = Path(output_directory) / f"{formatted_time}_{file_stem}_3000.png"
        fig.savefig(str(output_path))
        logging.info(f"存储图片中... {output_path}")
        return output_path
    except Exception as e:
        logging.error(f"保存图像时发生错误 {file_path}: {e}")
        return None
    finally:
        # Release the figure on the failure path too, so figures do not
        # accumulate in a long-lived worker process.
        if fig is not None:
            plt.close(fig)
def 动画图表(output_directory, display_seconds=60):
    """Show the two most recently modified PNG files as a simple slideshow.

    The images in ``output_directory`` are ordered by modification time,
    newest first, and at most two are shown one after another in a single
    window.  Pressing the space bar toggles pause/resume; while paused the
    current image stays on screen and its display time does not run down.
    Closing the window ends the slideshow early.

    Args:
        output_directory: Directory searched (non-recursively) for ``*.png``.
        display_seconds: Un-paused time each image stays on screen.

    Any error while loading or showing an image is logged and stops the
    slideshow; the figure is always closed on exit.
    """
    暂停 = False
    poll_interval = 0.1  # seconds per event-loop slice; bounds pause latency

    def 按键按下(event):
        nonlocal 暂停
        if event.key == ' ':  # space bar toggles pause / resume
            暂停 = not 暂停
            print("Paused." if 暂停 else "Continuing...")

    fig, ax = plt.subplots()
    fig.canvas.mpl_connect('key_press_event', 按键按下)
    try:
        paths = sorted(
            Path(output_directory).glob('*.png'),
            key=lambda p: p.stat().st_mtime,
            reverse=True,
        )[:2]
        # Show the window once; the wait loop below only runs the event loop,
        # which avoids re-raising the window on every slice.
        plt.show(block=False)
        for path in paths:
            if not plt.fignum_exists(fig.number):
                break  # window was closed by the user
            try:
                print(f"正在显示文件: {path}")
                img = plt.imread(str(path))
                # Drop the previous image instead of stacking a new one on it.
                ax.clear()
                ax.axis('off')  # hide the axes around the picture
                ax.imshow(img)
                # NOTE(review): st_ctime is the creation time on Windows but
                # the metadata-change time on Unix.
                creation_time = time.strftime(
                    "%Y-%m-%d %H:%M", time.localtime(path.stat().st_ctime)
                )
                ax.set_title(f'File Created at: {creation_time}')
                fig.canvas.draw_idle()

                # Wait in short slices so a key press takes effect promptly;
                # only un-paused time counts against the display budget.
                remaining = display_seconds
                while remaining > 0 and plt.fignum_exists(fig.number):
                    slice_start = time.monotonic()
                    fig.canvas.start_event_loop(poll_interval)
                    if not 暂停:
                        remaining -= time.monotonic() - slice_start
            except Exception as e:
                logging.error(f"显示图片时发生错误 {path}: {e}")
                break
    finally:
        plt.close(fig)
def 主程序(input_directory, output_directory, edge_count=210000):
    """Render charts for the CSV files of a directory, then show a slideshow.

    Steps:
      1. Configure logging.
      2. Collect ``*.csv`` files from ``input_directory`` (sorted by path).
         If there are more than ``2 * edge_count`` files, only the first and
         the last ``edge_count`` are kept; otherwise every file is used once.
      3. Make sure ``output_directory`` exists.
      4. Render each file to a PNG in a pool of worker processes.
      5. Display the newest images from ``output_directory``.

    Args:
        input_directory: Directory containing the CSV files.
        output_directory: Directory that receives the generated images.
        edge_count: Number of files kept from each end of the sorted list
            when the directory holds more than twice that many.
    """
    设置日志()
    logging.info("-----开始运行-----")
    sensor_dir = Path(input_directory)

    # Glob once and sort so "first" and "last" are well defined.  Taking a
    # head and a tail slice unconditionally would list files twice whenever
    # the two slices overlap.
    all_files = sorted(sensor_dir.glob('*.csv'))
    edge_count = max(edge_count, 0)
    if len(all_files) > 2 * edge_count:
        files = all_files[:edge_count] + all_files[len(all_files) - edge_count:]
    else:
        files = all_files

    logging.info("即将进行文件读取...")
    total = len(files)
    for i, f in enumerate(files, start=1):
        logging.info(f"文件读取中 {i}/{total}")
        logging.info(f.name)

    fig_dir = Path(output_directory)
    if not fig_dir.exists():
        try:
            fig_dir.mkdir(parents=True, exist_ok=True)
        except FileExistsError:
            logging.info("目录已存在。")

    current_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
    logging.info(f"读取完毕,读取时间:{current_time} ^_^ 存储图片进行中...")

    with ProcessPoolExecutor() as executor:
        futures = [
            executor.submit(处理文件, file_path, input_directory, output_directory)
            for file_path in files
        ]
        for future in as_completed(futures):
            try:
                result = future.result()
            except Exception as e:
                # The worker itself failed (e.g. the pool broke); report it
                # and keep collecting the remaining results.
                logging.error(f"工作进程异常: {e}")
                result = None
            if result is not None:
                logging.info(f"文件 {result} 处理完成")
            else:
                logging.warning("某个文件处理失败")

    logging.info("存储完毕")
    动画图表(output_directory)
if __name__ == '__main__':
    # Command-line entry point: read the input/output directories from the
    # arguments (each has a default) and run the whole pipeline.
    cli = argparse.ArgumentParser(
        description="Process CSV files and generate charts.",
    )
    cli.add_argument(
        "--input",
        type=str,
        default='C:\\Users\\Administrator\\Desktop\\Sensor',
        help="Input directory containing CSV files.",
    )
    cli.add_argument(
        "--output",
        type=str,
        default='C:\\Users\\Administrator\\Desktop\\Fig',
        help="Output directory to save generated images.",
    )
    options = cli.parse_args()
    主程序(options.input, options.output)