오늘의 인기 글
최근 글
최근 댓글
Today
Total
04-30 00:02
관리 메뉴

우노

[AWS EMR] 쉘 스크립트를 이용한 실험자동화 part 1 본문

AWS/EMR

[AWS EMR] 쉘 스크립트를 이용한 실험자동화 part 1

운호(Noah) 2020. 7. 7. 20:56

실험 환경

  • AWS EMR 환경에서 진행
  • Input file은 s3에서 가져온다.
  • Output file은 local에 저장한다.

1. scala 코드 작성

  • spark_test.scala 코드 작성

      import org.apache.spark.sql._
      import org.apache.spark.sql.functions._
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.ml.linalg.SparseMatrix
      import java.util.concurrent.TimeUnit.NANOSECONDS
    
      // 로컬파일에 결과를 적기 위해 모듈 호출
      import java.io.PrintWriter
      import java.io.File
      import java.io.FileOutputStream
    
      object spark_test {
        def main(args: Array[String]): Unit = {
    
              val spark = SparkSession.builder.appName("test").getOrCreate()
              import spark.implicits._
    
                      // 입력인자 변수화
                        val left_input = args(0).toString
                      val right_input = args(1).toString
                      val result_dir = args(2).toString
                      val size = args(3).toInt
    
                      // 입력파일명 저장
                      val left_info = left_input.split("/").takeRight(1)
                      val right_info = right_input.split("/").takeRight(1) 
    
                      // s3 입력파일 rdd화
                        val left_mat = spark.sparkContext.textFile(left_input)
                        val right_mat = spark.sparkContext.textFile(right_input)
    
                      // 양쪽 Spark sparse Matrix 생성
                      val left_mat_iterable = left_mat.map(x=>x.split(" ")).map(x => (x(0).toInt, x(1).toInt, x(2).toDouble)).collect.toIterable
                      val left_sm = SparseMatrix.fromCOO(size,size,left_mat_iterable)
                      val right_mat_iterable = right_mat.map(x=>x.split(" ")).map(x => (x(0).toInt, x(1).toInt, x(2).toDouble)).collect.toIterable
                      val right_sm = SparseMatrix.fromCOO(size,size,right_mat_iterable)
    
                      // 우측 행렬 sm.toDense 시간 측정
                      var tik0 = System.nanoTime()
                      val right_dm = right_sm.toDense
                      var tik1 = System.nanoTime()
    
                      // sm * dm 시간 측정
                      var tik2 = System.nanoTime()
                      left_sm.multiply(right_dm)
                      var tik3 = System.nanoTime()
    
                      // 지연시간계산
                      val latency1 = NANOSECONDS.toMillis(tik1 - tik0)
                      val latency2 = NANOSECONDS.toMillis(tik3 - tik2)
    
                      // writer를 사용해 로컬 파일에 결과 적기
                      val writer = new PrintWriter(new FileOutputStream(new File(result_dir), true))
                      writer.write(" spark " + left_info(0) + " * " + right_info(0) + "\n")
                      writer.write(" right_sm.toDense : " + latency1 + " ms\n")
                      writer.write(" sm * dm : " + latency2 + " ms\n")
                      writer.write(" \n")
                      writer.close
    
        }
      }
    
  • breeze_test.scala 코드 작성

      import org.apache.spark.sql._
      import org.apache.spark.sql.functions._
      import org.apache.spark.sql.SparkSession
      import breeze.linalg.CSCMatrix
      import java.util.concurrent.TimeUnit.NANOSECONDS                                
    
      // 로컬파일에 결과를 적기 위해 모듈 호출
      import java.io.PrintWriter
      import java.io.File
      import java.io.FileOutputStream
    
      object breeze_test {
        def main(args: Array[String]): Unit = {
    
              val spark = SparkSession.builder.appName("test").getOrCreate()
              import spark.implicits._
    
                      // 입력인자 변수화
                        val left_input = args(0).toString
                      val right_input = args(1).toString
                      val result_dir = args(2).toString
                      val size = args(3).toInt
    
                      // 입력파일명 저장
                      val left_info = left_input.split("/").takeRight(1)
                      val right_info = right_input.split("/").takeRight(1) 
    
                      // s3 입력파일 rdd화
                        val left_mat = spark.sparkContext.textFile(left_input)
                        val right_mat = spark.sparkContext.textFile(right_input)
    
                      // 양쪽 Breeze sparse Matrix 생성
                      val left_mat_line = left_mat.map(x => x.split(" ")).map(x => (x(0).toInt,x(1).toInt,x(2).toDouble)).collect
                      val left_builder = new CSCMatrix.Builder[Double](rows=size,cols=size)
                      for (x <- left_mat_line)
                          left_builder.add(x._1,x._2,x._3)
                      val left_sm = left_builder.result()
    
                      val right_mat_line = right_mat.map(x => x.split(" ")).map(x => (x(0).toInt,x(1).toInt,x(2).toDouble)).collect
                      val right_builder = new CSCMatrix.Builder[Double](rows=size,cols=size)
                      for (x <- right_mat_line)
                          right_builder.add(x._1,x._2,x._3)
                      val right_sm = right_builder.result()
    
                      // sm * sm 시간 측정
                      var tik0 = System.nanoTime()
                      left_sm * right_sm
                      var tik1 = System.nanoTime()
    
                      // 우측 행렬 sm.toDense 시간 측정
                      var tik2 = System.nanoTime()
                      val right_dm = right_sm.toDense
                      var tik3 = System.nanoTime()
    
                      // sm * dm 시간 측정
                      var tik4 = System.nanoTime()
                      left_sm * right_dm
                      var tik5 = System.nanoTime()
    
                      // 지연시간계산
                      val latency1 = NANOSECONDS.toMillis(tik1 - tik0)
                      val latency2 = NANOSECONDS.toMillis(tik3 - tik2)
                      val latency3 = NANOSECONDS.toMillis(tik5 - tik4)
    
                      // writer를 사용해 로컬에 결과 적기
                      val writer = new PrintWriter(new FileOutputStream(new File(result_dir), true))
                      writer.write(" breeze " + left_info(0) + " * " + right_info(0) + "\n")
                      writer.write(" sm * sm : " + latency1 + " ms\n")
                      writer.write(" right_sm.toDense : " + latency2 + " ms\n")
                      writer.write(" sm * dm : " + latency3 + " ms\n")
                      writer.write(" \n")
                      writer.close
    
        }
      }
    

