¿Cómo escribir un producto matricial matricial que pueda competir con Eigen?

A continuación, se encuentra la implementación de C ++ que compara el tiempo empleado por Eigen y For Loop para realizar productos de matriz-matriz. El bucle For se ha optimizado para minimizar los errores de caché. El bucle for es inicialmente más rápido que Eigen pero luego se vuelve más lento (hasta un factor de 2 por 500 por 500 matrices). ¿Qué más debo hacer para competir con Eigen? ¿Está bloqueando la razón para el mejor rendimiento de Eigen? Si es así, ¿cómo debo hacer para agregar el locking al bucle for?

#include #include #include int main(int argc, char* argv[]) { srand(time(NULL)); // Input the size of the matrix from the user int N = atoi(argv[1]); int M = N*N; // The matrices stored as row-wise vectors double a[M]; double b[M]; double c[M]; // Initializing Eigen Matrices Eigen::MatrixXd a_E = Eigen::MatrixXd::Random(N,N); Eigen::MatrixXd b_E = Eigen::MatrixXd::Random(N,N); Eigen::MatrixXd c_E(N,N); double CPS = CLOCKS_PER_SEC; clock_t start, end; // Matrix vector product by Eigen start = clock(); c_E = a_E*b_E; end = clock(); std::cout << "\nTime taken by Eigen is: " << (end-start)/CPS << "\n"; // Initializing For loop vectors int count = 0; for (int j=0; j<N; ++j) { for (int k=0; k<N; ++k) { a[count] = a_E(j,k); b[count] = b_E(j,k); ++count; } } // Matrix vector product by For loop start = clock(); count = 0; int count1, count2; for (int j=0; j<N; ++j) { count1 = j*N; for (int k=0; k<N; ++k) { c[count] = a[count1]*b[k]; ++count; } } for (int j=0; j<N; ++j) { count2 = N; for (int l=1; l<N; ++l) { count = j*N; count1 = count+l; for (int k=0; k<N; ++k) { c[count]+=a[count1]*b[count2]; ++count; ++count2; } } } end = clock(); std::cout << "\nTime taken by for-loop is: " << (end-start)/CPS << "\n"; } 

No hay necesidad de confundir cómo se puede lograr una implementación de alto rendimiento del producto matriz-matriz. De hecho, necesitamos que más personas lo conozcan para enfrentar los desafíos futuros de la computación de alto rendimiento. Para entrar en este tema, leer BLIS: un marco para crear rápidamente la funcionalidad BLAS es un buen punto de partida.

Entonces, para desmitificar y responder a la pregunta (Cómo escribir un producto matricial de matriz que puede competir con Eigen) extendí el código publicado por ggael a un total de 400 líneas. Acabo de probarlo en una máquina AVX (Intel (R) Core (TM) i5-3470 CPU @ 3.20GHz). Aquí algunos resultados:

 g++-5.3 -O3 -DNDEBUG -std=c++11 -mavx -m64 -I ../eigen.3.2.8/ gemm.cc -lrt lehn@heim:~/work/test_eigen$ ./a.out 500 Time taken by Eigen is: 0.0190425 Time taken by for-loop is: 0.0121688 lehn@heim:~/work/test_eigen$ ./a.out 1000 Time taken by Eigen is: 0.147991 Time taken by for-loop is: 0.0959097 lehn@heim:~/work/test_eigen$ ./a.out 1500 Time taken by Eigen is: 0.492858 Time taken by for-loop is: 0.322442 lehn@heim:~/work/test_eigen$ ./a.out 5000 Time taken by Eigen is: 18.3666 Time taken by for-loop is: 12.1023 

Si tienes FMA puedes comstackr con

 g++-5.3 -O3 -DNDEBUG -std=c++11 -mfma -m64 -I ../eigen.3.2.8/ -DHAVE_FMA gemm.cc -lrt 

Si también quieres multihilo con openMP también comstack con -fopenmp

