오늘의 인기 글
최근 글
최근 댓글
Today
Total
04-29 01:41
관리 메뉴

우노

[AWS EMR] 쉘 스크립트를 이용한 실험자동화 part 2 본문

AWS/EMR

[AWS EMR] 쉘 스크립트를 이용한 실험자동화 part 2

운호(Noah) 2020. 12. 6. 23:06

자동화에 필요한 파일 종류

  • 1) jar 파일
    • 1-1) build.sbt 작성
    • 1-2) scala 코드 작성
    • 1-3) 각 scala 코드에 대한 jar 파일 생성 후, s3 에 업로드
  • 2) 실험 쉘스크립트(spmm.sh)
    • 2-1) 작성 후, s3 에 업로드
  • 3) 클러스터 생성 쉘스크립트(createcluster.sh)
  • 4) 입력파일(input.csv)
    • 4-1) s3 에 업로드

1) jar 파일

  • 1-1) build.sbt 작성

      version := "1.0"
      scalaVersion := "2.12.10"
    
      libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.1"
      libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.1"
      libraryDependencies += "org.apache.spark" %% "spark-mllib" % "3.0.1"
      libraryDependencies += "org.scalanlp" %% "breeze" % "1.1" 
    
      artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) =>
              "test.jar"
      }
  • 1-2) scala 코드 작성

    • spark_test.scala

        import org.apache.spark.sql._
        import org.apache.spark.sql.functions._
        import org.apache.spark.sql.SparkSession
        import org.apache.spark.ml.linalg.SparseMatrix
        import java.util.concurrent.TimeUnit.NANOSECONDS
      
        // 로컬파일에 결과를 적기 위해 라이브러리 호출
        import java.io.PrintWriter
        import java.io.File
        import java.io.FileOutputStream
      
        object test {
          def main(args: Array[String]): Unit = {
      
              val spark = SparkSession.builder.appName("test").getOrCreate()
              import spark.implicits._
      
              // 입력인자 변수화
              val left_input = args(0).toString
              val right_input = args(1).toString
              val lr = args(2).toInt
              val lc = args(3).toInt
              val rc = args(4).toInt
              val result_dir = args(5).toString
      
              // s3 입력파일 rdd화
              val left_mat = spark.sparkContext.textFile(left_input)
              val right_mat = spark.sparkContext.textFile(right_input)
      
              // 양쪽 Spark sparse Matrix 생성
              val left_mat_iterable = left_mat.map(x=>x.split(" ")).map(x => (x(0).toInt, x(1).toInt, x(2).toDouble)).collect.toIterable
              val left_sm = SparseMatrix.fromCOO(lr,lc,left_mat_iterable)
              val right_mat_iterable = right_mat.map(x=>x.split(" ")).map(x => (x(0).toInt, x(1).toInt, x(2).toDouble)).collect.toIterable
              val right_sm = SparseMatrix.fromCOO(lc,rc,right_mat_iterable)
      
              // 우측 행렬 sm.toDense 시간 측정
              var tik0 = System.nanoTime()
              val right_dm = right_sm.toDense
              var tik1 = System.nanoTime()
      
              // sm * dm 시간 측정
              var tik2 = System.nanoTime()
              left_sm.multiply(right_dm)
              var tik3 = System.nanoTime()
      
              // 지연시간계산
              val latency1 = NANOSECONDS.toMillis(tik1 - tik0)
              val latency2 = NANOSECONDS.toMillis(tik3 - tik2)
      
              // writer를 사용해 로컬 파일에 결과 적기
              val writer = new PrintWriter(new FileOutputStream(new File(result_dir), true))
              writer.write(latency1 + "," + latency2 + ",")
              writer.close
      
          }
        }
    • breeze_test.scala

        import org.apache.spark.sql._
        import org.apache.spark.sql.functions._
        import org.apache.spark.sql.SparkSession
        import breeze.linalg.CSCMatrix
        import java.util.concurrent.TimeUnit.NANOSECONDS                                
      
        // 로컬파일에 결과를 적기 위해 모듈 호출
        import java.io.PrintWriter
        import java.io.File
        import java.io.FileOutputStream
      
        object test {
          def main(args: Array[String]): Unit = {
      
                val spark = SparkSession.builder.appName("test").getOrCreate()
                import spark.implicits._
      
                // 입력인자 변수화
                val left_input = args(0).toString
                val right_input = args(1).toString
                val lr = args(2).toInt
                  val lc = args(3).toInt
                  val rc = args(4).toInt
                val result_dir = args(5).toString
      
                // s3 입력파일 rdd화
                val left_mat = spark.sparkContext.textFile(left_input)
                val right_mat = spark.sparkContext.textFile(right_input)
      
                // 양쪽 Breeze sparse Matrix 생성
                val left_mat_line = left_mat.map(x => x.split(" ")).map(x => (x(0).toInt,x(1).toInt,x(2).toDouble)).collect
                val left_builder = new CSCMatrix.Builder[Double](rows=lr,cols=lc)
                for (x <- left_mat_line)
                    left_builder.add(x._1,x._2,x._3)
                val left_sm = left_builder.result()
      
                val right_mat_line = right_mat.map(x => x.split(" ")).map(x => (x(0).toInt,x(1).toInt,x(2).toDouble)).collect
                val right_builder = new CSCMatrix.Builder[Double](rows=lc,cols=rc)
                for (x <- right_mat_line)
                    right_builder.add(x._1,x._2,x._3)
                val right_sm = right_builder.result()
      
                // sm * sm 시간 측정
                var tik0 = System.nanoTime()
                left_sm * right_sm
                var tik1 = System.nanoTime()
      
                // 우측 행렬 sm.toDense 시간 측정
                var tik2 = System.nanoTime()
                val right_dm = right_sm.toDense
                var tik3 = System.nanoTime()
      
                // sm * dm 시간 측정
                var tik4 = System.nanoTime()
                left_sm * right_dm
                var tik5 = System.nanoTime()
      
                // 지연시간계산
                val latency1 = NANOSECONDS.toMillis(tik1 - tik0)
                val latency2 = NANOSECONDS.toMillis(tik3 - tik2)
                val latency3 = NANOSECONDS.toMillis(tik5 - tik4)   
      
                // writer를 사용해 로컬에 결과 적기
                val writer = new PrintWriter(new FileOutputStream(new File(result_dir), true))
                writer.write(latency1 + "," + latency2 + "," + latency3 + "\n")
                writer.close
            }
        }
  • 1-3) 각 scala 코드에 대한 jar 파일 생성 후, s3 에 업로드

