by 甘晴void
1 题目重述
Consider a sparse matrix stored in the compressed row format (you may find a description of this format on the web or anysuitable text on sparse linear algebra). Write an OpenMP program for computing the product of this matrix with a vector. Download sample matrices from the Matrix Market (http://math.nist.gov/MatrixMarket/) and test the performance of your implementation as a function of matrix size and number of threads.
2 思路分析
2.1 问题建模
2.2 CSR格式
typedef struct
{int n_rows; // 矩阵的行数int n_cols; // 矩阵的列数int n_nonzero; // 非零元素的个数int *row_ptr; // 行指针数组int *col_ind; // 列索引数组double *val; // 非零元素值数组
} crs_matrix_t;
- val数组:按行开始数,依次存储每一行的非零元素;
- col_ind数组:每一个非零元素的列索引值;
- row_ptr数组:每一行的第一个非零元素在data数组中的索引值(注意:只统计每一行第一个非零元);
1 0 2 0
0 0 3 0
0 0 0 0
0 5 6 0
0 0 0 4
val = [1,2,3,5,6,4]
col_ind = [0,2,2,1,2,3]
row_ptr = [0,2,3,3,5,6]
2.3 CSR格式矩阵存储
// 读取非零元素的行索引、列索引和值int row = 0, idx = 0;A.row_ptr[0] = 0;for (int i = 0; i < n_nonzero; i++){int r, c;double v;fscanf(f, "%d %d %le", &r, &c, &v);A.col_ind[i] = c - 1; // 存储列索引(减1)A.val[i] = v; // 存储值while (r > row + 1){A.row_ptr[row + 1] = i;row++;}row = r - 1;}while (row < n_rows){A.row_ptr[row + 1] = n_nonzero;row++;}
2.4 CSR格式访问
到A->row_ptr[i + 1]
for (int i = 0; i < rows; i++){double *temp_row = new double[A->n_cols]; // 临时数组,逐行输出for (int j = 0; j < cols; j++)temp_row[j] = 0;for (int j = A->row_ptr[i]; j < A->row_ptr[i + 1]; j++) // j为该数在data中的位置{if (A->col_ind[j] < cols) // A->col_ind[j]为该数的列号,i为该数的行号{temp_row[A->col_ind[j]] = A->val[j];}}for (int j = 0; j < cols; j++)printf("%20.8f", temp_row[j]);printf("\n");}
%%MatrixMarket matrix coordinate real general
3140 3140 543162
1 1 6.9033670000000e-01
2 1 1.6855795000000e-03
3 1 2.3404025000000e-03
7 1 6.3543841000000e-03
2.5 CSR格式运算
for (int i = 0; i < A->n_rows; i++){double sum = 0.0;for (int j = A->row_ptr[i]; j < A->row_ptr[i + 1]; j++){sum += A->val[j] * x[A->col_ind[j]];}y[i] = sum;}
2.6 性能衡量
优化加速比= 串行时间 / 并行时间
3 代码实现与结果分析
3.0 测试环境
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
Address sizes: 48 bits physical, 48 bits virtual
CPU(s): 128
On-line CPU(s) list: 0-127
Thread(s) per core: 2
Frequency boost: enabled
CPU MHz: 1500.000
CPU max MHz: 2800.0000
CPU min MHz: 1500.0000
BogoMIPS: 5589.07
Virtualization: AMD-V
L1d cache: 2 MiB
L1i cache: 2 MiB
L2 cache: 32 MiB
L3 cache: 512 MiB
NUMA node0 CPU(s): 0-31,64-95
NUMA node1 CPU(s): 32-63,96-127
我们可以从 矩阵市场 获得更多矩阵。
- fs_760_3.mtx(760 x 760, 5976 entries,非零率1.0%)
- psmigr_3.mtx(3140 x 3140,543162 entries,非零率5.5%)
- fidapm37.mtx(9152 x 9152, 765944 entries,非零率0.91%)
3.1 串行实现
// 矩阵向量乘法(串行)
void crs_matrix_vector_product_serial(const crs_matrix_t *A, double *x, double *y)
{for (int i = 0; i < A->n_rows; i++){double sum = 0.0;for (int j = A->row_ptr[i]; j < A->row_ptr[i + 1]; j++){sum += A->val[j] * x[A->col_ind[j]];}y[i] = sum;}
3.2 并行实现
- 向量化:利用SIMD(单指令多数据)指令同时对多个数据元素执行操作。这可以通过确保数据对齐和使用编译器内联函数或启用编译器自动向量化(例如,对于Intel CPU使用
-O3 -mavx
)来实现。【这里不太好实现】 - 私有变量:在OpenMP中,可以将循环变量声明为每个线程的私有变量,以避免假共享。这可以通过在
#pragma omp parallel for
子句来完成。【这个后面会探究】 - 归约:由于每个线程计算一个部分和,可以使用OpenMP的
子句来安全地合并线程的和,减少同步开销。【这个需要做内层并行时考虑】 - 线程亲和性:将线程绑定到特定的CPU核心可以减少缓存冲突并提高性能。这可以通过使用
来实现。【这个提升不是很大】 - 分块策略:如果矩阵非常大,可以考虑使用分块算法,如分块雅可比方法或块循环数据分布,来跨多个线程或进程并行化计算。【稀疏矩阵与此不同】
- 负载均衡:如果矩阵的稀疏性不均匀,某些行可能有远多于其他行的非零元素。可以使用OpenMP的动态调度(
)来平衡线程间的工作负载。【这个想法看起来能work,值得试试】 - 缓存优化:优化数据访问模式以利用缓存局部性。例如,预取数据元素或重新排序循环以减少缓存缺失。【这个似乎比较难实现】
- 混合并行:在集群或多节点系统上,结合OpenMP和MPI(消息传递接口)进行分布式内存并行。【MPI不在讨论范围内】
// 矩阵向量乘法(并行化外层循环)
void crs_matrix_vector_product_parallel(const crs_matrix_t *A, double *x, double *y)
#pragma omp parallel forfor (int i = 0; i < A->n_rows; i++){double sum = 0.0;for (int j = A->row_ptr[i]; j < A->row_ptr[i + 1]; j++){sum += A->val[j] * x[A->col_ind[j]];}y[i] = sum;// cout << "i= " << i << " " << omp_get_thread_num() << " ";// if (i % 10 == 0) cout << endl;}
#pragma omp parallel for schedule(dynamic)
#pragma omp parallel for schedule(dynamic)for (int i = 0; i < A->n_rows; i++){double sum = 0.0;
#pragma omp parallel for reduction(+ : sum)for (int j = A->row_ptr[i]; j < A->row_ptr[i + 1]; j++){sum += A->val[j] * x[A->col_ind[j]];}y[i] = sum;}
- fs_760_3.mtx(760 x 760, 5976 entries,非零率1.0%)
- psmigr_3.mtx(3140 x 3140,543162 entries,非零率5.5%)
- fidapm37.mtx(9152 x 9152, 765944 entries,非零率0.91%)
4 实验总结
5 代码
5.1 预处理
5 4 6
1 1 1
4 2 5
1 3 2
2 3 3
4 3 6
5 4 4%%行优先矩阵
5 4 6
1 1 1
1 3 2
2 3 3
4 2 5
4 3 6
5 4 4
#include "fstream"
#include <iomanip>
#include <iostream>
#include <queue>
#include <string>
using namespace std;struct node
{int row;int col;double value;node(int r, int c, double v){this->row = r;this->col = c;this->value = v;}
};struct cmp
{bool operator()(node a, node b){if (a.row > b.row)return 1;else if (a.row == b.row && a.col > b.col)return 1;return 0;}
};void process(string filename)
{string file_e = filename + ".mtx";const char *file = file_e.c_str();FILE *f = fopen(file, "r");if (f == NULL){fprintf(stderr, "Error opening file: %s\n", file);exit(1);}// 跳过第一行的说明信息char line[256];if (fgets(line, sizeof(line), f) == NULL){fprintf(stderr, "Error reading file header\n");fclose(f);exit(1);}// 读取矩阵的基本信息int n_rows, n_cols, n_nonzero;if (fscanf(f, "%d %d %d", &n_rows, &n_cols, &n_nonzero) != 3){fprintf(stderr, "Error reading matrix dimensions\n");fclose(f);exit(1);}// cout << n_rows << " " << n_cols << " " << n_nonzero;std::priority_queue<node, std::vector<node>, cmp> q;for (int i = 0; i < n_nonzero; i++){int r, c;double v;if (fscanf(f, "%d %d %le", &r, &c, &v) != 3){fprintf(stderr, "Error reading matrix element\n");fclose(f);exit(1);}node new_node(r, c, v);q.push(new_node);}// 将队列元素复制到vectorstd::vector<node> vec;while (!q.empty()){vec.push_back(q.top());q.pop();}cout << "start to output" << endl;// 输出string outfile = filename + "_line.mtx";std::ofstream outFile(outfile);if (outFile.is_open()){// 将输出写入文件outFile << line;outFile << n_rows << " " << n_cols << " " << n_nonzero << endl;// 反向遍历vector,因为复制到vector中的元素顺序是逆序的for (auto it = vec.begin(); it != vec.end(); ++it){outFile << std::fixed << it->row << " " << it->col << " ";outFile << std::scientific << std::setprecision(13) << it->value << std::endl;}outFile << endl;// 关闭文件outFile.close();std::cout << "Output written to output." << std::endl;}else{std::cerr << "Unable to open output file." << std::endl;return;}fclose(f);return;
}int main(int argc, char *argv[])
{// 读取参数,确定处理目标string target;if (argc < 2){std::cout << "Usage: " << argv[0] << " <matrix name>\n";return 1;}try{target = argv[1];std::cout << "process target : " << target << std::endl;}catch (const std::invalid_argument &e){std::cerr << "Error: " << e.what() << " - Invalid argument: " << argv[1] << std::endl;return 1;}catch (const std::out_of_range &e){std::cerr << "Error: " << e.what() << " - Argument " << argv[1] << " is out of range" << std::endl;return 1;}string filename = target;process(filename);return 0;
5.2 算法
#include "algorithm"
#include "fstream"
#include "iostream"
#include "omp.h"
#include <chrono>
#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <time.h>
#include <vector>using namespace std;int num_thread = 2;typedef struct
{int n_rows; // 矩阵的行数int n_cols; // 矩阵的列数int n_nonzero; // 非零元素的个数int *row_ptr; // 行指针数组int *col_ind; // 列索引数组double *val; // 非零元素值数组
} crs_matrix_t;// 读入稀疏矩阵
crs_matrix_t read_mtx_file(const char *filename)
{FILE *f = fopen(filename, "r");if (f == NULL){fprintf(stderr, "Error opening file: %s\n", filename);exit(1);}// 跳过第一行的说明信息char line[256];if (fgets(line, sizeof(line), f) == NULL){fprintf(stderr, "Error reading file header\n");fclose(f);exit(1);}// 读取矩阵的基本信息int n_rows, n_cols, n_nonzero;if (fscanf(f, "%d %d %d", &n_rows, &n_cols, &n_nonzero) != 3){fprintf(stderr, "Error reading matrix dimensions\n");fclose(f);exit(1);}// cout << n_rows << " " << n_cols << " " << n_nonzero;// 分配内存crs_matrix_t A;A.n_rows = n_rows;A.n_cols = n_cols;A.n_nonzero = n_nonzero;A.row_ptr = (int *)malloc((n_rows + 1) * sizeof(int));A.col_ind = (int *)malloc(n_nonzero * sizeof(int));A.val = (double *)malloc(n_nonzero * sizeof(double));// 读取非零元素的行索引、列索引和值int row = 0, idx = 0;A.row_ptr[0] = 0;for (int i = 0; i < n_nonzero; i++){int r, c;double v;if (fscanf(f, "%d %d %le", &r, &c, &v) != 3){fprintf(stderr, "Error reading matrix element\n");fclose(f);free(A.row_ptr);free(A.col_ind);free(A.val);exit(1);}A.col_ind[i] = c - 1; // 存储列索引(减1)A.val[i] = v; // 存储值while (r > row + 1){A.row_ptr[row + 1] = i;row++;}row = r - 1;}while (row < n_rows){A.row_ptr[row + 1] = n_nonzero;row++;}fclose(f);return A;
}// 输出稀疏矩阵参数
void print_matrix_par(const crs_matrix_t *A)
{// datacout << " data: ";for (int i = 0; i < A->n_nonzero; i++){cout << A->val[i] << " ";}cout << endl;// colcout << " col index: ";for (int i = 0; i < A->n_nonzero; i++){cout << A->col_ind[i] << " ";}cout << endl;// rowcout << " row ptr: ";for (int i = 0; i < A->n_rows + 1; i++){cout << A->row_ptr[i] << " ";}cout << endl;
}// 打印部分矩阵
void print_matrix_head(const crs_matrix_t *A, int rows, int cols)
{printf("Printing the first %d x %d elements of the matrix:\n", rows, cols);// 打开一个新的输出文件std::ofstream outFile("output.txt");// 检查文件是否打开成功if (outFile.is_open()){// 将输出写入文件for (int i = 0; i < rows; i++){double *temp_row = new double[A->n_cols]; // 临时数组,逐行输出for (int j = 0; j < cols; j++)temp_row[j] = 0;for (int j = A->row_ptr[i]; j < A->row_ptr[i + 1]; j++) // j为该数在data中的位置{if (A->col_ind[j] < cols) // A->col_ind[j]为该数的列号,i为该数的行号{temp_row[A->col_ind[j]] = A->val[j];}}for (int j = 0; j < cols; j++)printf("%20.8f", temp_row[j]);printf("\n");}// 关闭文件outFile.close();std::cout << "Output written to 'output.txt'." << std::endl;}else{std::cerr << "Unable to open output file." << std::endl;return;}// for (int i = 0; i < rows; i++)// {// double *temp_row = new double[A->n_cols]; // 临时数组,逐行输出// for (int j = 0; j < cols; j++)// temp_row[j] = 0;// for (int j = A->row_ptr[i]; j < A->row_ptr[i + 1]; j++) // j为该数在data中的位置// {// if (A->col_ind[j] < cols) // A->col_ind[j]为该数的列号,i为该数的行号// {// temp_row[A->col_ind[j]] = A->val[j];// }// }// for (int j = 0; j < cols; j++)// printf("%20.8f", temp_row[j]);// printf("\n");// }
}// 生成随机向量
double *generate_random_vector(int n, double min, double max)
{double *vec = (double *)malloc(n * sizeof(double));// 初始化随机数种子srand(time(NULL));// cout << "random vec: ";for (int i = 0; i < n; i++){vec[i] = min + (double)rand() / RAND_MAX * (max - min);// cout << vec[i] << " ";}// cout << endl;return vec;
}// 矩阵向量乘法(串行)
void crs_matrix_vector_product_serial(const crs_matrix_t *A, double *x, double *y)
{for (int i = 0; i < A->n_rows; i++){double sum = 0.0;for (int j = A->row_ptr[i]; j < A->row_ptr[i + 1]; j++){sum += A->val[j] * x[A->col_ind[j]];}y[i] = sum;}
}// 矩阵向量乘法(并行化外层循环)
void crs_matrix_vector_product_parallel(const crs_matrix_t *A, double *x, double *y)
#pragma omp parallel for schedule(dynamic)for (int i = 0; i < A->n_rows; i++){double sum = 0.0;for (int j = A->row_ptr[i]; j < A->row_ptr[i + 1]; j++){sum += A->val[j] * x[A->col_ind[j]];}y[i] = sum;// cout << "i= " << i << " " << omp_get_thread_num() << " ";// if (i % 10 == 0) cout << endl;}
}// 矩阵向量乘法(并行化外层循环+动态调度)
void crs_matrix_vector_product_parallel2(const crs_matrix_t *A, double *x, double *y)
#pragma omp parallel for schedule(dynamic)for (int i = 0; i < A->n_rows; i++){double sum = 0.0;for (int j = A->row_ptr[i]; j < A->row_ptr[i + 1]; j++){sum += A->val[j] * x[A->col_ind[j]];}y[i] = sum;// cout << "i= " << i << " " << omp_get_thread_num() << " ";// if (i % 10 == 0) cout << endl;}
}// 矩阵向量乘法(并行化外层循环+动态调度+内层并行)
void crs_matrix_vector_product_parallel3(const crs_matrix_t *A, double *x, double *y)
#pragma omp parallel for schedule(dynamic)for (int i = 0; i < A->n_rows; i++){double sum = 0.0;
#pragma omp parallel for schedule(dynamic) reduction(+ : sum)for (int j = A->row_ptr[i]; j < A->row_ptr[i + 1]; j++){sum += A->val[j] * x[A->col_ind[j]];}y[i] = sum;}
}// 矩阵向量乘法(私有临时x向量)
void crs_matrix_vector_product_parallel4(const crs_matrix_t *A, double *x, double *y)
#pragma omp parallel{// 声明线程私有的临时 x 向量double private_x[A->n_cols];for (int i = 0; i < A->n_cols; i++){// 将 x 向量拷贝到线程私有的缓存中private_x[i] = x[i];}#pragma omp forfor (int i = 0; i < A->n_rows; i++){double sum = 0.0;for (int j = A->row_ptr[i]; j < A->row_ptr[i + 1]; j++){// 使用线程私有的 private_x 向量进行计算sum += A->val[j] * private_x[A->col_ind[j]];}y[i] = sum;}}
}// 比较函数(比较串并行结果是否一致)
void compare(double *x, double *y, int size, string name)
{bool flag = true;int wrong_num = 0;for (int i = 0; i < size; i++){if (x[i] != y[i]){flag = false;wrong_num++;}}cout << name << " accuracy: " << (double)(size - wrong_num) / (double)size << " ";if (flag == true)cout << "(outcome accurate)" << endl;elsecout << "(outcome wrong)" << endl;
}int main(int argc, char *argv[])
{// 读取参数,确定线程数string matrix_name;if (argc < 3){std::cout << "Usage: " << argv[0] << " <thread_num> <matrix_name>\n";return 1;}try{num_thread = std::stoi(argv[1]);std::cout << "set thread number : " << num_thread << std::endl;matrix_name = argv[2];std::cout << "set matrix name : " << matrix_name << std::endl;}catch (const std::invalid_argument &e){std::cerr << "Error: " << e.what() << " - Invalid argument: " << argv[1] << std::endl;return 1;}catch (const std::out_of_range &e){std::cerr << "Error: " << e.what() << " - Argument " << argv[1] << " is out of range" << std::endl;return 1;}// 检测环境int concurrency = omp_get_num_procs();cout << "system max concurrency = " << concurrency << endl;//const char *filename = "psmigr_3_line.mtx"; // defaultstring file_e = matrix_name + "_line.mtx";filename = file_e.c_str();crs_matrix_t A = read_mtx_file(filename);// print_matrix_par(&A);// print_matrix_head(&A, 5, 4);double *v = generate_random_vector(A.n_cols, 1, 1);double *ans_serial = new double[A.n_cols];double *ans_parallel = new double[A.n_cols];double *ans_parallel2 = new double[A.n_cols];double *ans_parallel3 = new double[A.n_cols];// 串行计算auto start_serial = std::chrono::high_resolution_clock::now();crs_matrix_vector_product_serial(&A, v, ans_serial);auto end_serial = std::chrono::high_resolution_clock::now();// cout << "ans = " << endl;// for (int i = 0; i < A.n_rows; i++)// cout << ans_serial[i] << " ";// cout << endl;// 并行计算(优化外层循环)auto start_parallel = std::chrono::high_resolution_clock::now();crs_matrix_vector_product_parallel(&A, v, ans_parallel);auto end_parallel = std::chrono::high_resolution_clock::now();// 并行计算(优化外层循环+动态调度)auto start_parallel2 = std::chrono::high_resolution_clock::now();crs_matrix_vector_product_parallel2(&A, v, ans_parallel2);auto end_parallel2 = std::chrono::high_resolution_clock::now();// 并行计算(优化外层循环+动态调度+内层并行)auto start_parallel3 = std::chrono::high_resolution_clock::now();crs_matrix_vector_product_parallel3(&A, v, ans_parallel3);auto end_parallel3 = std::chrono::high_resolution_clock::now();// 比较准确性compare(ans_serial, ans_parallel, A.n_cols, "parallel-basic");compare(ans_serial, ans_parallel2, A.n_cols, "parallel-2");compare(ans_serial, ans_parallel3, A.n_cols, "parallel-3");// 比较性能std::chrono::duration<double, std::milli> duration_serial = end_serial - start_serial;std::chrono::duration<double, std::milli> duration_parallel = end_parallel - start_parallel;std::chrono::duration<double, std::milli> duration_parallel2 = end_parallel2 - start_parallel2;std::chrono::duration<double, std::milli> duration_parallel3 = end_parallel3 - start_parallel3;cout << "serial time =" << duration_serial.count() << "ms" << endl<< endl;cout << "parallel time =" << duration_parallel.count() << "ms" << endl;cout << "speedup1 =" << duration_serial.count() / duration_parallel.count() << endl<< endl;cout << "parallel time2=" << duration_parallel2.count() << "ms" << endl;cout << "speedup2 =" << duration_serial.count() / duration_parallel2.count() << endl<< endl;cout << "parallel time3=" << duration_parallel3.count() << "ms" << endl;cout << "speedup3 =" << duration_serial.count() / duration_parallel3.count() << endl;// 释放内存free(A.row_ptr);free(A.col_ind);free(A.val);return 0;
5.3 测试与绘图
import subprocess
import time
import matplotlib.pyplot as plt
import itertools# 每个矩阵测试轮数
test_num = 20
# 测试矩阵名
matrix_name = "psmigr_3"# 定义需要测试的线程数
thread_num1 = range(1,32) # 范围
thread_num2 = [64,96,112,128,192,256] # 离散点
thread_numbers = sorted(list(itertools.chain(thread_num1, thread_num2)))
print(thread_numbers)# 记录每个线程数的平均speedup
speedups = []
speedups2 = []
speedups3 = []for thread_num in thread_numbers:total_speedup = 0total_speedup2 = 0total_speedup3 = 0for _ in range(test_num):# 执行可执行文件并记录时间result = subprocess.run(['./mv', str(thread_num), matrix_name], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)# 解析输出结果lines = result.stdout.strip().split('\n')speedup = 0speedup2 = 0speedup3 = 0for line in lines:if line.startswith('speedup1'):speedup = float(line.split('=')[1][:-1])if line.startswith('speedup2'):speedup2 = float(line.split('=')[1][:-1])if line.startswith('speedup3'):speedup3 = float(line.split('=')[1][:-1])total_speedup += speeduptotal_speedup2 += speedup2total_speedup3 += speedup3print("thread_num {} complete".format(thread_num))# 计算平均speedupavg_speedup = total_speedup / test_numspeedups.append(avg_speedup)avg_speedup2 = total_speedup2 / test_numspeedups2.append(avg_speedup2)avg_speedup3 = total_speedup3 / test_numspeedups3.append(avg_speedup3)print(speedups)
# 绘制参数-speedup图
plt.figure(figsize=(10, 6))
plt.plot(thread_numbers, speedups, label="basic", color="blue")
plt.plot(thread_numbers, speedups2, label="schedule", color="red")
plt.plot(thread_numbers, speedups3, label="inner", color="green")
plt.xlabel('Thread Number')
plt.ylabel('Average Speedup')
plt.title('Thread Number vs. Average Speedup')
5.4 Linux操作命令
# (1)从矩阵市场获取矩阵
wget https://math.nist.gov/pub/MatrixMarket2/misc/qcd/conf5.0-00l4x4-1400.mtx.gz
# (2)解压矩阵
gzip -d psmigr_1.mtx.gz
# (3)编译执行order,并输入matrix_name,不要包括.mtx。将生成<matrix_name>_line.mtx
# e.g. ./order psmigr_3
g++ order.cpp -o order
./order <matrix_name>
# (4)编译执行mv,并输入并行数与matrix_name,不要包括.mtx,将输出结果。
# e.g. ./mv 64 psmigr_3
g++ -fopenmp mv.cpp -o mv
./mv <thread_num> <matrix_name>
# (5)【可选】修改python代码,选择想要绘图的并行数与矩阵名
# (6)运行python,绘制图像
python3 test.py