Aquí el código completo basado en las ideas del documento BLIS. Es autocontenido, excepto que necesita los archivos de origen completos de Eigen como ggael ya anotó:

 #include #include #include #if defined(_OPENMP) #include  #endif //-- malloc with alignment -------------------------------------------------------- void * malloc_(std::size_t alignment, std::size_t size) { alignment = std::max(alignment, alignof(void *)); size += alignment; void *ptr = std::malloc(size); void *ptr2 = (void *)(((uintptr_t)ptr + alignment) & ~(alignment-1)); void **vp = (void**) ptr2 - 1; *vp = ptr; return ptr2; } void free_(void *ptr) { std::free(*((void**)ptr-1)); } //-- Config -------------------------------------------------------------------- // SIMD-Register width in bits // SSE: 128 // AVX/FMA: 256 // AVX-512: 512 #ifndef SIMD_REGISTER_WIDTH #define SIMD_REGISTER_WIDTH 256 #endif #ifdef HAVE_FMA # ifndef BS_D_MR # define BS_D_MR 4 # endif # ifndef BS_D_NR # define BS_D_NR 12 # endif # ifndef BS_D_MC # define BS_D_MC 256 # endif # ifndef BS_D_KC # define BS_D_KC 512 # endif # ifndef BS_D_NC # define BS_D_NC 4092 # endif #endif #ifndef BS_D_MR #define BS_D_MR 4 #endif #ifndef BS_D_NR #define BS_D_NR 8 #endif #ifndef BS_D_MC #define BS_D_MC 256 #endif #ifndef BS_D_KC #define BS_D_KC 256 #endif #ifndef BS_D_NC #define BS_D_NC 4096 #endif template  struct BlockSize { static constexpr int MC = 64; static constexpr int KC = 64; static constexpr int NC = 256; static constexpr int MR = 8; static constexpr int NR = 8; static constexpr int rwidth = 0; static constexpr int align = alignof(T); static constexpr int vlen = 0; static_assert(MC>0 && KC>0 && NC>0 && MR>0 && NR>0, "Invalid block size."); static_assert(MC % MR == 0, "MC must be a multiple of MR."); static_assert(NC % NR == 0, "NC must be a multiple of NR."); }; template <> struct BlockSize { static constexpr int MC = BS_D_MC; static constexpr int KC = BS_D_KC; static constexpr int NC = BS_D_NC; static constexpr int MR = BS_D_MR; static constexpr int NR = BS_D_NR; static constexpr int rwidth = SIMD_REGISTER_WIDTH; static constexpr int align = rwidth / 8; static constexpr int vlen = rwidth / (8*sizeof(double)); static_assert(MC>0 && KC>0 && NC>0 && MR>0 && NR>0, "Invalid block size."); static_assert(MC % MR == 0, "MC must be a multiple of MR."); static_assert(NC % NR == 0, "NC must be a multiple of NR."); static_assert(rwidth % sizeof(double) == 0, "SIMD register width not sane."); }; //-- aux routines -------------------------------------------------------------- template  void geaxpy(Index m, Index n, const Alpha &alpha, const TX *X, Index incRowX, Index incColX, TY *Y, Index incRowY, Index incColY) { for (Index j=0; j void gescal(Index m, Index n, const Alpha &alpha, TX *X, Index incRowX, Index incColX) { if (alpha!=Alpha(0)) { for (Index j=0; j typename std::enable_if::vlen != 0, void>::type ugemm(Index kc, T alpha, const T *A, const T *B, T beta, T *C, Index incRowC, Index incColC) { typedef T vx __attribute__((vector_size (BlockSize::rwidth/8))); static constexpr Index vlen = BlockSize::vlen; static constexpr Index MR = BlockSize::MR; static constexpr Index NR = BlockSize::NR/vlen; A = (const T*) __builtin_assume_aligned (A, BlockSize::align); B = (const T*) __builtin_assume_aligned (B, BlockSize::align); vx P[MR*NR] = {}; for (Index l=0; l void mgemm(Index mc, Index nc, Index kc, T alpha, const T *A, const T *B, Beta beta, TC *C, Index incRowC, Index incColC) { const Index MR = BlockSize::MR; const Index NR = BlockSize::NR; const Index mp = (mc+MR-1) / MR; const Index np = (nc+NR-1) / NR; const Index mr_ = mc % MR; const Index nr_ = nc % NR; T C_[MR*NR]; #pragma omp parallel for for (Index j=0; j void pack_A(Index mc, Index kc, const TA *A, Index incRowA, Index incColA, T *p) { Index MR = BlockSize::MR; Index mp = (mc+MR-1) / MR; for (Index j=0; j void pack_B(Index kc, Index nc, const TB *B, Index incRowB, Index incColB, T *p) { Index NR = BlockSize::NR; Index np = (nc+NR-1) / NR; for (Index l=0; l void gemm(Index m, Index n, Index k, Alpha alpha, const TA *A, Index incRowA, Index incColA, const TB *B, Index incRowB, Index incColB, Beta beta, TC *C, Index incRowC, Index incColC) { typedef typename std::common_type::type T; const Index MC = BlockSize::MC; const Index NC = BlockSize::NC; const Index MR = BlockSize::MR; const Index NR = BlockSize::NR; const Index KC = BlockSize::KC; const Index mb = (m+MC-1) / MC; const Index nb = (n+NC-1) / NC; const Index kb = (k+KC-1) / KC; const Index mc_ = m % MC; const Index nc_ = n % NC; const Index kc_ = k % KC; T *A_ = (T*) malloc_(BlockSize::align, sizeof(T)*(MC*KC+MR)); T *B_ = (T*) malloc_(BlockSize::align, sizeof(T)*(KC*NC+NR)); if (alpha==Alpha(0) || k==0) { gescal(m, n, beta, C, incRowC, incColC); return; } for (Index j=0; j(1,10000000/N/N/N); Eigen::MatrixXd a_E = Eigen::MatrixXd::Random(N,N); Eigen::MatrixXd b_E = Eigen::MatrixXd::Random(N,N); Eigen::MatrixXd c_E(N,N); Eigen::BenchTimer t1, t2; BENCH(t1, tries, rep, c_E.noalias() = a_E*b_E ); BENCH(t2, tries, rep, myprod(c_E.data(), a_E.data(), b_E.data(), N)); std::cout << "Time taken by Eigen is: " << t1.best() << "\n"; std::cout << "Time taken by for-loop is: " << t2.best() << "\n\n"; } 

