系列文章目录
文章目录
- 系列文章目录
- 前言
- 一、 通过计算指定区域的平方和判断所有目录中是否有重复文件
- 1.需求
- 2.代码(整理版)
- 3.代码(原始版)
- 总结
前言
一、 通过计算指定区域的平方和判断所有目录中是否有重复文件
1.需求
有成千上万的测试数据文件
查询文件中特定值对应的单元格位置
计算指定区域的平方和
生成字典,去重,判断是否有重复文件
2.代码(整理版)
#xlwt只支持xls格式,xlsx格式需要用openpyxl或pandas
import pandas as pd
import os
import xlrd
import xlwt
import csv
import openpyxl
import glob
from collections import Counter
from openpyxl import Workbook
from openpyxl import load_workbook

# Module-level "result cells": search_str() publishes the coordinates of the
# last matched cell here so the main script can read them afterwards.
global_var = None    # column index of the last match found by search_str()
global_var_r = None  # row index of the last match found by search_str()


def some_function():
    """Placeholder illustrating the ``global`` declarations needed to rebind
    the module-level result variables from inside a function."""
    global global_var
    global global_var_r


# The pandas-based CSV reader is defined next.
def read_csv_file(file_path, header_row=None):
    """Read one ``Data0_mtf.csv`` measurement file as all-string cells.

    Parameters
    ----------
    file_path : str
        Path of the CSV file to load.
    header_row : int, optional
        Row index to use as the header.  Defaults to the module-level
        ``row_heard`` set by the main script before calling (0 when unset),
        so existing call sites keep working unchanged.

    Returns
    -------
    pandas.DataFrame
        File content with ``dtype=object`` (every cell kept as a string).
    """
    # Parameters tried in earlier revisions (kept for reference):
    # error_bad_lines=False to skip malformed rows; encoding='gbk';
    # sep=r"\s+\s{0}"; encoding='iso-8859-1'; header=0.
    if header_row is None:
        # Backward compatible: fall back to the global the main script sets.
        header_row = globals().get('row_heard', 0)
    # FIX: the original passed both ``sep=r"\s+\s{0}"`` and ``delimiter=','``;
    # pandas rejects that combination ("Specified a sep and a delimiter").
    # The files are comma separated, so a single sep=',' is the intent.
    return pd.read_csv(
        file_path,
        encoding='latin1',
        sep=',',
        dtype=object,
        quotechar="'",
        doublequote=True,
        engine='python',
        usecols=lambda column: column != 0,  # keeps every named column
        header=header_row,
    )


# Accumulated absolute paths of every Data0_mtf.csv found by find_dir_name().
file_paths = []
def find_dir_name(path, file_name):
    """Recursively walk *path* collecting every ``Data0_mtf.csv``.

    Matching paths are appended to the module-level ``file_paths`` list,
    which is also the return value (the recursion shares that single
    accumulator).  *file_name* is kept only for interface compatibility;
    the target file name is fixed.  Windows-style ``'\\'`` joining is kept
    on purpose — the scanned share is a UNC path.
    """
    entries = os.listdir(path)
    file_dict = {}
    for entry in entries:  # FIX: original loop variable shadowed file_name
        file_dict[entry] = {}
        if os.path.isdir(path + '\\' + entry):
            # Recurse first, then probe this sub-directory for the data file.
            file_dict[entry] = find_dir_name(path + '\\' + entry, entry)
            print('当前目录:%s' % path + '\\' + entry)
            target = 'Data0_mtf.csv'
            full_path = os.path.join(path + '\\' + entry, target)
            if os.path.exists(full_path):
                file_paths.append(path + '\\' + entry + '\\' + target)
    return file_paths


def search_str(filename, search_char):
    """Scan a CSV file for cells equal to *search_char*.

    Publishes the row/column indices of the LAST exact match through the
    module-level ``global_var_r`` / ``global_var``.  FIX: both globals are
    reset to None first, so a file without a match no longer inherits stale
    coordinates from the previous call.  Returns None; unreadable files are
    skipped (best effort, as in the original).
    """
    global global_var
    global global_var_r
    global_var = None
    global_var_r = None
    try:
        with open(filename, 'r') as csvfile:
            reader = csv.reader(csvfile, delimiter=",")
            for row_index, row in enumerate(reader):
                for col_index, cell in enumerate(row):
                    if search_char == cell:
                        # Same net effect as the original result-list replay:
                        # the last match wins.
                        global_var = col_index
                        global_var_r = row_index
    except Exception:
        # Deliberate best-effort: the original swallowed every error here.
        pass


