C&C++   发布时间:2022-04-03  发布网站:大佬教程  code.js-code.com
大佬教程收集整理的这篇文章主要介绍了c – N体算法:为什么这种并行速度较慢?大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。
我整理了一个模拟我正在处理的数据结构类型的示例程序.即我有n个对象,我需要在每个可能的对之间迭代一次并执行(对称)计算.此操作涉及将数据写入两对.在串行中,这将采用这样的循环形式
for(int i = 0; i < N-1; ++i)
   for(int j = i + 1; j < N; ++j)
      ...

然而,它并没有在互联网上寻找一个“缓存无关的并行实现”,我在下面写下并转载.我在这里链接一个帖子(使用英特尔TBB),详细描述了这个算法.

https://software.intel.com/en-us/blogs/2010/07/01/n-bodies-a-parallel-tbb-solution-parallel-code-balanced-recursive-parallelism-with-parallel_invoke

我尝试使用OpenMP任务来执行相同的操作,并且它总是比串行对应物运行得慢(只是在没有-fopenmp的情况下编译).我用g -Wall -std = c 11 -O3 test.cpp -o test编译它.在有或没有-O3的情况下观察到相同的情况;串口总是更快.

为了添加更多信息,在我的实际应用程序中,通常有几百到几千个元素(下面的例子中的变量n),我需要多次以这种成对的方式循环.数百万次.我下面的尝试试图模拟(然我只尝试循环10-100k次).

我使用时间./test非常粗略地定时,因为它有很大的不同.是的,我知道我的例子编写得很糟糕,并且我在我的例子中包含了创建向量所需的时间.但串行的时间给了我大约30秒和一分钟的时间,所以我认为我还不需要做更严格的事情.

我的问题是:为什么序列会做得更好?我在OpenMP中做错了吗?如何正确纠正我错误?我误用了任务吗?我有一种感觉,递归任务与它有关,我尝试将’OMP_THREAD_LIMIT’设置为4,但它没有产生实质性的区别.有没有更好的方法使用OpenMP实现这一点?

注意:我的问题是具体询问如何修复此特定实现,以便它可以并行正常工作.然如果有人知道这个问题的替代解决方案及其在OpenMP中的正确实现,我也对此持开放态度.

提前致谢.

#include <vector>
#include <iostream>

std::vector<std::vector<double>> testme;

void rect(int i0,int i1,int j0,int j1)
{
    int di = i1 - j0;
    int dj = j1 - j0;
    constexpr int threshold = 16;
    if(di > threshold && dj > threshold)
    {
        int im = i0 + di/2;
        int jm = j0 + dj/2;
        #pragma omp task
        { rect(i0,im,j0,jm); }
        rect(im,i1,jm,j1);
        #pragma omp taskwait

        #pragma omp task 
        { rect(i0,j1); }
        rect(im,jm);
        #pragma omp taskwait
    }
    else
    {
        for(int i = i0; i < i1; ++i)
            for(int j = j0; j < j1; ++j) {
                testme[i][j] = 1;
                testme[j][i] = 1;
            }

    }
}

void triangle(int n0,int n1)
{
        int dn = n1 - n0;
        if(dn > 1)
        {
            int nR_328_11845@ = n0 + dn/2;
            #pragma omp task
            { triangle(n0,nm); }
            triangle(nm,n1);
            #pragma omp taskwait

            rect(n0,nm,n1);
        }
}


void calc_force(int nbodies)
{
    #pragma omp parallel num_threads(4)
    #pragma omp single
    triangle(0,nbodies);
}

int main()
{
    int n = 400;
    for(int i = 0; i < n; ++i)
    {
        std::vector<double> temp;
        for(int j = 0; j < n; ++j)
            temp.push_BACk(0);

        testme.push_BACk(temp);
    }

    // Repeat a bunch of times.
    for(int i = 0; i < 100000; ++i)
        calc_force(n);

    return 0;
}

解决方法

@H_944_25@ 对这种工作负载使用递归算法的简单想法对我来说已经很奇怪了.然后,使用OpenMP任务并行化它似乎更奇怪……为什么不用更传统的方法解决问题呢?

所以我决定试一试我想到的一些方法.但是为了使练习变得合理,重要的是为计算“对称计算”做了一些实际的工作,否则,只是迭代所有元素而不虑对称方面肯定是最好的选择.

所以我写了一个force()函数,根据坐标计算与两个物体之间的引力相互作用松散相关的东西.
然后我测试了4种不同的方法来迭代粒子:

