21.11.16

SMACK : Spark, Mesos, Akka, Cassandra 및 Kafka를 사용한 데이터 처리 플랫폼 아키텍처







• Spark - 분산, 대규모 데이터 처리를 위한 빠르고 일반적인 엔진

• 메소스(Mesos) - 분산 된 애플리케이션에서 효율적인 리소스 격리 및 공유를 제공하는 클러스터 리소스 관리 시스템

Akka - JVM에서 동시성, 분산 및 복원력이 뛰어난 메시지 기반 응용 프로그램을 작성하기 위한 툴킷 및 런타임 라이브러리

Cassandra - 여러 데이터 센터에서 대용량 데이터를 처리하도록 고 가용성 분산 데이터베이스

Kafka - 실시간 데이터 피드를 처리하도록 설계된 높은 처리량, 낮은 대기 시간의 분산 메시징 시스템

Storage layer: Cassandra


Cassandra는 고 가용성 및 높은 처리량 특성으로 잘 알려져 있으며 많은 write load를 처리하고 클러스터 노드 장애를 극복 할 수 있습니다. CAP 정리의 관점에서 Cassandra는 작업에 대해 조정 가능한 일관성 및 가용성을 제공합니다.

데이터 처리와 관련하여 더욱 흥미로운 점은 Cassandra가 선형적으로 확장 가능하다는 점(클러스터에 노드를 추가하기만하면 로드가 증가 할 수 있습니다)XDCR (Cross-Datacenter Replication) 기능을 제공합니다. 실제로 XDCR은 복제뿐만 아니라 다음과 같은 용도로 사용되는 흥미로운 사용 사례를 제공합니다.
-      지리적으로 분산 된 데이터 센터는 해당 지역 고유의 데이터를 처리하거나 고객 가까이에 위치합니다
-      데이터 센터를 통한 데이터 마이그레이션 : 실패 후 복구 또는 새 DC로 데이터 이동
-      운영과 분석 워크로드의 분리

그러나 이러한 모든 기능은 높은 비용이 요구되며, Cassandra와 함께 데이터 모델을 유지하는데 높은 비용이 요구됩니다. 이 모델은 파티션 키에 따라 클러스터 노드 전체에 분산되는 중첩 된 정렬 된 map으로 간주 될 수 있으며 항목은 클러스터링 컬럼별로 정렬 / 그룹화됩니다. 여기에 작은 예가 있습니다.

CREATE TABLE campaign(  
  id uuid,
  year int,
  month int,
  day int,
  views bigint,
  clicks bigint,
  PRIMARY KEY (id, year, month, day)
);
 
INSERT INTO campaign(id, year, month, day, views, clicks)  
VALUES(40b08953-a,2015, 9, 10, 1000, 42);
 
SELECT views, clicks FROM campaign  
WHERE id=40b08953-aand year=2015 and month>8;  
특정 범위의 특정 데이터를 얻으려면 전체 키를 지정해야하며 목록의 마지막 열을 제외하고는 range 절을 사용할 수 없습니다. 이 제한 조건은 디스크에 대한 임의 액세스를 생성하고 성능을 낮추는 여러 범위의 여러 스캔을 제한하기 위해 도입되었습니다. , 데이터 모델은 읽기 / 검색의 양을 제한하기 위해 읽기 쿼리에 대해 신중하게 설계되어야하기 때문에 새로운 쿼리를 지원할 때 유연성이 떨어집니다. 다음은 C * 테이블을 내부적으로 표현하는 몇 가지 예를 제공하는 C * 데이터 모델링 101 슬라이드입니다.

그러나 누군가 다른 테이블과 어떻게 든 조인해야 할 테이블이 있다면 어떻게 될까요? 다음 사례를 고려해 봅시다. 모든 캠페인에 대해 해당 월의 캠페인 당 총 조회수를 계산합니다.

