우노
[AWS EMR] 쉘 스크립트를 이용한 실험자동화 part 1 본문
실험 환경
- AWS EMR 환경에서 진행
- Input file은 s3에서 가져온다.
- Output file은 local에 저장한다.
1. scala 코드 작성
spark_test.scala 코드 작성
import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.SparkSession import org.apache.spark.ml.linalg.SparseMatrix import java.util.concurrent.TimeUnit.NANOSECONDS // 로컬파일에 결과를 적기 위해 모듈 호출 import java.io.PrintWriter import java.io.File import java.io.FileOutputStream object spark_test { def main(args: Array[String]): Unit = { val spark = SparkSession.builder.appName("test").getOrCreate() import spark.implicits._ // 입력인자 변수화 val left_input = args(0).toString val right_input = args(1).toString val result_dir = args(2).toString val size = args(3).toInt // 입력파일명 저장 val left_info = left_input.split("/").takeRight(1) val right_info = right_input.split("/").takeRight(1) // s3 입력파일 rdd화 val left_mat = spark.sparkContext.textFile(left_input) val right_mat = spark.sparkContext.textFile(right_input) // 양쪽 Spark sparse Matrix 생성 val left_mat_iterable = left_mat.map(x=>x.split(" ")).map(x => (x(0).toInt, x(1).toInt, x(2).toDouble)).collect.toIterable val left_sm = SparseMatrix.fromCOO(size,size,left_mat_iterable) val right_mat_iterable = right_mat.map(x=>x.split(" ")).map(x => (x(0).toInt, x(1).toInt, x(2).toDouble)).collect.toIterable val right_sm = SparseMatrix.fromCOO(size,size,right_mat_iterable) // 우측 행렬 sm.toDense 시간 측정 var tik0 = System.nanoTime() val right_dm = right_sm.toDense var tik1 = System.nanoTime() // sm * dm 시간 측정 var tik2 = System.nanoTime() left_sm.multiply(right_dm) var tik3 = System.nanoTime() // 지연시간계산 val latency1 = NANOSECONDS.toMillis(tik1 - tik0) val latency2 = NANOSECONDS.toMillis(tik3 - tik2) // writer를 사용해 로컬 파일에 결과 적기 val writer = new PrintWriter(new FileOutputStream(new File(result_dir), true)) writer.write(" spark " + left_info(0) + " * " + right_info(0) + "\n") writer.write(" right_sm.toDense : " + latency1 + " ms\n") writer.write(" sm * dm : " + latency2 + " ms\n") writer.write(" \n") writer.close } }
breeze_test.scala 코드 작성
import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.SparkSession import breeze.linalg.CSCMatrix import java.util.concurrent.TimeUnit.NANOSECONDS // 로컬파일에 결과를 적기 위해 모듈 호출 import java.io.PrintWriter import java.io.File import java.io.FileOutputStream object breeze_test { def main(args: Array[String]): Unit = { val spark = SparkSession.builder.appName("test").getOrCreate() import spark.implicits._ // 입력인자 변수화 val left_input = args(0).toString val right_input = args(1).toString val result_dir = args(2).toString val size = args(3).toInt // 입력파일명 저장 val left_info = left_input.split("/").takeRight(1) val right_info = right_input.split("/").takeRight(1) // s3 입력파일 rdd화 val left_mat = spark.sparkContext.textFile(left_input) val right_mat = spark.sparkContext.textFile(right_input) // 양쪽 Breeze sparse Matrix 생성 val left_mat_line = left_mat.map(x => x.split(" ")).map(x => (x(0).toInt,x(1).toInt,x(2).toDouble)).collect val left_builder = new CSCMatrix.Builder[Double](rows=size,cols=size) for (x <- left_mat_line) left_builder.add(x._1,x._2,x._3) val left_sm = left_builder.result() val right_mat_line = right_mat.map(x => x.split(" ")).map(x => (x(0).toInt,x(1).toInt,x(2).toDouble)).collect val right_builder = new CSCMatrix.Builder[Double](rows=size,cols=size) for (x <- right_mat_line) right_builder.add(x._1,x._2,x._3) val right_sm = right_builder.result() // sm * sm 시간 측정 var tik0 = System.nanoTime() left_sm * right_sm var tik1 = System.nanoTime() // 우측 행렬 sm.toDense 시간 측정 var tik2 = System.nanoTime() val right_dm = right_sm.toDense var tik3 = System.nanoTime() // sm * dm 시간 측정 var tik4 = System.nanoTime() left_sm * right_dm var tik5 = System.nanoTime() // 지연시간계산 val latency1 = NANOSECONDS.toMillis(tik1 - tik0) val latency2 = NANOSECONDS.toMillis(tik3 - tik2) val latency3 = NANOSECONDS.toMillis(tik5 - tik4) // writer를 사용해 로컬에 결과 적기 val writer = new PrintWriter(new FileOutputStream(new File(result_dir), true)) writer.write(" breeze " + left_info(0) + " * " + right_info(0) + "\n") writer.write(" sm * sm : " + latency1 + " ms\n") writer.write(" right_sm.toDense : " + latency2 + " ms\n") writer.write(" sm * dm : " + latency3 + " ms\n") writer.write(" \n") writer.close } }
2. build.sbt 작성
sbt 설치
build.sbt 작성
Scala version 확인 후 맞추기
Scala version에 따른 Core, Sql version 맞추기
spark jar용 build.sbt
version := "1.0" scalaVersion := "2.11.12" libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.4" libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4" libraryDependencies += "org.apache.spark" %% "spark-mllib" % "2.4.4" # jar 파일명 지정 artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) => "spark_test.jar" }
breeze jar용 build.sbt
version := "1.0" scalaVersion := "2.11.12" libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.4" libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4" libraryDependencies += "org.scalanlp" %% "breeze" % "0.12" # jar 파일명 지정 artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) => "breeze_test.jar" }
sbt 를 사용해 2가지 jar 파일 생성
sbt package
3. 쉘 스크립트를 사용한 Spark-submit 실행
쉘 스크립트 작성
#!/bin/bash # resource 제한 master=local drivermemory=120G # 반복문을 통해 입력값 다룬다. for size in 2048 4096 do for left_density in 0.0005 0.001 0.002 0.003 0.004 0.007 0.01 0.04 0.07 0.1 0.4 do for right_density in 0.0005 0.001 0.002 0.003 0.004 0.007 0.01 0.04 0.07 0.1 0.4 do # spark 코드 실행 spark-submit --master $master --driver-memory $drivermemory --class spark_test ./spark/target/scala-2.11/spark_test.jar \ s3://unho-bucket/M_${size}_${size}_${left_density}.txt s3://unho-bucket/M_${size}_${size}_${right_density}.txt ./result ${size} # breeze 코드 실행 spark-submit --master $master --driver-memory $drivermemory --class breeze_test ./breeze/target/scala-2.11/breeze_test.jar \ s3://unho-bucket/M_${size}_${size}_${left_density}.txt s3://unho-bucket/M_${size}_${size}_${right_density}.txt ./result ${size} done done done
권한 수정
sudo chmod 755 run.sh
실행
./run.sh
'AWS > EMR' 카테고리의 다른 글
[AWS EMR] yarn-site.xml 위치 (0) | 2020.10.27 |
---|---|
[AWS EMR] EMR 재시작 방법 (0) | 2020.10.27 |
[AWS EMR] spark jar 파일 위치 (0) | 2020.07.15 |
[AWS EMR] spark-defaults.conf 위치 (0) | 2020.07.10 |
[AWS EMR] Zeppelin 재시작 (0) | 2020.07.10 |
Comments