def get_top_nine_dirs(file_path):
    """Return the 9th path component (index 8) of *file_path*.

    FIX: the original did ``os.path.sep.join(parts[8])``, which joins the
    CHARACTERS of that component with separators; the component itself is
    what is wanted.  (First nine levels: ``os.path.sep.join(parts[:9])``.)
    """
    parts = file_path.split(os.path.sep)
    top_nine_dirs = parts[8]
    print(top_nine_dirs)
    return top_nine_dirs


def has_duplicateds(d):
    """Return True when at least two keys of *d* share the same value."""
    values_counter = Counter(d.values())
    return any(count > 1 for count in values_counter.values())


if __name__ == '__main__':
    # file_dir = input("请输入查重路径: \n")
    file_dir = r'\\10.99.10.142\6号楼信息恢复\6号楼--品保--勿删\抽检\2024年'
    data = find_dir_name(file_dir, 'Data0_mtf.csv')
    fl_lst = list(filter(lambda x: x.find("Data0_mtf.csv") >= 0, data))
    results = []  # (file path, square sum) pairs, kept aligned by construction
    total = len(fl_lst)
    for i, item in enumerate(fl_lst, start=1):
        print(item)
        print(f'本次查询共{total}个文件,当前为第{i}个文件')
        search_str(item, '1S')       # publishes coordinates via the globals
        row_heard = global_var_r     # header row consumed by read_csv_file()
        col_character = global_var   # column holding the '1S' data
        if row_heard is None or col_character is None:
            # FIX: the original crashed with TypeError (None + 100) on files
            # where '1S' was never found; skip them instead.
            print("请检查数据是否正确")
            continue
        df = read_csv_file(item)
        col_character2 = col_character + 100  # companion '51S' column
        row_num = len(df)
        print(f'发现共{row_num}行数据,第{row_heard}行作为标题行')
        df.iloc[:row_num, col_character] = pd.to_numeric(
            df.iloc[:row_num, col_character], errors='coerce')
        df.iloc[:row_num, col_character2] = pd.to_numeric(
            df.iloc[:row_num, col_character2], errors='coerce')
        value = df.iloc[0, col_character]
        value2 = df.iloc[1, col_character]
        if value != 0 or value2 != 0:
            print('正在进行数据分析……')
        elif value == 0 and value2 == 0:
            print('正在进行数据分析……')
        else:
            print("请检查数据是否正确")
        column_data1 = df.iloc[0:row_num, col_character]
        column_data2 = df.iloc[0:row_num, col_character2]
        square_sum = sum(map(lambda x: x ** 2, column_data1))
        square_sum2 = sum(map(lambda x: x ** 2, column_data2))
        # Prefer the primary column; fall back to the +100 column when the
        # primary one sums to zero.
        results.append((item, square_sum if square_sum != 0 else square_sum2))
    # FIX: pair each path with ITS OWN square sum (the original zipped the
    # unfiltered ``data`` list, which silently misaligns on any skipped file).
    value_dict = dict(results)
    duplicate_values = {}
    for key, value in value_dict.items():
        duplicate_values.setdefault(value, []).append(key)
    duplicate_values = {k: v for k, v in duplicate_values.items() if len(v) > 1}
    if has_duplicateds(value_dict):
        print("存在重复值")
    else:
        print("无重复值,请继续保持")
    # Write one column: each duplicated value followed by its file paths.
    wb = Workbook()
    ws = wb.active
    ws.title = 'Summary'
    n = 1
    for key, keys_list in duplicate_values.items():
        print(f"值{key}对应的文件有:{','.join(keys_list)}")
        ws.cell(row=n, column=1).value = key
        for m, path_item in enumerate(keys_list):
            ws.cell(row=n + 1 + m, column=1).value = path_item
        n = n + 1 + len(keys_list)
    wb.save('output.xlsx')
