오늘의 인기 글
최근 글
최근 댓글
Today
Total
12-30 13:56
관리 메뉴

우노

[Spark] Breeze CSCMatrix Multiply 함수 구현 (메모리 동적 할당) 본문

Data/Spark

[Spark] Breeze CSCMatrix Multiply 함수 구현 (메모리 동적 할당)

운호(Noah) 2022. 3. 17. 16:25

들어가기 앞서,

  • Breeze 가 제공하는 CSCMatrix Multiply 는, 곱셈전에 결과 Matrix 의 NNZ 를 먼저 구합니다.
  • 결과 Matrix 의 NNZ 를 구한 이후에는, NNZ 에 따라 결과 Matrix 를 표현하기 위한 배열 공간을 정적으로 할당합니다.
  • 해당 포스트에서는, 기본적인 Breeze CSCMatrix Multiply 와 달리,
  • 곱셈전에 임의의 배열 공간을 정적으로 할당한 뒤, 곱셈 과정에서 부족한 배열 공간을 동적으로 할당하는 코드를 다뤄보겠습니다.

함수 선언

  • Multiply 함수

      import java.util.Arrays
      import breeze.linalg.CSCMatrix
      import java.util.concurrent.TimeUnit.NANOSECONDS
    
      def dynamic_multiply(a : breeze.linalg.CSCMatrix[Double], b : breeze.linalg.CSCMatrix[Double]): breeze.linalg.CSCMatrix[Double] = {
    
          var tik0 = System.nanoTime()
    
          // 결과 행렬의 row 길이와 동일한 임시 데이터 배열을 생성
          // 해당 임시 Data 배열은, 각 계산 마다 결과 행렬에 업데이트 됨
          val workData = new Array[Double](a.rows)
    
          // workData 배열의 업데이트 여부를 확인하기 위한, 임시 Index 배열 생성
          val workIndex = new Array[Double](a.rows)
    
          // workIndex 배열요소를 -1로 초기화
          Arrays.fill(workIndex, -1)
    
          // 왼쪽 행렬의 rowIndices
          val aRows = a.rowIndices
          // 왼쪽 행렬의 Data
          val aData = a.data
          // 왼쪽 행렬의 colPtrs
          val aPtrs = a.colPtrs
    
          // 결과 행렬의 nnz 를 임의로 지정
          var maxNnz = (a.rows * b.cols * 0.00001).toInt
    
          // 임의로 지정한 결과 행렬의 nnz 만큼, 데이터 저장 배열 생성
          var res_colPtrs = new Array[Int](b.cols+1)  
          var res_rowIndices = new Array[Int](maxNnz)  
          var res_values = new Array[Double](maxNnz)
    
          var nnz = 0
    
          // col = 우측 행렬의 col index 순서대로 진행
          for ( col <- 0 until b.cols) {
    
              // bOff = 우측 행렬의 각 col 에 있는 요소의 row index 를 알 수 있도록, B rowIndices 배열의 index 제공
              for (bOff <- b.colPtrs(col) until b.colPtrs(col + 1)) {
    
                  // bRow = 우측 행렬의 각 col에 있는 요소의 row index
                  // bVal = 우측 행렬의 각 col에 있는 요소값
                  val bRow = b.rowIndices(bOff)
                  val bVal = b.data(bOff)
    
                  // a0ff = 우측 행렬의 각 col 에 있는 요소의 bRow 와 동일한 index 를 가지는 좌측 행렬의 col 을 확인하며,
                  // 좌측 행렬의 각 col 에 있는 요소의 row index 를 알 수 있도록, A rowIndices 배열의 index 제공
                  for (aOff <- aPtrs(bRow) until aPtrs(bRow + 1)) {
    
                      // aRow = 좌측 행렬의 각 col에 있는 요소의 row index  
                      // aVal = 좌측 행렬의 각 col에 있는 요소값
                      val aRow = aRows(aOff)
                      val aVal = aData(aOff)
    
                      // 반복문 시, 우측 행렬의 col index 는 고정된 상태에서 좌측 행렬의 col 이 움직이며 요소를 확인하는데,
                      // 동일한 우측 행렬 col index 기준으로, 좌측 행렬 col 의 내부 요소가 새로운 row 에서 참조될때만, 조건문 실행
                      if (workIndex(aRow) < col) {
    
                          // 임시 Data 배열의 aRow index 값을 0 으로 초기화
                          workData(aRow) = 0
                          // 임시 Index 배열의 aRow index 값을 col index 로 할당
                          workIndex(aRow) = col
    
                          // nnz 가 현재 배열의 최대 길이라면,
                          if(nnz == maxNnz){
                              // 배열의 길이를 2배로 지정
                              maxNnz *= 2
                              var tmp_rowIndices = new Array[Int](maxNnz)
                              var tmp_values = new Array[Double](maxNnz)
                              // 새로운 배열로 값 복사
                              for(idx <- 0 until maxNnz/2){
                                 tmp_rowIndices(idx) = res_rowIndices(idx)
                                 tmp_values(idx) = res_values(idx)
                              }
                              // 재할당
                                res_rowIndices = tmp_rowIndices
                                res_values = tmp_values
                            }
    
                          // 결과 행렬의 rowIndices 배열에서 nnz index 값을 aRow 로 할당 (먼저 계산되는 aRow 순서대로 할당 되며, 정렬은 아래에서 진행)
                          res_rowIndices(nnz) = aRow
                          // nnz 증가
                          nnz += 1
                      }    
                      // workData(aRow) += (좌측 행렬의 각 col에 있는 요소 값) * (우측 행렬의 각 col에 있는 요소 값)
                      workData(aRow) += aVal * bVal
                  }
              }
    
              // 임시 Data 배열이 다 채워지면, 결과 행렬의 colPtrs 배열 요소를 nnz 로 할당
              res_colPtrs(col + 1) = nnz
    
              // 결과 행렬의 rowIndices 배열에서 현재 계산한 rowIndices 부분만 오름차순으로 정렬
              Arrays.sort(res_rowIndices, res_colPtrs(col), res_colPtrs(col + 1))
    
                // resOff = 결과 행렬의 각 col 에 있는 요소의 row index 를 알 수 있도록, Result rowIndices 배열의 index 제공
                for (resOff <- res_colPtrs(col) until res_colPtrs(col + 1)) {
    
                    // row = 결과 행렬의 각 col에 있는 요소의 row index  
                    val row = res_rowIndices(resOff)
                    // 임시 데이터 배열의 row에 위치한 값을, 결과 행렬의 데이터 배열에서 해당하는 위치에 할당
                    res_values(resOff) = workData(row)
                }
        }
    
          val res = new CSCMatrix(res_values, a.rows, b.cols, res_colPtrs, res_rowIndices)
    
          return res
      }

함수 실행

// 라이브러리 호출
import org.apache.spark.ml.linalg.SparseMatrix
import breeze.linalg.CSCMatrix
import java.util.Random

//랜덤 설정
val rand = new Random()

// CSCMatrix 생성
val LR = 10000
val LC = 30000
val RC = 10000
val LD = 0.001
val RD = 0.001
val l_sm = SparseMatrix.sprand(LR, LC, LD, rand)
val r_sm = SparseMatrix.sprand(LC, RC, RD, rand)
val l_csc = new CSCMatrix(l_sm.values, l_sm.numRows, l_sm.numCols, l_sm.colPtrs, l_sm.rowIndices)
val r_csc = new CSCMatrix(r_sm.values, r_sm.numRows, r_sm.numCols, r_sm.colPtrs, r_sm.rowIndices)

// 함수 실행
dynamic_multiply(l_csc, r_csc)
Comments