2. build.sbt 작성

  • sbt 설치

  • build.sbt 작성

    • Scala version 확인 후 맞추기

    • Scala version에 따른 Core, Sql version 맞추기

    • spark jar용 build.sbt

          version := "1.0"
          scalaVersion := "2.11.12"
      
          libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.4"
          libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4"
          libraryDependencies += "org.apache.spark" %% "spark-mllib" % "2.4.4" 
      
          # jar 파일명 지정
          artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) =>
                  "spark_test.jar"
          }
      
    • breeze jar용 build.sbt

          version := "1.0"
          scalaVersion := "2.11.12"
      
          libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.4"
          libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4"
          libraryDependencies += "org.scalanlp" %% "breeze" % "0.12"
      
          # jar 파일명 지정
          artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) =>
                  "breeze_test.jar"
          }
      
  • sbt 를 사용해 2가지 jar 파일 생성

        sbt package

3. 쉘 스크립트를 사용한 Spark-submit 실행

  1. 쉘 스크립트 작성

     #!/bin/bash
    
     # resource 제한
     master=local
     drivermemory=120G
    
     # 반복문을 통해 입력값 다룬다.
     for size in 2048 4096
     do
     for left_density in 0.0005 0.001 0.002 0.003 0.004 0.007 0.01 0.04 0.07 0.1 0.4
     do
     for right_density in 0.0005 0.001 0.002 0.003 0.004 0.007 0.01 0.04 0.07 0.1 0.4
     do
    
     # spark 코드 실행
     spark-submit --master $master --driver-memory $drivermemory --class spark_test ./spark/target/scala-2.11/spark_test.jar \
     s3://unho-bucket/M_${size}_${size}_${left_density}.txt s3://unho-bucket/M_${size}_${size}_${right_density}.txt ./result ${size} 
    
     # breeze 코드 실행
     spark-submit --master $master --driver-memory $drivermemory --class breeze_test ./breeze/target/scala-2.11/breeze_test.jar \
     s3://unho-bucket/M_${size}_${size}_${left_density}.txt s3://unho-bucket/M_${size}_${size}_${right_density}.txt ./result ${size} 
    
     done
     done
     done
    
  2. 권한 수정

     sudo chmod 755 run.sh
    
  3. 실행

     ./run.sh 
    

'AWS > EMR' 카테고리의 다른 글

[AWS EMR] yarn-site.xml 위치  (0) 2020.10.27
[AWS EMR] EMR 재시작 방법  (0) 2020.10.27
[AWS EMR] spark jar 파일 위치  (0) 2020.07.15
[AWS EMR] spark-defaults.conf 위치  (0) 2020.07.10
[AWS EMR] Zeppelin 재시작  (0) 2020.07.10
Comments