3.代码(原始版)
#xlwt只支持xls格式,xlsx格式需要用openpyxl或pandas
import pandas as pd
import os
import xlrd
import xlwt
import csv
import openpyxl
import glob
from collections import Counter
from openpyxl import Workbook
from openpyxl import load_workbook

# Module-level "result cells": search_str() publishes the coordinates of the
# last matched cell here so the main script can read them afterwards.
global_var = None    # column index of the last match found by search_str()
global_var_r = None  # row index of the last match found by search_str()


def some_function():
    """Placeholder illustrating the ``global`` declarations needed to rebind
    the module-level result variables from inside a function."""
    global global_var
    global global_var_r


# The pandas-based CSV reader is defined next.
def read_csv_file(file_path, header_row=None):
    """Read one ``Data0_mtf.csv`` measurement file as all-string cells.

    Parameters
    ----------
    file_path : str
        Path of the CSV file to load.
    header_row : int, optional
        Row index to use as the header.  Defaults to the module-level
        ``row_heard`` set by the main script before calling (0 when unset),
        so existing call sites keep working unchanged.

    Returns
    -------
    pandas.DataFrame
        File content with ``dtype=object`` (every cell kept as a string).
    """
    # Parameters tried in earlier revisions (kept for reference):
    # error_bad_lines=False to skip malformed rows; encoding='gbk';
    # sep=r"\s+\s{0}"; encoding='iso-8859-1'; header=0.
    if header_row is None:
        # Backward compatible: fall back to the global the main script sets.
        header_row = globals().get('row_heard', 0)
    # FIX: the original passed both ``sep=r"\s+\s{0}"`` and ``delimiter=','``;
    # pandas rejects that combination ("Specified a sep and a delimiter").
    # The files are comma separated, so a single sep=',' is the intent.
    return pd.read_csv(
        file_path,
        encoding='latin1',
        sep=',',
        dtype=object,
        quotechar="'",
        doublequote=True,
        engine='python',
        usecols=lambda column: column != 0,  # keeps every named column
        header=header_row,
    )


# Accumulated absolute paths of every Data0_mtf.csv found by find_dir_name().
file_paths = []
def find_dir_name(path, file_name):
    """Recursively walk *path* collecting every ``Data0_mtf.csv``.

    Matching paths are appended to the module-level ``file_paths`` list,
    which is also the return value (the recursion shares that single
    accumulator).  *file_name* is kept only for interface compatibility;
    the target file name is fixed.  Windows-style ``'\\'`` joining is kept
    on purpose — the scanned share is a UNC path.
    """
    entries = os.listdir(path)
    file_dict = {}
    for entry in entries:  # FIX: original loop variable shadowed file_name
        file_dict[entry] = {}
        if os.path.isdir(path + '\\' + entry):
            # Recurse first, then probe this sub-directory for the data file.
            file_dict[entry] = find_dir_name(path + '\\' + entry, entry)
            print('当前目录:%s' % path + '\\' + entry)
            target = 'Data0_mtf.csv'
            full_path = os.path.join(path + '\\' + entry, target)
            if os.path.exists(full_path):
                file_paths.append(path + '\\' + entry + '\\' + target)
    return file_paths


def search_str(filename, search_char):
    """Scan a CSV file for cells equal to *search_char*.

    Publishes the row/column indices of the LAST exact match through the
    module-level ``global_var_r`` / ``global_var``.  FIX: both globals are
    reset to None first, so a file without a match no longer inherits stale
    coordinates from the previous call.  Returns None; unreadable files are
    skipped (best effort, as in the original).
    """
    global global_var
    global global_var_r
    global_var = None
    global_var_r = None
    try:
        with open(filename, 'r') as csvfile:
            reader = csv.reader(csvfile, delimiter=",")
            for row_index, row in enumerate(reader):
                for col_index, cell in enumerate(row):
                    if search_char == cell:
                        # Same net effect as the original result-list replay
                        # (it also re-assigned per match): the last match wins.
                        global_var = col_index
                        global_var_r = row_index
    except Exception:
        # Deliberate best-effort: the original swallowed every error here.
        pass


