우노
[AWS EMR] 쉘 스크립트를 이용한 실험자동화 part 2 본문
자동화에 필요한 파일 종류
- 1) jar 파일
- 1-1) build.sbt 작성
- 1-2) scala 코드 작성
- 1-3) 각 scala 코드에 대한 jar 파일 생성 후, s3 에 업로드
- 2) 실험 쉘스크립트(spmm.sh)
- 2-1) 작성 후, s3 에 업로드
- 3) 클러스터 생성 쉘스크립트(createcluster.sh)
- 4) 입력파일(input.csv)
- 4-1) s3 에 업로드
1) jar 파일
1-1) build.sbt 작성
version := "1.0" scalaVersion := "2.12.10" libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.1" libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.1" libraryDependencies += "org.apache.spark" %% "spark-mllib" % "3.0.1" libraryDependencies += "org.scalanlp" %% "breeze" % "1.1" artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) => "test.jar" }
1-2) scala 코드 작성
spark_test.scala
import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.SparkSession import org.apache.spark.ml.linalg.SparseMatrix import java.util.concurrent.TimeUnit.NANOSECONDS // 로컬파일에 결과를 적기 위해 라이브러리 호출 import java.io.PrintWriter import java.io.File import java.io.FileOutputStream object test { def main(args: Array[String]): Unit = { val spark = SparkSession.builder.appName("test").getOrCreate() import spark.implicits._ // 입력인자 변수화 val left_input = args(0).toString val right_input = args(1).toString val lr = args(2).toInt val lc = args(3).toInt val rc = args(4).toInt val result_dir = args(5).toString // s3 입력파일 rdd화 val left_mat = spark.sparkContext.textFile(left_input) val right_mat = spark.sparkContext.textFile(right_input) // 양쪽 Spark sparse Matrix 생성 val left_mat_iterable = left_mat.map(x=>x.split(" ")).map(x => (x(0).toInt, x(1).toInt, x(2).toDouble)).collect.toIterable val left_sm = SparseMatrix.fromCOO(lr,lc,left_mat_iterable) val right_mat_iterable = right_mat.map(x=>x.split(" ")).map(x => (x(0).toInt, x(1).toInt, x(2).toDouble)).collect.toIterable val right_sm = SparseMatrix.fromCOO(lc,rc,right_mat_iterable) // 우측 행렬 sm.toDense 시간 측정 var tik0 = System.nanoTime() val right_dm = right_sm.toDense var tik1 = System.nanoTime() // sm * dm 시간 측정 var tik2 = System.nanoTime() left_sm.multiply(right_dm) var tik3 = System.nanoTime() // 지연시간계산 val latency1 = NANOSECONDS.toMillis(tik1 - tik0) val latency2 = NANOSECONDS.toMillis(tik3 - tik2) // writer를 사용해 로컬 파일에 결과 적기 val writer = new PrintWriter(new FileOutputStream(new File(result_dir), true)) writer.write(latency1 + "," + latency2 + ",") writer.close } }
breeze_test.scala
import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.SparkSession import breeze.linalg.CSCMatrix import java.util.concurrent.TimeUnit.NANOSECONDS // 로컬파일에 결과를 적기 위해 모듈 호출 import java.io.PrintWriter import java.io.File import java.io.FileOutputStream object test { def main(args: Array[String]): Unit = { val spark = SparkSession.builder.appName("test").getOrCreate() import spark.implicits._ // 입력인자 변수화 val left_input = args(0).toString val right_input = args(1).toString val lr = args(2).toInt val lc = args(3).toInt val rc = args(4).toInt val result_dir = args(5).toString // s3 입력파일 rdd화 val left_mat = spark.sparkContext.textFile(left_input) val right_mat = spark.sparkContext.textFile(right_input) // 양쪽 Breeze sparse Matrix 생성 val left_mat_line = left_mat.map(x => x.split(" ")).map(x => (x(0).toInt,x(1).toInt,x(2).toDouble)).collect val left_builder = new CSCMatrix.Builder[Double](rows=lr,cols=lc) for (x <- left_mat_line) left_builder.add(x._1,x._2,x._3) val left_sm = left_builder.result() val right_mat_line = right_mat.map(x => x.split(" ")).map(x => (x(0).toInt,x(1).toInt,x(2).toDouble)).collect val right_builder = new CSCMatrix.Builder[Double](rows=lc,cols=rc) for (x <- right_mat_line) right_builder.add(x._1,x._2,x._3) val right_sm = right_builder.result() // sm * sm 시간 측정 var tik0 = System.nanoTime() left_sm * right_sm var tik1 = System.nanoTime() // 우측 행렬 sm.toDense 시간 측정 var tik2 = System.nanoTime() val right_dm = right_sm.toDense var tik3 = System.nanoTime() // sm * dm 시간 측정 var tik4 = System.nanoTime() left_sm * right_dm var tik5 = System.nanoTime() // 지연시간계산 val latency1 = NANOSECONDS.toMillis(tik1 - tik0) val latency2 = NANOSECONDS.toMillis(tik3 - tik2) val latency3 = NANOSECONDS.toMillis(tik5 - tik4) // writer를 사용해 로컬에 결과 적기 val writer = new PrintWriter(new FileOutputStream(new File(result_dir), true)) writer.write(latency1 + "," + latency2 + "," + latency3 + "\n") writer.close } }
1-3) 각 scala 코드에 대한 jar 파일 생성 후, s3 에 업로드
2) 실험 쉘스크립트(spmm.sh)
2-1) 실험 쉘스크립트(spmm.sh)는 aws emr create-cluster 명령시 실행되며, 작성 후 s3에 업로드해둔다.
#!/bin/bash # 입출력파일 s3 경로 matrixdata_s3=$1 inputcsv_s3=$2 outputcsv_s3=$3 # 입력파일 다운로드 aws s3 cp $2 /home/hadoop/ # 입출력파일명을 절대경로로 재설정 inputcsv_parse=(`echo $inputcsv_s3 | tr "/" "\n"`) inputcsv_file="/home/hadoop/${inputcsv_parse[${#inputcsv_parse[@]}-1]}" outputcsv_parse=(`echo $outputcsv_s3 | tr "/" "\n"`) outputcsv_file="/home/hadoop/${outputcsv_parse[${#outputcsv_parse[@]}-1]}" # jar 파일 s3 경로 sparkjar="s3://unho-spmm/jars/spark-test.jar" breezejar="s3://unho-spmm/jars/breeze-test.jar" count=1 # 입력파일의 마지막 줄 마지막 문자를 \n으로 치환하며 # line 마다 ',' 기준으로 split 해 변수로 사용한다. sed "$ s/\$/\\n/" ${inputcsv_file} | tr -d '\r' | while IFS=',' read lr lc rc ld rd lnnz rnnz; do # 출력파일에 head 작성 if [ $count -eq 1 ]; then echo "lr,lc,rc,ld,rd,lnnz,rnnz,sp_right_todense,sp_smdm,bz_smsm,bz_right_todense,bz_smdm" >> ${outputcsv_file} fi # 출력파일에 head 이후부터 작성 if [ $count -gt 1 ]; then ld=`printf '%.8f' $ld` rd=`printf '%.8f' $rd` # 왼쪽, 오른쪽 행렬 s3 경로 l_mat="${matrixdata_s3}M_${lr}_${lc}_${ld}_${lnnz}" r_mat="${matrixdata_s3}M_${lc}_${rc}_${rd}_${rnnz}" # 출력파일에 결과 작성 echo -n "${lr},${lc},${rc},${ld},${rd},${lnnz},${rnnz}," >> ${outputcsv_file} # spark 코드 실행 후 출력파일에 결과 작성 spark-submit --master local --driver-memory 120G --conf spark.driver.maxResultSize=4g --class test ${sparkjar} \ ${l_mat} ${r_mat} ${lr} ${lc} ${rc} ${outputcsv_file} # breeze 코드 실행 후 출력파일에 결과 작성 spark-submit --master local --driver-memory 120G --conf spark.driver.maxResultSize=4g --class test ${breezejar} \ ${l_mat} ${r_mat} ${lr} ${lc} ${rc} ${outputcsv_file} fi ((count++)) done # 결과파일을 s3에 업로드 aws s3 cp ${outputcsv_file} ${outputcsv_s3}
3) 클러스터 생성 쉘스크립트(createcluster.sh)
클러스터 생성 쉘스크립트(createcluster.sh)는 aws emr create-cluster 자동화를 위한 스크립트이다.
#!/bin/bash instance_type=$1 # instance type spmmsh=$2 # 실험 쉘 스크립트 s3 경로 matrixdata_s3=$3 # matrix data s3 경로 inputcsv_s3=$4 # input.csv s3 경로 outputcsv_s3=$5 # output.csv s3 경로 loguri="s3://unho-spmm/emr-logs/" # cluster 생성 후 실험이 끝나면 cluster 삭제 aws emr create-cluster \ --name "unho-cluster" \ --release-label emr-6.2.0 \ --applications Name=Hadoop Name=Zeppelin Name=Spark \ --use-default-roles \ --ec2-attributes KeyName=unho-tokyo \ --instance-groups InstanceGroupType=MASTER,InstanceType=${instance_type},InstanceCount=1,Name="Master Group" \ --log-uri ${loguri} \ --steps Type=CUSTOM_JAR,Name=CustomJAR,ActionOnFailure=CONTINUE,Jar=s3://ap-northeast-1.elasticmapreduce/libs/script-runner/script-runner.jar,Args=[${spmmsh},${matrixdata_s3},${inputcsv_s3},${outputcsv_s3}] \ --auto-terminate
4) 입력파일(input.csv)
- 해당 실험에서는 s3에서 csv 파일을 읽어와 실험을 진행하므로, csv 파일을 s3에 업로드해둔다.
실행 방법
작성한 클러스터 생성 쉘스크립트(createcluster.sh)에 원하는 인자를 넘겨 실행한다.
s3의 input.csv를 읽어와 실험을 진행한 뒤, s3의 output.csv에 결과를 저장하게 된다.
# arg1 : instance type # arg2 : shell script s3 경로 # arg3 : matrix data s3 경로 # arg4 : input.csv s3 경로 # arg5 : output.csv s3 경로 ./createcluster.sh "m5.8xlarge" "s3://unho-spmm/sh/spmm.sh" "s3://unho-spmm/custom-nonsquare-matrix/high-rd-v4/" "s3://unho-spmm/csv/spmm-scenario-extraction/experiment-high-rd-v4/spmm-nonsquare-high-rd-v4-input-2-51.csv" "s3://unho-spmm/csv/spmm-scenario-extraction/experiment-high-rd-v4/spmm-nonsquare-high-rd-v4-output-2-51.csv" ./createcluster.sh "m5.8xlarge" "s3://unho-spmm/sh/spmm.sh" "s3://unho-spmm/custom-nonsquare-matrix/high-rd-v4/" "s3://unho-spmm/csv/spmm-scenario-extraction/experiment-high-rd-v4/spmm-nonsquare-high-rd-v4-input-52-101.csv" "s3://unho-spmm/csv/spmm-scenario-extraction/experiment-high-rd-v4/spmm-nonsquare-high-rd-v4-output-52-101.csv" ./createcluster.sh "m5.8xlarge" "s3://unho-spmm/sh/spmm.sh" "s3://unho-spmm/custom-nonsquare-matrix/high-rd-v4/" "s3://unho-spmm/csv/spmm-scenario-extraction/experiment-high-rd-v4/spmm-nonsquare-high-rd-v4-input-102-151.csv" "s3://unho-spmm/csv/spmm-scenario-extraction/experiment-high-rd-v4/spmm-nonsquare-high-rd-v4-output-102-151.csv" ./createcluster.sh "m5.8xlarge" "s3://unho-spmm/sh/spmm.sh" "s3://unho-spmm/custom-nonsquare-matrix/high-rd-v4/" "s3://unho-spmm/csv/spmm-scenario-extraction/experiment-high-rd-v4/spmm-nonsquare-high-rd-v4-input-152-201.csv" "s3://unho-spmm/csv/spmm-scenario-extraction/experiment-high-rd-v4/spmm-nonsquare-high-rd-v4-output-152-201.csv"
실행 과정 요약
- 클러스터 생성 쉘스크립트(createcluster.sh) 실행
- 실험 쉘스크립트(spmm.sh) 실행
- 결과는 s3에 저장
- 실험 쉘스크립트(spmm.sh) 종료
- 클러스터 생성 쉘스크립트(createcluster.sh) 종료
'AWS > EMR' 카테고리의 다른 글
[AWS EMR] Spark, Breeze 서브 모듈 빌드해서 사용하기 (0) | 2021.02.23 |
---|---|
[AWS EMR] yarn-site.xml 위치 (0) | 2020.10.27 |
[AWS EMR] EMR 재시작 방법 (0) | 2020.10.27 |
[AWS EMR] spark jar 파일 위치 (0) | 2020.07.15 |
[AWS EMR] spark-defaults.conf 위치 (0) | 2020.07.10 |
Comments