Tham số dòng lệnh cho Apache Spark với scopt


Các bài viết về Apache Spark

Phần 1: Hướng Dẫn Cài Apache Spark Trên Ubuntu
Phần 2: Một chương trình Apache Spark siêu đơn giản
Phần 3: Tạo Fat Jar cho chương trình Apache Spark
Phần 4: Tham số dòng lệnh cho Apache Spark với scopt


Chúng ta sẽ dùng scopt để truyền tham số dòng lệnh cho chương trình Apache Spark.

Mã nguồn

Gắn thư viện scopt vào build.sbt

Thêm dòng này vào build.sbt

// https://mvnrepository.com/artifact/com.github.scopt/scopt
libraryDependencies += "com.github.scopt" %% "scopt" % "4.0.0"

Sử dụng trong Scala

Ta định nghĩa một case class, bao gồm các tham số cần thiết, và giá trị default. Phần này để ngoài hàm main.

case class Config(
    logfile: String = "this/is/default/path.txt",
    )

Tất cả code dưới đây để vào hàm main.

Ta tạo một parser chứa thông tin các tham số. Ví dụ dưới đây chứa tham số logfile dạng string.

val parser = new scopt.OptionParser[Config]("scopt") {
      head("scopt", "4.x")
      opt[String]('f', "logfile")
        .action((x, c) => c.copy(logfile = x))
        .text("input logFile")
    }

Sau đó, ta sẽ đọc tham số này, và sử dụng trong mạch chương trình.

    // parser.parse returns Option[C]
    parser.parse(args, Config()) match {
      case Some(config) =>
        // do stuff
        val logFile = config.logfile
        println(s"logfile $logFile")
        val logData = spark.read.textFile(logFile).cache()
        val numAs = logData.filter(line => line.contains("a")).count()
        val numBs = logData.filter(line => line.contains("b")).count()
        println(s"log file dir is $logFile")
        println(s"Lines with a: $numAs, Lines with b: $numBs")

      case None =>
        // arguments are bad, error message will have been displayed
        println("nope nope nope")

    }

Bạn thấy có đoạn rẽ nhánh, để ta biết làm gì khi phân tách tham số lỗi (case None). Trong trường hợp thành công, object config sẽ thuộc lớp Config ta đã định nghĩa trên, với thành phần logfile thuộc kiểu string, mang giá trị mặc định hoặc giá trị ta đã truyền vào bằng tham số dòng lệnh.

Code hoàn chỉnh chạy được như sau

import org.apache.spark.sql.SparkSession
import scopt.OptionParser

case class Config(
    logfile: String = "this/is/default/path.txt",
    )

object ArgsSimpleApp {
  def main(args: Array[String]) {
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    // define parser
    val parser = new scopt.OptionParser[Config]("scopt") {
      head("scopt", "4.x")
      opt[String]('f', "logfile")
        .action((x, c) => c.copy(logfile = x))
        .text("input logFile")
    }

    // parser.parse returns Option[C]
    parser.parse(args, Config()) match {
      case Some(config) =>
        // do stuff
        val logFile = config.logfile
        println(s"logfile $logFile")
        val logData = spark.read.textFile(logFile).cache()
        val numAs = logData.filter(line => line.contains("a")).count()
        val numBs = logData.filter(line => line.contains("b")).count()
        println(s"log file dir is $logFile")
        println(s"Lines with a: $numAs, Lines with b: $numBs")
        
      case None =>
        // arguments are bad, error message will have been displayed
        println("nope nope nope")

    }
    spark.stop()
  }
}

Chạy chương trình

Bạn phải dùng plugin assembly để tạo Farjar.

sbt assembly
$SPARK_HOME/bin/spark-submit \
 --class "ArgsSimpleApp" \
 --master local[4]  \
 target/scala-2.12/Args-Simple-Project-assembly-1.0.jar \
 --logfile $SPARK_HOME/README.md

Ví dụ hoàn chỉnh

https://github.com/ttpro1995/ApacheSparkArgsSimpleApp

Leave a Reply