CREATE TABLE event(  
  id uuid,
  ad_id uuid,
  campaign uuid,
  ts bigint,
  type text,
  PRIMARY KEY(id)
);
주어진 모델을 통해 이러한 목표를 달성 할 수있는 유일한 방법은 모든 캠페인을 읽고, 모든 이벤트를 읽고, 일치하는 캠페인 ID로 캠페인을 할당 한 다음 캠페인에 할당하는 것입니다. 그리고 카산드라에 저장된 데이터의 양이 엄청나고 메모리에 적합하지 않기 때문에 이러한 종류의 애플리케이션을 구현하는 것이 정말 어려워 보입니다. 따라서 이러한 종류의 데이터 처리는 분산 된 방법으로 수행되어야하며 Spark는이 사용 사례에 완벽하게 부합합니다.

Processing layer: Spark

주요 추상화 Spark RDD (Resilient Distributed Dataset, a distributed collection of elements)로 작동하며 workflow는 다음 네 가지 주요 단계로 구성됩니다.
-      RDD 연산 (변환 및 동작) DAG (Direct Acyclic Graph) 구성합니다.
-      DAG는 여러 단계의 작업으로 분할되어 클러스터 관리자에게 제출됩니다.
-      단계는 셔플 / 재분할이 필요하지 않은 작업을 결합합니다.
-      작업은 worker에서 실행되고 그 때 결과가 client로 전달됩니다.
SparkCassandra를 가지고 위의 문제를 해결하는 방법은 다음과 같습니다.

val sc = new SparkContext(conf)

case class Event(id: UUID, ad_id: UUID, campaign: UUID, ts: Long, `type`: String)

