우노
[Spark] Breeze CSCMatrix Multiply 함수 구현 (메모리 동적 할당) 본문
들어가기 앞서,
- Breeze 가 제공하는 CSCMatrix Multiply 는, 곱셈전에 결과 Matrix 의 NNZ 를 먼저 구합니다.
- 결과 Matrix 의 NNZ 를 구한 이후에는, NNZ 에 따라 결과 Matrix 를 표현하기 위한 배열 공간을 정적으로 할당합니다.
- 해당 포스트에서는, 기본적인 Breeze CSCMatrix Multiply 와 달리,
- 곱셈전에 임의의 배열 공간을 정적으로 할당한 뒤, 곱셈 과정에서 부족한 배열 공간을 동적으로 할당하는 코드를 다뤄보겠습니다.
함수 선언
Multiply 함수
import java.util.Arrays import breeze.linalg.CSCMatrix import java.util.concurrent.TimeUnit.NANOSECONDS def dynamic_multiply(a : breeze.linalg.CSCMatrix[Double], b : breeze.linalg.CSCMatrix[Double]): breeze.linalg.CSCMatrix[Double] = { var tik0 = System.nanoTime() // 결과 행렬의 row 길이와 동일한 임시 데이터 배열을 생성 // 해당 임시 Data 배열은, 각 계산 마다 결과 행렬에 업데이트 됨 val workData = new Array[Double](a.rows) // workData 배열의 업데이트 여부를 확인하기 위한, 임시 Index 배열 생성 val workIndex = new Array[Double](a.rows) // workIndex 배열요소를 -1로 초기화 Arrays.fill(workIndex, -1) // 왼쪽 행렬의 rowIndices val aRows = a.rowIndices // 왼쪽 행렬의 Data val aData = a.data // 왼쪽 행렬의 colPtrs val aPtrs = a.colPtrs // 결과 행렬의 nnz 를 임의로 지정 var maxNnz = (a.rows * b.cols * 0.00001).toInt // 임의로 지정한 결과 행렬의 nnz 만큼, 데이터 저장 배열 생성 var res_colPtrs = new Array[Int](b.cols+1) var res_rowIndices = new Array[Int](maxNnz) var res_values = new Array[Double](maxNnz) var nnz = 0 // col = 우측 행렬의 col index 순서대로 진행 for ( col <- 0 until b.cols) { // bOff = 우측 행렬의 각 col 에 있는 요소의 row index 를 알 수 있도록, B rowIndices 배열의 index 제공 for (bOff <- b.colPtrs(col) until b.colPtrs(col + 1)) { // bRow = 우측 행렬의 각 col에 있는 요소의 row index // bVal = 우측 행렬의 각 col에 있는 요소값 val bRow = b.rowIndices(bOff) val bVal = b.data(bOff) // a0ff = 우측 행렬의 각 col 에 있는 요소의 bRow 와 동일한 index 를 가지는 좌측 행렬의 col 을 확인하며, // 좌측 행렬의 각 col 에 있는 요소의 row index 를 알 수 있도록, A rowIndices 배열의 index 제공 for (aOff <- aPtrs(bRow) until aPtrs(bRow + 1)) { // aRow = 좌측 행렬의 각 col에 있는 요소의 row index // aVal = 좌측 행렬의 각 col에 있는 요소값 val aRow = aRows(aOff) val aVal = aData(aOff) // 반복문 시, 우측 행렬의 col index 는 고정된 상태에서 좌측 행렬의 col 이 움직이며 요소를 확인하는데, // 동일한 우측 행렬 col index 기준으로, 좌측 행렬 col 의 내부 요소가 새로운 row 에서 참조될때만, 조건문 실행 if (workIndex(aRow) < col) { // 임시 Data 배열의 aRow index 값을 0 으로 초기화 workData(aRow) = 0 // 임시 Index 배열의 aRow index 값을 col index 로 할당 workIndex(aRow) = col // nnz 가 현재 배열의 최대 길이라면, if(nnz == maxNnz){ // 배열의 길이를 2배로 지정 maxNnz *= 2 var tmp_rowIndices = new Array[Int](maxNnz) var tmp_values = new Array[Double](maxNnz) // 새로운 배열로 값 복사 for(idx <- 0 until maxNnz/2){ tmp_rowIndices(idx) = res_rowIndices(idx) tmp_values(idx) = res_values(idx) } // 재할당 res_rowIndices = tmp_rowIndices res_values = tmp_values } // 결과 행렬의 rowIndices 배열에서 nnz index 값을 aRow 로 할당 (먼저 계산되는 aRow 순서대로 할당 되며, 정렬은 아래에서 진행) res_rowIndices(nnz) = aRow // nnz 증가 nnz += 1 } // workData(aRow) += (좌측 행렬의 각 col에 있는 요소 값) * (우측 행렬의 각 col에 있는 요소 값) workData(aRow) += aVal * bVal } } // 임시 Data 배열이 다 채워지면, 결과 행렬의 colPtrs 배열 요소를 nnz 로 할당 res_colPtrs(col + 1) = nnz // 결과 행렬의 rowIndices 배열에서 현재 계산한 rowIndices 부분만 오름차순으로 정렬 Arrays.sort(res_rowIndices, res_colPtrs(col), res_colPtrs(col + 1)) // resOff = 결과 행렬의 각 col 에 있는 요소의 row index 를 알 수 있도록, Result rowIndices 배열의 index 제공 for (resOff <- res_colPtrs(col) until res_colPtrs(col + 1)) { // row = 결과 행렬의 각 col에 있는 요소의 row index val row = res_rowIndices(resOff) // 임시 데이터 배열의 row에 위치한 값을, 결과 행렬의 데이터 배열에서 해당하는 위치에 할당 res_values(resOff) = workData(row) } } val res = new CSCMatrix(res_values, a.rows, b.cols, res_colPtrs, res_rowIndices) return res }
함수 실행
// 라이브러리 호출
import org.apache.spark.ml.linalg.SparseMatrix
import breeze.linalg.CSCMatrix
import java.util.Random
//랜덤 설정
val rand = new Random()
// CSCMatrix 생성
val LR = 10000
val LC = 30000
val RC = 10000
val LD = 0.001
val RD = 0.001
val l_sm = SparseMatrix.sprand(LR, LC, LD, rand)
val r_sm = SparseMatrix.sprand(LC, RC, RD, rand)
val l_csc = new CSCMatrix(l_sm.values, l_sm.numRows, l_sm.numCols, l_sm.colPtrs, l_sm.rowIndices)
val r_csc = new CSCMatrix(r_sm.values, r_sm.numRows, r_sm.numCols, r_sm.colPtrs, r_sm.rowIndices)
// 함수 실행
dynamic_multiply(l_csc, r_csc)
'Data > Spark' 카테고리의 다른 글
[Spark] Breeze CSCMatrix Multiply 함수 구현 (메모리 정적 할당) (0) | 2022.03.17 |
---|---|
[Spark] ML 패키지와 MLlib 패키지의 차이 (0) | 2021.11.02 |
[Spark] Spark BlockMatrix Multiply 방법 (0) | 2021.08.31 |
[Spark] sc.textFile minPartitions 할당 (0) | 2021.08.26 |
[Spark] Yarn log 확인 (0) | 2021.08.24 |
Comments