우노
[Spark] Matrix의 Row를 전체 Vertex 개수로 두고 Col 기준으로 나누기 ( 중복 비허용 ) 본문
Data/Spark
[Spark] Matrix의 Row를 전체 Vertex 개수로 두고 Col 기준으로 나누기 ( 중복 비허용 )
운호(Noah) 2020. 10. 11. 00:14Matrix의 Row를 전체 Vertex 개수로 두고 Col 기준으로 나누기 ( 중복 비허용 )
예) Matrix의 전체 Column size가 12이고 slice 단위를 5으로 한다면 다음과 같이 분할 됨.
1-5, 6-10
나머지 column인 11-12는 버려짐
예제코드
1024 개의 열을 300 개씩 중복 없이 Slice 하므로 총 3개의 Slice가 나온다.
1-300, 301-600, 601-900
import scala.math.BigDecimal val rdd = sc.textFile("s3://square-matrix/M_1024_1024_0.001.txt") // 입력 데이터 row, col 크기 val row = 1024 val col = 1024 // 원하는 slice 단위 val slice_unit = 300 // slice의 시작,끝 idx var start = 1 var end = slice_unit while (end <= col){ // col을 slice 단위로 자른다. val slice = rdd.map{x=>x.split(" ")}.map{x => (x(0).toInt, x(1).toInt, x(2).toDouble)}.filter{x=> x._2 >= start && x._2 <= end} // slice의 nnz val nnz = slice.count // slice의 density를 소수점 6자리로 반올림 val density = BigDecimal(nnz.toDouble / (row * slice_unit).toDouble).setScale(6, BigDecimal.RoundingMode.HALF_UP) val result = slice.map{x=>x._1 + " " + x._2 + " " + x._3} // s3에 동일한 파일이 있을 경우엔 저장하지 않는다. try{ result.coalesce(1).saveAsTextFile("s3://non-square-matrix/M_"+row+"_"+slice_unit+"_"+nnz+"_"+density+".txt") println("s3://non-square-matrix/M_"+row+"_"+slice_unit+"_"+nnz+"_"+density+".txt") }catch{ case x: org.apache.hadoop.mapred.FileAlreadyExistsException => { println("s3://non-square-matrix/M_"+row+"_"+slice_unit+"_"+nnz+"_"+density+".txt"+" Already Exists in S3") } } start = start + slice_unit end = end + slice_unit }
Matrix의 Col를 전체 Vertex 개수로 두고 Row 기준으로 나누기 ( 중복 비허용 )
예제 코드
import scala.math.BigDecimal val rdd = sc.textFile("s3://square-matrix/M_1024_1024_0.001.txt") // 입력 데이터 row, col 크기 val row = 1024 val col = 1024 // 원하는 slice 단위 val slice_unit = 300 // slice의 시작,끝 idx var start = 1 var end = slice_unit while (end <= col){ // row을 slice 단위로 자른다. val slice = rdd.map{x=>x.split(" ")}.map{x => (x(0).toInt, x(1).toInt, x(2).toDouble)}.filter{x=> x._1 >= start && x._1 <= end} // slice의 nnz val nnz = slice.count // slice의 density를 소수점 6자리로 반올림 val density = BigDecimal(nnz.toDouble / (row * slice_unit).toDouble).setScale(6, BigDecimal.RoundingMode.HALF_UP) val result = slice.map{x=>x._1 + " " + x._2 + " " + x._3} // s3에 동일한 파일이 있을 경우엔 저장하지 않는다. try{ result.coalesce(1).saveAsTextFile("s3://non-square-matrix/M_"+slice_unit+"_"+col+"_"+nnz+"_"+density+".txt") println("s3://non-square-matrix/M_"+slice_unit+"_"+col+"_"+nnz+"_"+density+".txt"+" is put in S3") }catch{ case x: org.apache.hadoop.mapred.FileAlreadyExistsException => { println("s3://non-square-matrix/M_"+slice_unit+"_"+col+"_"+nnz+"_"+density+".txt"+" Already Exists in S3") } } start = start + slice_unit end = end + slice_unit }
'Data > Spark' 카테고리의 다른 글
[Spark] Spark Property 설정 (0) | 2020.10.19 |
---|---|
[Spark] Matrix의 Row를 전체 Vertex 개수로 두고 (LRxLC),(LCxLC) 형태 Matrix 생성하기 (0) | 2020.10.14 |
[Spark] Matrix의 Row를 전체 Vertex 개수로 두고 Col 기준으로 나누기 ( 중복 허용 ) (0) | 2020.10.10 |
[Spark] RDD 데이터를 saveAsTextFile 을 사용해 S3 에 저장하기 (0) | 2020.10.10 |
[Spark] RDD의 내용을 출력하는 방법 (0) | 2020.10.10 |
Comments