sc.cassandraTable[Event]("keyspace", "event"
  .filter(e => e.`type` == "view" && checkMonth(e.ts))
  .map(e => (e.campaign, 1))
  .reduceByKey(_ + _)
  .collect()
Cassandra와의 상호 작용은 spark-cassandra-connector를 통해 수행되므로, 이것은 매우 쉽고 간단합니다. SQL 문을 일련의 RDD 작업으로 변환하는 SparkSQL NoSQL 저장소와 함께 작업 할 수 있는 흥미로운 옵션 중 하나입니다.

case class CampaignReport(id: String, views: Long, clicks: Long)
 
sql("""SELECT campaign.id as id, campaign.views as views,  
   campaign.clicks as clicks, event.type as type
        FROM campaign
        JOIN event ON campaign.id = event.campaign
    """).rdd
.groupBy(row => row.getAs[String]("id"))
.map{ case (id, rows) =>
   val views = rows.head.getAs[Long]("views")
   val clicks = rows.head.getAs[Long]("clicks")
 
   val res = rows.groupBy(row => row.getAs[String]("type")).mapValues(_.size)
   CampaignReport(id, views = views + res("view"), clicks = clicks + res("click"))
 
}.saveToCassandra(“keyspace”, “campaign_report”)
몇 줄의 코드를 사용하면 native Lamba 디자인을 구현할 수 있습니다. 물론 훨씬 더 정교 할 수 있지만, 이 예제는 얼마나 쉽게 구현할 수 있는지 보여줍니다.

Almost MapReduce: bringing processing closer to data

Spark-Cassandra connector는 데이터 장소를 인식하고 클러스터에서 가장 가까운 노드에서 데이터를 읽음으로써 네트워크 주변에서 전송되는 데이터의 양을 최소화합니다. Spark-C * connector 데이터 장소의 인식을 완전히 사용(이용)하기 위해 Spark worker Cassandra 노드와 함께 배치되어야합니다.












CassandraSpark 연계와 함께, operational (또는 write-heavy) 클러스터와 analytic 클러스터를 분리하는 것이 좋습니다.
-      클러스터를 독립적으로 확장 할 수 있습니다.
-      데이터가 Cassandra에 의해 복제되었으므로 추가 작업이 필요하지 않습니다.
-      Analytic 클러스터에는 read/write load pattern이 다릅니다.
-      Analytic 클러스터에는 추가적인 데이터 ( : dictionary) 및 처리 결과가 포함될 수 있습니다.
-      Spark resource 영향은 하나의 클러스터로 제한됩니다.

Spark 애플리케이션 deplyment 옵션을 한 번 더 살펴 보겠습니다.













Cluster Resource Mananger는 다음과 같은 3가지 주요 옵션을 사용할 수 있습니다.
-      Spark Standalone - Spark masterworker는 독립 실행 형 응용 프로그램으로 설치되고 실행됩니다 (작업 부하가 일부 발생하고 worker 단위의 정적인 자원 할당만 지원함)
-      Hadoop 생태계를 이미 보유하고 있다면 YARN이 정말 좋습니다.
-      시작부터 Mesos Hadoop 애플리케이션을 실행하는 것뿐만 아니라 이기종 워크로드를 처리하기 위해 클러스터 리소스의 동적 할당을 위해 설계되었습니다.

Mesos architecture


Mesos Cluster는 리소스 제공 및 스케줄링을 담당하는 master node와 작업 실행을 실제로 수행하는 slave node로 구성됩니다. 여러 master node가 있는 HA 모드에서 ZooKeeperreader node 선택 및 서비스 검색에 사용됩니다. Mesos에서 실행되는 응용 프로그램은 Framework라고 하며 API를 사용하여 자원 제공을 처리하고 Mesos에 작업을 제출합니다. 일반적으로 작업 실행 프로세스는 다음 단계로 구성됩니다.
-      Slave node는 사용 가능한 리소스를 Master node에 게시합니다.
-      Master node Framework에 자원 제공을 전달합니다.
-      스케줄러는 작업 당 필요한 작업 및 리소스로 응답합니다.
-      Master node TaskSlave node에게 보냅니다.

Bringing Spark, Mesos and Cassandra together

앞서 말했듯이 Spark worker은 데이터 장소의 인식을 강화하기 위해 Cassandra 노드와 함께 배치되어야하므로 네트워크 트래픽 및 Cassandra 클러스터 로드가 감소합니다. 메소스(Mesos)로 이를 달성하는 방법에 대한 가능한 deployment 시나리오가 있습니다.

-      Mesos Master node ZooKeeper가 함께 배치되었습니다.
-      Mesos Slave node Cassandra node는 함께 배치되어 Spark의 데이터 장소를 더 잘 인식하게 합니다.
-      모든 worker node에 배포 된 spark binary spark-env.sh는 적절한 master end porint executor jar 위치로 구성됩니다.
-      Spark Executor JAR S3 / HDFS에 업로드되었습니다.
제공된 설정을 사용하여 Spark binary가 설치된 실제 job logic이 포함된 assembly jar 가진 worker node로부터 간단한 spark-submit 호출로 Spark job을 클러스터에 제출할 수 있습니다.
spark-submit --class io.datastrophic.SparkJob /etc/jobs/spark-jobs.jar  
Dockerized Spark를 실행하는 옵션이 있었서 모든 단일 클러스터 노드에 binary를 배포 할 필요가 없습니다.

Scheduled and Long-running tasks execution


모든 데이터 처리 시스템은 조만간 두 가지 유형의 작업을 수행해야 할 필요성에 직면하게됩니다. 정기적인 일괄 처리 집계와 같은 예정된/정기적인 작업과 스트리밍 처리의 경우인 장기 실행 작업입니다. 두 가지 유형의 주된 요구 사항은 fault-tolerant입니다. 클러스터 노드가 실패한 경우에도 작업은 계속 실행되어야 합니다. Mesos 생태계에는 이러한 유형의 작업을 지원하는 두 가지 큰 프레임 워크가 있습니다.

Marathon ZooKeeper HA 모드를 지원하고 Docker를 실행할 수 있고 훌륭한 REST API를 가질 수 있는 long running taskfault-tolerant 실행을 위한 프레임 워크입니다. 다음은 셸 명령으로 spark-submit을 실행하는 간단한 작업 구성의 예입니다.


Chronos Marathon과 동일한 특성을 가졌지만 예정된 작업을 실행하도록 설계되었으며 일반적으로 Job Graph를 지원하는 HA cron으로 배포됩니다. 다음은 단순한 bash 스크립트로 구현 된 S3 압축 작업 구성의 예입니다.


Mesos 리소스 관리 기능과 함께 널리 사용되는 시스템을 통합하기 위해 이미 개발되었거나 현재 개발중인 많은 프레임 워크가 있습니다. 그 중 일부의 이름을 지정하기만하면 됩니다.
-      Hadoop
-      Cassandra
-      Kafka
-      Myriad: YARN on Mesos
-      Storm
-      Samza

Ingesting the data

지금까지는 그렇게 좋았습니다. 저장소 계층이 설계되고 자원 관리가 설정되며 작업이 구성됩니다. 아직 존재하지 않는 유일한 방법은 처리 할 데이터입니다.


들어오는 데이터가 높은 속도로 도착한다고 가정하면 끝점은 다음 요구 사항을 충족해야 합니다.

-      높은 처리량 / 낮은 대기 시간의 제공
-      탁력성
-      확장의 용이성을 허용함
-      Back pressure 지원

Back pressure는 필수는 아니지만 load spike 를 처리하는 옵션으로 사용하는 것이 좋습니다.

Akka는 요구 사항에 완벽하게 부합하며 기본적으로이 기능 세트를 제공하도록 설계되었습니다. Akka는 무엇입니까?

-      JVM actor model 구현
-      메시지 기반 및 비동기
-      공유 가능한 상태를 적용하지 않음
-      하나의 프로세스에서 머신 클러스터까지 쉽게 확장 가능
-      Actorparental supervision에서 계층 구조 형성
-      동시성 프레임 워크 : akka-http, akka-streams, akka-persistence
다음은 JSON HttpRequest를 처리하고 도메인 모델 사례 클래스로 구문 분석 한 다음 Cassandra에 저장하는 3가지 actor model의 간단한 예입니다.
class HttpActor extends Actor {  
  def receive = {
    case req: HttpRequest => 
      system.actorOf(Props[JsonParserActor]) ! req.body
    case e: Event =>
      system.actorOf(Props[CassandraWriterActor]) ! e
  }
}
 
class JsonParserActor extends Actor {  
  def receive = {
    case s: String => Try(Json.parse(s).as[Event]) match {
      case Failure(ex) => //error handling code
      case Success(event) => sender ! event
    }
  }
}
 
class CassandraWriterActor extends Actor with ActorLogging {  
  //for demo purposes, session initialized here
  val session = Cluster.builder()
    .addContactPoint("cassandra.host")
    .build()
    .connect()
 
  override def receive: Receive = {
    case event: Event =>
      val statement = new SimpleStatement(event.createQuery)
        .setConsistencyLevel(ConsistencyLevel.QUORUM)
 
      Try(session.execute(statement)) match {
        case Failure(ex) => //error handling code
        case Success => sender ! WriteSuccessfull
      }
  }
}
모든 코드가 작동하려면 몇 줄의 코드 만 필요한 것처럼 보입니다.하지만 Akka를 사용하여 Cassandra에 원시 데이터 (이벤트)를 작성하는 것은 쉬운 일입니다.

-      Cassandra는 일괄 처리가 아닌 빠른 서비스를 위해 설계되었으므로 들어오는 데이터의 사전 집계가 필요합니다.
-      집계 / 롤업의 계산 시간은 데이터의 양에 따라 증가 할 것입니다.
-      Actorstatelesss 디자인 모델로 인해 집계를 수행하는 데 적합하지 않습니다.
-      마이크로 배치가 부분적으로 문제를 해결할 수 있음
-      원시 데이터를 위한 일종의 신뢰할 수 있는 버퍼가 여전히 필요합니다.

Kafka as a buffer for incoming data


들어오는 데이터를 약간의 보존 및 추가 사전 집계 / 처리로 유지하려면 일종의 분산 커밋 로그를 사용할 수 있습니다. 이 경우 소비자는 데이터를 일괄 적으로 읽고 처리하고 사전 집계 형태로 Cassandra에 저장합니다. 다음은 akka-http를 사용하여 HTTP를 통해 Kafka JSON 데이터를 게시하는 예입니다.
val config = new ProducerConfig(KafkaConfig())  
lazy val producer = new KafkaProducer[A, A](config)  
val topic = “raw_events”
 
val routes: Route = {  
  post{
    decodeRequest{
      entity(as[String]){ str =>
        JsonParser.parse(str).validate[Event] match {
          case s: JsSuccess[String] => producer.send(new KeyedMessage(topic, str))
          case e: JsError => BadRequest -> JsError.toFlatJson(e).toString()
        }
      }
    }
  }    
}
 
object AkkaHttpMicroservice extends App with Service {  
  Http().bindAndHandle(routes, config.getString("http.interface"), config.getInt("http.port"))
}

Consuming the data: Spark Streaming

Akka는 여전히 Kafka의 스트림 데이터를 사용하는 데 사용할 수 있지만 EcoSystem에서 Spark를 사용하면 Spark Streaming을 문제 해결 옵션으로 사용할 수 있습니다.

-      다양한 데이터 소스를 지원합니다.
-      적어도 한 번 의미를 제공합니다.
-      Kafka Directidempotent 스토리지에서 정확히 한번 사용할 수 있는 의미론

Spark Streaming 예제를 사용하여 Kinesis의 이벤트 스트림 사용 :
val ssc = new StreamingContext(conf, Seconds(10))
 
val kinesisStream = KinesisUtils.createStream(ssc,appName,streamName,  
   endpointURL,regionName, InitialPositionInStream.LATEST,     
   Duration(checkpointInterval), StorageLevel.MEMORY_ONLY)
}
 