Hay dos optimizaciones simples que puedo aconsejar.

1) Vectorizarlo. Sería mejor si lo vectorizaras con ensamblaje en línea o escribas proc proceso de ensamblaje, pero también puedes usar los intrínsecos del comstackdor Incluso puede dejar que el comstackdor vectorice el bucle, pero a veces es difícil escribir el bucle adecuado para ser vectorizado por el comstackdor.

2) Hazlo paralelo. Trate de usar OpenMP.

Tu código ya está bien vectorizado por el comstackdor. La clave para un mayor rendimiento es el locking jerárquico para optimizar el uso de registros y de los diferentes niveles de cachés. El desenrollado de bucle parcial también es crucial para mejorar la canalización de instrucciones. Alcanzar el rendimiento del producto de Eigen requiere mucho esfuerzo y ajuste.

También se debe tener en cuenta que su índice de referencia está ligeramente sesgado y no es completamente confiable. Aquí hay una versión más confiable (necesita las fonts completas de Eigen para obtener bench/BenchTimer.h ):

 #include #include #include void myprod(double *c, const double* a, const double* b, int N) { int count = 0; int count1, count2; for (int j=0; j(1,10000000/N/N/N); Eigen::MatrixXd a_E = Eigen::MatrixXd::Random(N,N); Eigen::MatrixXd b_E = Eigen::MatrixXd::Random(N,N); Eigen::MatrixXd c_E(N,N); Eigen::BenchTimer t1, t2; BENCH(t1, tries, rep, c_E.noalias() = a_E*b_E ); BENCH(t2, tries, rep, myprod(c_E.data(), a_E.data(), b_E.data(), N)); std::cout << "\nTime taken by Eigen is: " << t1.best() << "\n"; std::cout << "\nTime taken by for-loop is: " << t2.best() << "\n"; } 

Comstackndo con 3.3-beta1 y FMA habilitado ( -mfma ), entonces la brecha se vuelve mucho más grande, casi un orden de magnitud para N=2000 .