2) 실험 쉘스크립트(spmm.sh)

  • 2-1) 실험 쉘스크립트(spmm.sh)는 aws emr create-cluster 명령시 실행되며, 작성 후 s3에 업로드해둔다.

      #!/bin/bash
    
      # 입출력파일 s3 경로
      matrixdata_s3=$1
      inputcsv_s3=$2
      outputcsv_s3=$3
    
      # 입력파일 다운로드
      aws s3 cp $2 /home/hadoop/
    
      # 입출력파일명을 절대경로로 재설정
      inputcsv_parse=(`echo $inputcsv_s3 | tr "/" "\n"`)
      inputcsv_file="/home/hadoop/${inputcsv_parse[${#inputcsv_parse[@]}-1]}"
      outputcsv_parse=(`echo $outputcsv_s3 | tr "/" "\n"`)
      outputcsv_file="/home/hadoop/${outputcsv_parse[${#outputcsv_parse[@]}-1]}"
    
      # jar 파일 s3 경로
      sparkjar="s3://unho-spmm/jars/spark-test.jar"
      breezejar="s3://unho-spmm/jars/breeze-test.jar"
    
      count=1
      # 입력파일의 마지막 줄 마지막 문자를 \n으로 치환하며
      # line 마다 ',' 기준으로 split 해 변수로 사용한다.
      sed "$ s/\$/\\n/" ${inputcsv_file} | tr -d '\r' | while IFS=',' read lr lc rc ld rd lnnz rnnz;
      do
    
              # 출력파일에 head 작성
          if [ $count -eq 1 ]; then
              echo "lr,lc,rc,ld,rd,lnnz,rnnz,sp_right_todense,sp_smdm,bz_smsm,bz_right_todense,bz_smdm" >> ${outputcsv_file}
          fi    
    
          # 출력파일에 head 이후부터 작성
          if [ $count -gt 1 ]; then
    
                      ld=`printf '%.8f' $ld`
                      rd=`printf '%.8f' $rd`
    
                      # 왼쪽, 오른쪽 행렬 s3 경로
                      l_mat="${matrixdata_s3}M_${lr}_${lc}_${ld}_${lnnz}"
                      r_mat="${matrixdata_s3}M_${lc}_${rc}_${rd}_${rnnz}"
    
                      # 출력파일에 결과 작성
                      echo -n "${lr},${lc},${rc},${ld},${rd},${lnnz},${rnnz}," >> ${outputcsv_file}
    
                      # spark 코드 실행 후 출력파일에 결과 작성
                      spark-submit --master local --driver-memory 120G --conf spark.driver.maxResultSize=4g --class test ${sparkjar} \
                      ${l_mat} ${r_mat} ${lr} ${lc} ${rc} ${outputcsv_file}
    
                      # breeze 코드 실행 후 출력파일에 결과 작성
                      spark-submit --master local --driver-memory 120G --conf spark.driver.maxResultSize=4g --class test ${breezejar} \
                      ${l_mat} ${r_mat} ${lr} ${lc} ${rc} ${outputcsv_file}
          fi
    
          ((count++))
    
      done
    
      # 결과파일을 s3에 업로드
      aws s3 cp ${outputcsv_file} ${outputcsv_s3}