//transforming given stream to Event and saving to C*
kinesisStream.map(JsonUtils.byteArrayToEvent)  
.saveToCassandra(keyspace, table)
 
ssc.start()  
ssc.awaitTermination()  

Designing for Failure: Backups and Patching

일반적으로 이것은 시스템의 가장 지루한 부분이지만 시스템에 들어온 데이터가 유효하지 않거나 모든 분석 데이터 센터가 손상 될 가능성이있을 때는 매우 중요합니다.


그렇다면 Kafka / Kinesis에 데이터를 저장하는 것이 어떻습니까? 글을 쓰고있는 순간에 키네 시스는 단 하루 만 머무르고 실패로 인해 백업을하지 않으면 모든 처리 결과가 손실 될 수 있습니다. Kafka는 더 많은 보존 기간을 지원하지만 S3 스토리지는 Kafka를 실행하는 여러 인스턴스보다 훨씬 저렴하고 S3 SLA가 실제로 좋기 때문에 하드웨어 소유 비용을 고려해야합니다.

백업을하는 것 말고는 복원 / 패치 전략을 미리 설계하고 테스트해야 데이터의 문제를 신속하게 해결할 수 있습니다. 프로그래머가 집계 작업이나 데이터 중복에 실수를하면 계산 결과의 정확도가 떨어질 수 있으므로 오류를 수정하는 것이 큰 문제는 아닙니다. 이 모든 작업을 더 쉽게 수행하는 한 가지 방법은 데이터 모델에서 무형을 시행하여 동일한 작업을 여러 번 반복하여 동일한 결과를 생성하는 것입니다 ( : sql update는 멱등 원 (itempotent) 작업이고 카운터 증가는 그렇지 않습니다).

다음은 S3 백업을 읽고이를 Cassandra에 로드하는 Spark 작업의 예입니다.
val sc = new SparkContext(conf)
 
sc.textFile(s"s3n://bucket/2015/*/*.gz")  
  .map(s => Try(JsonUtils.stringToEvent(s)))
  .filter(_.isSuccess).map(_.get)
  .saveToCassandra(config.keyspace, config.table)

The Big picture

SMACK으로 구축 된 데이터 플랫폼의 기본 설계
SMACK 스택은 다음과 같습니다.

-      다양한 데이터 처리 시나리오를 위한 간결한 도구 상자
-      대규모 커뮤니티에서 집중적인 검증을 거친 널리 사용되는 소프트웨어
-      낮은 대기 시간을 유지하면서 데이터의 쉬운 확장성 및 복제
-      이기종 로드에 대한 통합 클러스터 관리
-      모든 종류의 애플리케이션을 위한 단일 플랫폼
-      다양한 아키텍처 설계 (배치, 스트리밍, Lambda, Kappa)를 위한 구현 플랫폼
-      진정한 시장 출시 시간 단축 ( : MVP 검증)


No comments:

Post a Comment