def get_top_nine_dirs(file_path):
    """Return the 9th path component (index 8) of *file_path*.

    FIX: the original did ``os.path.sep.join(parts[8])``, which joins the
    CHARACTERS of that component with separators; the component itself is
    what is wanted.  (First nine levels: ``os.path.sep.join(parts[:9])``.)
    """
    parts = file_path.split(os.path.sep)
    top_nine_dirs = parts[8]
    print(top_nine_dirs)
    return top_nine_dirs


def has_duplicateds(d):
    """Return True when at least two keys of *d* share the same value."""
    values_counter = Counter(d.values())
    return any(count > 1 for count in values_counter.values())


if __name__ == '__main__':
    # file_dir = input("请输入查重路径: \n")
    file_dir = r'\\10.99.10.142\6号楼信息恢复\6号楼--品保--勿删\抽检\2024年'
    # file_dir = r'D:\Users\gxcaoty\Desktop\性能覆盖率\data'
    data = find_dir_name(file_dir, 'Data0_mtf.csv')
    fl_lst = list(filter(lambda x: x.find("Data0_mtf.csv") >= 0, data))
    results = []  # (file path, square sum) pairs, kept aligned by construction
    total = len(fl_lst)
    for i, item in enumerate(fl_lst, start=1):
        print(item)
        print(f'本次查询共{total}个文件,当前为第{i}个文件')
        search_str(item, '1S')       # publishes coordinates via the globals
        row_heard = global_var_r     # header row consumed by read_csv_file()
        col_character = global_var   # column holding the '1S' data
        if row_heard is None or col_character is None:
            # FIX: the original crashed with TypeError (None + 100) on files
            # where '1S' was never found; skip them instead.
            print("请检查数据是否正确")
            continue
        df = read_csv_file(item)
        col_character2 = col_character + 100  # companion '51S' column
        row_num = len(df)
        print(f'发现共{row_num}行数据,第{row_heard}行作为标题行')
        df.iloc[:row_num, col_character] = pd.to_numeric(
            df.iloc[:row_num, col_character], errors='coerce')
        df.iloc[:row_num, col_character2] = pd.to_numeric(
            df.iloc[:row_num, col_character2], errors='coerce')
        value = df.iloc[0, col_character]
        value2 = df.iloc[1, col_character]
        if value != 0 or value2 != 0:
            print('正在进行数据分析……')
        elif value == 0 and value2 == 0:
            print('正在进行数据分析……')
        else:
            print("请检查数据是否正确")
        column_data1 = df.iloc[0:row_num, col_character]
        column_data2 = df.iloc[0:row_num, col_character2]
        square_sum = sum(map(lambda x: x ** 2, column_data1))
        square_sum2 = sum(map(lambda x: x ** 2, column_data2))
        # Prefer the primary column; fall back to the +100 column when the
        # primary one sums to zero.
        results.append((item, square_sum if square_sum != 0 else square_sum2))
    # FIX: pair each path with ITS OWN square sum (the original zipped the
    # unfiltered ``data`` list, which silently misaligns on any skipped file).
    value_dict = dict(results)
    duplicate_values = {}
    for key, value in value_dict.items():
        duplicate_values.setdefault(value, []).append(key)
    duplicate_values = {k: v for k, v in duplicate_values.items() if len(v) > 1}
    if has_duplicateds(value_dict):
        print("存在重复值")
    else:
        print("无重复值,请继续保持")
    # Write one column: each duplicated value followed by its file paths.
    wb = Workbook()
    ws = wb.active
    ws.title = 'Summary'
    n = 1
    for key, keys_list in duplicate_values.items():
        print(f"值{key}对应的文件有:{','.join(keys_list)}")
        ws.cell(row=n, column=1).value = key
        for m, path_item in enumerate(keys_list):
            ws.cell(row=n + 1 + m, column=1).value = path_item
        n = n + 1 + len(keys_list)
    wb.save('output.xlsx')
    # Alternative export disabled in the original: build a DataFrame from
    # duplicate_values (pd.concat of per-key Series, keys as a column) and
    # write it with df.to_excel('output.xlsx', index=False).
总结
分享:
一个新的生活的开始,是自己给与自己的希望,是自己带给自己的快乐,更是自己原谅过去,面对将来的挑战;