3) 클러스터 생성 쉘스크립트(createcluster.sh)

  • 클러스터 생성 쉘스크립트(createcluster.sh)는 aws emr create-cluster 자동화를 위한 스크립트이다.

      #!/bin/bash
    
      instance_type=$1    # instance type
      spmmsh=$2           # 실험 쉘 스크립트 s3 경로
      matrixdata_s3=$3    # matrix data s3 경로
      inputcsv_s3=$4      # input.csv s3 경로
      outputcsv_s3=$5     # output.csv s3 경로
    
      loguri="s3://unho-spmm/emr-logs/"
    
      # cluster 생성 후 실험이 끝나면 cluster 삭제
      aws emr create-cluster \
      --name "unho-cluster" \
      --release-label emr-6.2.0 \
      --applications Name=Hadoop Name=Zeppelin Name=Spark \
      --use-default-roles \
      --ec2-attributes KeyName=unho-tokyo \
      --instance-groups InstanceGroupType=MASTER,InstanceType=${instance_type},InstanceCount=1,Name="Master Group" \
      --log-uri ${loguri} \
      --steps Type=CUSTOM_JAR,Name=CustomJAR,ActionOnFailure=CONTINUE,Jar=s3://ap-northeast-1.elasticmapreduce/libs/script-runner/script-runner.jar,Args=[${spmmsh},${matrixdata_s3},${inputcsv_s3},${outputcsv_s3}] \
      --auto-terminate

4) 입력파일(input.csv)

  • 해당 실험에서는 s3에서 csv 파일을 읽어와 실험을 진행하므로, csv 파일을 s3에 업로드해둔다.

실행 방법

  • 작성한 클러스터 생성 쉘스크립트(createcluster.sh)에 원하는 인자를 넘겨 실행한다.

    • s3의 input.csv를 읽어와 실험을 진행한 뒤, s3의 output.csv에 결과를 저장하게 된다.

        # arg1 : instance type
        # arg2 : shell script s3 경로
        # arg3 : matrix data s3 경로
        # arg4 : input.csv s3 경로
        # arg5 : output.csv s3 경로
        ./createcluster.sh "m5.8xlarge" "s3://unho-spmm/sh/spmm.sh" "s3://unho-spmm/custom-nonsquare-matrix/high-rd-v4/" "s3://unho-spmm/csv/spmm-scenario-extraction/experiment-high-rd-v4/spmm-nonsquare-high-rd-v4-input-2-51.csv" "s3://unho-spmm/csv/spmm-scenario-extraction/experiment-high-rd-v4/spmm-nonsquare-high-rd-v4-output-2-51.csv"
        ./createcluster.sh "m5.8xlarge" "s3://unho-spmm/sh/spmm.sh" "s3://unho-spmm/custom-nonsquare-matrix/high-rd-v4/" "s3://unho-spmm/csv/spmm-scenario-extraction/experiment-high-rd-v4/spmm-nonsquare-high-rd-v4-input-52-101.csv" "s3://unho-spmm/csv/spmm-scenario-extraction/experiment-high-rd-v4/spmm-nonsquare-high-rd-v4-output-52-101.csv"
        ./createcluster.sh "m5.8xlarge" "s3://unho-spmm/sh/spmm.sh" "s3://unho-spmm/custom-nonsquare-matrix/high-rd-v4/" "s3://unho-spmm/csv/spmm-scenario-extraction/experiment-high-rd-v4/spmm-nonsquare-high-rd-v4-input-102-151.csv" "s3://unho-spmm/csv/spmm-scenario-extraction/experiment-high-rd-v4/spmm-nonsquare-high-rd-v4-output-102-151.csv"
        ./createcluster.sh "m5.8xlarge" "s3://unho-spmm/sh/spmm.sh" "s3://unho-spmm/custom-nonsquare-matrix/high-rd-v4/" "s3://unho-spmm/csv/spmm-scenario-extraction/experiment-high-rd-v4/spmm-nonsquare-high-rd-v4-input-152-201.csv" "s3://unho-spmm/csv/spmm-scenario-extraction/experiment-high-rd-v4/spmm-nonsquare-high-rd-v4-output-152-201.csv"

실행 과정 요약

  • 클러스터 생성 쉘스크립트(createcluster.sh) 실행
    • 실험 쉘스크립트(spmm.sh) 실행
    • 결과는 s3에 저장
    • 실험 쉘스크립트(spmm.sh) 종료
  • 클러스터 생성 쉘스크립트(createcluster.sh) 종료
Comments