>一种天真的三角形方法,例如你提出的方法.由于它是固有的负载不平衡方面,因此它与schedule(auto)子句并行化,以允许运行时库采取它认为最佳性能的决策.
>三角域的“巧妙”遍历,包括在j方向上将其切成两半以允许使用2个常规循环.基本上,它对应于这样的事情:

. / |
/ | __ __
/ | => | // |
/ ___ | | // ____ |
>直接的矩形方法,只是忽略了对称性.注意,这个,就像你的递归方法一样,保证对force数组的非并发访问.
>一种线性化方法,包括预先计算用于访问三角域的i和j索引的顺序,以及迭代包含这些索引的向量.

由于力以[i] = fij积累力的向量; force [j] – = fij;方法将为非并行化索引中的更新生成竞争条件(j,例如在方法#1中),我创建了一个本地预线程强制数组,在进入并行区域时初始化为0.然后在该“私有”阵列上预先进行计算,并且在并行区域退出时,使用关键构造在全局力阵列上累积各个贡献.这是OpenMP中数组的典型缩减模式.

这是完整的代码

#include <iostream>
#include <vector>
#include <cstdlib>
#include <cmath>
#include <omp.h>

std::vector< std::vector<double> > l_f;
std::vector<double> x,y,f;
std::vector<int> vi,vj;

int numth,tid;
#pragma omp threadprivate( tid )

double force( int i,int j ) {
    double dx = x[i] - x[j];
    double dy = Y[i] - Y[j];
    double dist2 = dx*dx + dy*dy;
    return dist2 * std::sqrt( dist2 );
}

void loop1( int n ) {
    #pragma omp parallel
    {
        for ( int i = 0; i < n; i++ ) {
            l_f[tid][i] = 0;
        }
        #pragma omp for schedule(auto) Nowait
        for ( int i = 0; i < n-1; i++ ) {
            for ( int j = i+1; j < n; j++ ) {
                double fij = force( i,j );
                l_f[tid][i] += fij;
                l_f[tid][j] -= fij;
            }
        }
        #pragma omp critical
        for ( int i = 0; i < n; i++ ) {
            f[i] += l_f[tid][i];
        }
    }
}

void loop2( int n ) {
    int m = n/2-1+n%2;
    #pragma omp parallel
    {
        for ( int i = 0; i < n; i++ ) {
            l_f[tid][i] = 0;
        }
        #pragma omp for schedule(statiC) Nowait
        for ( int i = 0; i < n; i++ ) { 
            for ( int j = 0; j < m; j++ ) {
                int ii,jj;
                if ( j < i ) {
                    ii = n-1-i;
                    jj = n-1-j;
                }
                else {
                    ii = i;
                    jj = j+1;
                }
                double fij = force( ii,jj );
                l_f[tid][ii] += fij;
                l_f[tid][jj] -= fij;
            }
        }
        if ( n%2 == 0 ) {
            #pragma omp for schedule(statiC) Nowait
            for ( int i = 0; i < n/2; i++ ) {
                double fij = force( i,n/2 );
                l_f[tid][i] += fij;
                l_f[tid][n/2] -= fij;
            }
        }
        #pragma omp critical
        for ( int i = 0; i < n; i++ ) {
            f[i] += l_f[tid][i];
        }
    }
}

void loop3( int n ) {
    #pragma omp parallel for schedule(statiC)
    for ( int i = 0; i < n; i++ ) {
        for ( int j = 0; j < n; j++ ) {
            if ( i < j ) {
                f[i] += force( i,j );
            }
            else if ( i > j ) {
                f[i] -= force( i,j );
            }
        }
    }
}

void loop4( int n ) {
    #pragma omp parallel
    {
        for ( int i = 0; i < n; i++ ) {
            l_f[tid][i] = 0;
        }
        #pragma omp for schedule(statiC) Nowait
        for ( int k = 0; k < vi.size(); k++ ) {
            int i = vI[k];
            int j = vj[k];
            double fij = force( i,j );
            l_f[tid][i] += fij;
            l_f[tid][j] -= fij;
        }
        #pragma omp critical
        for ( int i = 0; i < n; i++ ) {
            f[i] += l_f[tid][i];
        }
    }
}

int main( int argc,char *argv[] ) {
    if ( argc != 2 ) {
        std::cout << "need the dim as argument\n";
        return 1;
    }
    int n = std::atoi( argv[1] );

    // initialise data
    f.resize( n );
    x.resize( n );
    y.resize( n );
    for ( int i = 0; i < n; ++i ) {
        x[i] = Y[i] = i;
        f[i] = 0;
    }

    // initialising linearised index vectors
    for ( int i = 0; i < n-1; i++ ) {
        for ( int j = i+1; j < n; j++ ) {
            vi.push_BACk( i );
            vj.push_BACk( j );
        }
    }
    // initialising the local forces vectors
    #pragma omp parallel
    {
        tid = omp_get_thread_num();
        #pragma master
        numth = omp_get_num_threads();
    }
    l_f.resize( numth );
    for ( int i = 0; i < numth; i++ ) {
        l_f[i].resize( n );
    }

    // tesTing all methods one after the other,with a warm up before each
    int lmax = 10000;
    loop1( n );
    double tbeg = omp_get_wtime();
    for ( int l = 0; l < lmax; L++ ) {
        loop1( n );
    }
    double tend = omp_get_wtime();
    std::cout << "Time for triangular loop is " << tend-tbeg << "s\n";

    loop2( n );
    tbeg = omp_get_wtime();
    for ( int l = 0; l < lmax; L++ ) {
        loop2( n );
    }
    tend = omp_get_wtime();
    std::cout << "Time for mangled rectangular loop is " << tend-tbeg << "s\n";

    loop3( n );
    tbeg = omp_get_wtime();
    for ( int l = 0; l < lmax; L++ ) {
        loop3( n );
    }
    tend = omp_get_wtime();
    std::cout << "Time for naive rectangular loop is " << tend-tbeg << "s\n";

    loop4( n );
    tbeg = omp_get_wtime();
    for ( int l = 0; l < lmax; L++ ) {
        loop4( n );
    }
    tend = omp_get_wtime();
    std::cout << "Time for linearised loop is " << tend-tbeg << "s\n";

    int ret = f[n-1];
    return ret;
}

现在,评估相对性能和可伸缩性变得简单.
在第一次非定时预热迭代之后,所有方法都在循环上定时.

编译:g -O3 -mtune = native -march = native -fopenmp tbf.cc -o tbf

8核IvyBridge cpu上的结果:

> OMP_NUM_THREADS=1 numactl -N 0 -m 0 ./tbf 500
Time for triangular loop is 9.21198s
Time for mangled rectangular loop is 10.1316s
Time for naive rectangular loop is 15.9408s
Time for linearised loop is 10.6449s
> OMP_NUM_THREADS=2 numactl -N 0 -m 0 ./tbf 500
Time for triangular loop is 6.84671s
Time for mangled rectangular loop is 5.13731s
Time for naive rectangular loop is 8.09542s
Time for linearised loop is 5.4654s
> OMP_NUM_THREADS=4 numactl -N 0 -m 0 ./tbf 500
Time for triangular loop is 4.03016s
Time for mangled rectangular loop is 2.90809s
Time for naive rectangular loop is 4.45373s
Time for linearised loop is 2.7733s
> OMP_NUM_THREADS=8 numactl -N 0 -m 0 ./tbf 500
Time for triangular loop is 2.31051s
Time for mangled rectangular loop is 2.05854s
Time for naive rectangular loop is 3.03463s
Time for linearised loop is 1.7106s

因此,在这种情况下,方法#4似乎是具有良好性能和非常好的可伸缩性的最佳选择.请注意,由于schedule(auto)指令中的良好负载平衡作业,直接的三角形方法并不算太糟糕.但最终,我鼓励你用你自己的工作量进行测试……

作为参,您的初始代码(为计算force()而修改的方式与其他测试完全相同,包括使用的OpenMP线程的数量,但不需要本地强制数组和最终减少,坦克到递归方法)给出这个:

> OMP_NUM_THREADS=1 numactl -N 0 -m 0 ./recursive 500
Time for recursive method is 9.32888s
> OMP_NUM_THREADS=2 numactl -N 0 -m 0 ./recursive 500
Time for recursive method is 9.48718s
> OMP_NUM_THREADS=4 numactl -N 0 -m 0 ./recursive 500
Time for recursive method is 10.962s
> OMP_NUM_THREADS=8 numactl -N 0 -m 0 ./recursive 500
Time for recursive method is 13.2786

大佬总结

以上是大佬教程为你收集整理的c – N体算法:为什么这种并行速度较慢?全部内容,希望文章能够帮你解决c – N体算法:为什么这种并行速度较慢?所遇到的程序开发问题。

如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。