Home Pluto: High-Performance IoT-Aware Stream Processing
Post
Cancel

Pluto: High-Performance IoT-Aware Stream Processing

2022 / 1 / 4 iMES 세미나

Abstract

  • 다양한 IoT app에서 많고 작은 IoT stream query 생성 -> cloud backend server에서 실행되어짐
  • 기존 분산 stream 처리 system들은 많은 수의 IoT stream query를 효율적으로 처리하지 못함
    • 비효율적인 query/code 제출 계층, query 실행 계층이 원인
  • Pluto 제안 ( 새로운 IoT aware stream 처리 시스템 )
    • node 단에서 많은 수의 IoT stream query 실행을 최적화하는 것에 초점
    • IoT query 특성을 활용하여 3단계 execution으로 end-to-end query 최적화
      • code registration을 new API와 분리함으로써 IoT query submission의 bottleneck 최소화
        • code 중복 등록 없앰, query 전반에 code 공유 가능
      • execution 단계에서 IoT stream query와 API에 노출된 정보의 공통점을 토대로 system resource를 최대한 공유
        • machine에서의 bottleneck 최소화
    • 다른 stream processing system과 비교하여 throughput을 크게 향상
      • P99 latency는 1초 미만으로 유지함

Introduction

  • IoT device / sensor : 다양한 IoT data stream을 생성 ( end user를 대신하여 stream query 생성 )
    • IoT stream query : 특정 user와 관련된 소량의 data stream 처리
  • IoT stream query : edge device에서 실행되어짐 -> 현재는 cloud backend server로 부담을 이전함
    • IoT device hardware의 운영 비용과 투자를 줄일 수 있음
    • code를 server로 배포함으로써 software code를 빠르게 배포할 수 있음
    • 여러 data stream을 쉽게 연결 가능
    • cloud server에서 원격 IoT device를 제어할 수 있음
  • Pluto : IoT stream query를 backend server를 사용하여 처리하는 시나리오로 구성
    • IoT stream query가 너무 많이 증가하고 있음 ( 수십억개의 IoT device, 다양한 app에서 생성되어지는 IoT stream query )
    • 가능한 1개의 machine에서 최대한 많은 IoT stream query 핸들링하는 방법이 있을까?
  • 기존 분산 stream 처리 system : 많은 IoT stream query 처리에 부적합
    • 원인 : 비효율적인 query submission, 비효율적인 execution layer으로 인한 bottleneck이 자주 일어나기 때문
      • query가 제출될 때마다 code file도 같이 제출되어 bottleneck 유발
      • 각 query에 대해 system resource가 별도로 할당되어 overhead 유발
        • 각 query마다 별도의 network 연결
        • thread 생성
        • code에 대해 추가 메모리 할당 등
  • Pluto 제시 : 다수의 IoT stream query를 최적화 실행하는 새로운 stream 처리 system
    • IoT 인식 3단계 query 실행 ( code 등록 / query 제출 / execution )
      • code 등록 단계와 query 제출 단계를 분리 -> query 제출 전에 code를 등록할 수 있음
        • 등록된 code 정보를 유지 / 중복된 code를 등록할 때 필요한 overhead 제거
      • execution 단계에서는 아예 새로운 IoT 인식 query 실행 모델을 만듬
    • bottleneck을 최소화하기 위해 system resource를 서로 공유 ( network 연결, thread )
    • IoT query의 event를 처리하는 locality-aware 스케줄링 사용
      • 지역적 인식 처리를 하기 위해 Q-group 도입
        • Q-group ( IoT query group ) : 새로운 IoT stream query 스케줄링 단위
          • 동일한 app code를 참조하는 여러 IoT stream query의 event queue를 포함
    • 각 Q-group을 thread에 할당하는 load를 동적으로 균형맞춤
  • 다양한 IoT app을 가진 24-core machine에서의 평가 -> 99분위 latency를 1초 이내로 유지하면서 IoT query의 수행 개수 향상
    • 기여 목록
      • IoT workload에 대한 통찰력 + end-to-end system 최적화를 위한 workload 특성 활용 방법 제공
      • 3단계의 end-to-end query 실행 계층 구성 ( qurey 제출, code 제출 layer를 분리 )
      • 새로운 IoT aware stream system 설계, 구현 ( 새로운 API, 새로운 model )
        • locality-aware 처리를 위해 Q-group 활용 ( 새로운 query 스케줄링 단위 )
      • 성능에 대한 상세분석 제공
        • 기존의 기술들과 비교하여 성능 이득을 가져오는 기술 식별

Background and Motivation

A. IoT Stream Query

Environment
  • IoT device들은 IoT message broker에게 연결되어있음 ( data stream을 publish )
  • user들은 app 등의 interface 등을 이용하여 IoT stream query 생성
    • query는 user-specific parameter를 포함
  • backend server에 query가 submit -> app code나 parameter 등에 따라 message broker가 data stream을 subscribe, process
  • 논문 목적 : “message broker 부분에서 IoT data stream을 어떻게 관리할 것인가?”
    • 어떻게 효율적으로 많은 수의 query를 처리할 수 있을까?
Query model
  • 방향 비순환 그래프로 나타낼 수 있음 ( Directed Acyclic Graph )
    • vertex 는 여러가지로 나타날 수 있음
      • source : IoT data stream에 subscribe
      • operator : data를 process
      • sink : 처리된 data를 publish
    • edge : 흐르는 data stream의 data flow 방향
Example
  • Map : raw한 data를 data format에 맞게 전처리
  • Window : 최근 data를 sliding window로 모아둠
  • Action : processing logic에 맞게 다음 action을 결정
  • sink : message broker로 action을 emit
    • IoT device가 이것을 받고 행동

B. Workload Characteristics

Small stream size
  • IoT stream query : 처리하는 data stream 양이 작음
  • IoT device : metrics를 sampling, 주기적으로 event 생성 -> event rate가 낮음
    • data를 병렬화할 이유는 없음
    • 단일 node에서 많은 IoT stream query를 실행하는 것이 유리 ( 필요 node 수, server 유지비용 감소 )
  • data stream, stream database : query와 다르게 크기가 큼 ( 1M events )
    • 각 stream query를 최적화하기 위해 별도 resource ( process, thread, network 연결 ) 를 할당받음
Various application codes
  • 다양한 app code가 cloud backend server에 register될 수 있음
Large number of IoT queries
  • IoT app이 많아질수록 stream query 또한 많아질 것
    • 현재 대략 200만개 이상 정도 될 듯
  • stream query를 하나의 node에서 가능한 많이 처리하는 것이 중요
    • backend server의 유지비용을 최소화하기 위함
Commonalities among IoT queries
  • IoT stream query가 backend server에서 실행될 때 활용할 수 있는 공통점이 있음
    • 많은 IoT stream query가 같은 app에서 생성되며, 같은 app code를 참조함
    • query가 서로 다른 IoT device에서 다른 stream을 처리하더라도, 공통 message broker로부터 data stream을 수신받음

Pluto Design Overview

  • Pluto : 다수의 IoT stream query 실행을 최적화하는 stream 처리 system
  • 기존 system의 한계를 해결하기 위한 주요 설계 원칙
    • code와 query 제출란의 분할 : 새로운 API로 3단계 수행
      • code / network 연결 공유, 경량 query 제출 등을 통해 각 단계를 최적화 가능
    • 낮은 latency / 높은 throughput의 IoT 인식 query 실행
      • 한 node에서 가능한 한 많은 system resource 공유 -> 많은 수의 IoT query 처리
      • 여러 core에 걸쳐 query의 load를 rebalancing
        • IoT query의 공통점을 이용하여 새로운 스케줄링 단위인 Q-group 사용
        • 같은 app code를 공유하는 query들을 Q-group에 할당
        • locality-aware한 스케줄링을 위해 Q-group 예약 / 동적으로 나눔
  • 3가지 구성요소로 이루어짐
    • client
    • code manager ( CM )
    • worker
  • 진행 순서
    • 개발자가 app을 구현 후 code를 등록
      • 다양한 user defined function을 지원하기 위한 다양한 API를 제공해야함
      • code 등록 API를 이용하여 UDF code를 등록
    • client -> CM으로 code 전달
    • CM : 등록된 code 관리
      • code들은 worker에 저장될 것
      • 중복으로 제출되지 않은 code : 저장 후 unique한 code identifier를 리턴
      • 중복으로 제출된 code : 저장하지 않고 기존에 존재하던 code identifier를 리턴
      • 중복 code 확인 방법 : code byte의 hash값
    • code 등록 이후 : user가 code identifier와 함께 IoT query 제출
      • identifier : code를 group화 / 공유하는데 중요한 역할
  • 제출된 query를 DAG ( 유향 비순환 그래프 ) 로 바꾸는 API를 제공
    • 각 query에 대해 아래 값들을 수신하여 query logic를 customizing
      • query source의 message broker 주소
      • query가 참조하는 app code identifier
      • user 정의 parameter 값
  • query 제출은 경량화됨 ( query 제출과 code 제출이 분할되어있기 때문 )
    • query를 제출할 때, code identifier + parameter만을 제출 ( code 중복 제출 방지 )
  • worker : 단일 node에서 실행되는 장기 실행 process
    • 제출된 IoT query에서 등록된 code를 이용하여 UDF를 자동 / 신속히 instant화
    • message broker와의 연결
    • system resource 공유 / locality-aware 스케줄링 시행
      • broker 주소 및 code identifier 인식하여 수행

Decoupling of Code and Query Submission

A. Code Registration

  • Pluto : UDF 작성 / 매개변수에 주석을 달기 위한 API 제공
    • 3가지 API 제공
      • 매개 변수 선언
      • 매개 변수 binding한 UDF
      • code 등록
    • Java로 작성한 API 예시를 아래에 보여줄 것
Parameter declaration
1
2
@ParameterDeclare
public class Threshold implements ParameterDeclare<Double> {}
  • parameter를 사용할 것에 대한 정의를 해야함
    • ParameterDeclare라는 annotation을 사용
    • Threshold라는 class를 정의
  • 위의 예시는 Double형의 Threshold라는 parameter를 사용할 것임을 정의함
    • parameter의 실제 값은 query 제출 phase에서 정해질 것
UDFs with parameter binding
1
2
3
4
5
6
7
8
9
10
11
class MyFilter implements PlutoFilter<Double> {
    private double threshold;
    @Inject
    MyFilter(@ParameterBinding(Threshold.class) double threshold) {
        this.threshold = threshold;
    }
    @Override
    public boolean filter(double temperature) {
        return temperature < threshold;
    }
}
  • 개발자는 UDF를 정의하고 정의된 parameter를 bind해주어야함
  • 위 예시는 user가 입력한 threshold값을 현재 온도와 비교하여 값을 리턴해주는 예시임
  • UDF에 parameter를 bind하기 위해서는 @ParameterBinding 이라는 annotation을 사용해야함
    • 정의된 parameter의 class name을 덮고 있어야함
  • @Inject 라는 annotation도 활용되어야함
    • 실제 parameter값이 들어왔을 때 Pluto가 자동으로 inject하여 값 넣고 instant화
    • 이 과정은 query가 실제 parameter 값과 같이 제출되었을 때마다 실행됨
Code registration
  • UDF가 작성되었다면 code를 compile / submit ( command-line API 활용 )
  • 이 과정은 제출된 jar file에 대한 unique code identifier를 리턴함

B. Query Submission API

  • code가 등록되면 code identifier를 end user device에 배포
  • IoT query : code identifier + query DAG를 함께 제출
    • UDF와 user-specific parameter value 설정
1
2
3
4
5
6
7
8
parameters = PlutoClient.parameterBuilder()
    // value defined by an end-user
    .setParameter(Threshold.class, value)
    .build();
PlutoQueryBuilder.mqtt(brokerAddress)
    .filter(MyFilter.class)
    ... // dataflow API
    .submit(codeIdentifier, parameters)
  • query는 Pluto의 dataflow API에 의해 작성됨
    • Line 1~3 : parameter 값을 지정함
    • Line 5~8 : query DAG를 만듬
  • Pluto : 이미 등록된 code에 대한 metadata만을 정하기 때문에, query 제출은 경량화되어있음

IoT-Aware Execution Model

  • 목적 : resource bottleneck 최소화 / system 성능 향상
    • 방법 : IoT stream query들의 공통점을 활용
  • 이전 query 처리 system과의 비교
    • 기존 시스템 : query마다 system resource를 별도 할당
    • Pluto : 네트워크 연결, thread들 ( thread pool ), 같은 code에 대한 memory 지역을 query마다 공유함
      • 각 thread pool에는 고정된 수의 thread가 있고, core에 균등하게 고정되어 있음
      • I/O thread ( source, sink ) 와 processing thread ( operator ) 를 서로 분리해둠 -> context switching 횟수 줄임, CPU cache miss 줄임
      • query마다 network 연결을 공유하기 위해서는 query의 broker 주소에 열린 network 연결이 있는지 확인
        • 연결이 있다면 그 network 연결에서 source event 검색
        • 연결이 없다면 새로 연결을 만들고, 이것을 I/O thread에 할당
  • locality-aware 스케줄링 채택 : CPU bottleneck 최소화 목적
    • 예시 : 동일한 code memory 영역을 사용할 query들의 event를 하나의 core에서 연속적으로 처리하도록 스케줄링
    • code 참조의 지역성 활용 / CPU cache miss 최소화
  • 새로운 스케줄링 unit으로 Q-group 사용
    • locality-aware 처리를 위해 stream query를 묶음

A. Q-Group Creation and Query Grouping

  • 동일한 app code를 참조하는 IoT query를 하나의 그룹 ( Q-Group ) 으로 묶음
    • G = {ref(C), Q}
      • G = Q-group
      • ref(C) = code C를 참조하는 reference
      • Q = 실행될 때 C를 참조하는 query set
    • Q-group은 code가 등록될때마다 만들어짐
    • query가 worker에 전달될 때마다 codeIdentifier를 확인하고 적절한 Q-group에 할당

B. Q-Group Assignment

  • 다양한 app code가 등록되어 많은 수의 Q-group이 만들어질 수 있음
    • process thread보다 Q-group의 개수가 많아진다면 다수의 Q-group이 하나의 thread에 할당될 수 있음
  • Pluto : 다양한 least-load-based balancing 매커니즘 사용 -> Q-group의 load를 여러 processing thread에 적절히 분배
    • load가 제일 적은 k개의 processing thread를 선택하여 그 중 하나에 할당
      • 이런 방식이라면 짧은 시간에 많은 수의 Q-group이 할당되는 것을 방지할 수 있음
      • k의 기본값은 2
    • IoT stream의 event 도착이 대체로 푸아송 분포에 수렴한다고 함
      • 이를 이용하여 processing thread의 load를 계산할 수 있음
        • Q-group에 할당된 query들의 load를 모두 합하여 전체 load 추정
        • 각 query의 load는 평균 event 도착/ 평균 event 처리율 로 계산

C. Q-group Scheduling and Processing

  • process thread : processing latency를 최소화하면서 locality를 활용하기 위해 Q-group 내의 event를 처리해야함
    • Pluto : event driven 구조 채용 ( event를 받았을 때에만 processing thread 실행 )
  • 각 processing thread P는 2레벨의 queue 구조를 채용
    • 1-level queue ( active group queue ) : 활성화된 Q-group을 포함
      • Ga={G1a,G2a,,Gka} (kZ>0)
      • Gka : P에서 할당된 Q-group 중 하나 ( 최소 하나의 event를 hold )
        • 각각 2-level queue ( event queue ) 를 가지고 있음
          • event queue : 할당된 query들의 event들을 포함
    • 활성화된 Q-group이 없다면 processing thread는 sleep
      • 활성화된 Q-group이 나타날 때까지 유지
      • 이 접근법은 CPU cycle 낭비 없이 낮은 latency로 event를 처리할 수 있음
        • CPU cycle은 event가 없는 Q-group을 찾아내기 위해 낭비될 것
// Source I/O thread
Function EventScheduling(e, q)
    // e : an event of query q;
    G <- queryGroupTable(q);
    add e to the event queue of G;
    if G becomes active then
        P <- groupProcessingThreadTable(G);
        add G to the active group queue of P;
        awake(P);

// Processing thread P
Function EventProcessing(P)
    Qp <- activeGroupQueue of P;
    while Qp is not empty do
        startTime <- currentTime();
        Gᵃ <- poll an active group from Qp;
        while until event queue of Gᵃ is empty do
            if elapsedTime(startTime) < timeout then
                (e, q) <- poll an evnet from the event queue;
                processEvent(e, q);
            else
                // preemption and rescheduling
                add Gᵃ to Qp;
                break;
        if Qp is Empty then
            wait();
  • EventScheduling 함수
    • I/O thread : network 연결에서 query event 검색, Q-group event queue에 event 추가
      • Q-group을 각 processing thread의 active Q-group queue에 스케줄링
      • 이후 processing thread에 event를 가진 Q-group이 있다고 알림
    • source I/O thread가 network 연결을 통해 query로부터 event를 받는 경우 이 함수가 실행됨
      • thread : query가 할당된 Q-group을 찾음
      • Q-group이 활성화되었다면 I/O thread가 processing thread의 active Q-group queue로 Q-group을 추가
      • 이후 processing thread에게 활성화된 Q-group이 추가됨을 알림
  • EventProcessing 함수
    • worker가 시작될 때 각 processing thread마다 1번씩만 실행됨
    • processing thread는 active Q-group이 queue에 없다면 sleep / 있다면 wake
    • 같은 Q-group에서 event를 계속해서 처리한다면, code의 locality를 활용할 수 있음
      • 같은 code를 계속해서 사용하므로 CPU의 cache miss를 줄일 수 있음
    • 하나의 Q-group이 하나의 processing thread를 오랫동안 점유할 경우, timeout을 추가
      • timeout이 지난다면 Q-group을 active Q-group queue의 맨 뒤에 위치시킴
      • 모든 Q-group이 processing thread를 공정하게 사용할 수 있도록 조치
    • Q-group의 load가 너무 많을 경우 그것을 적절히 split하여 load를 다른 thread로 분할할 것

D. Load Rebalancing: Q-group Split and Mergeing

  • Q-group : 시간에 따라 변화할 수 있음 // thread : event 처리 중에 overload될 수 있음
    • Pluto : Q-group을 동적으로 split하거나 merge하도록 조치 ( load rebalancing )
  • 나뉘어진 sub Q-group은 부모와 동일한 code reference를 가지고, query와 event는 일부만을 가짐
  • Pluto : 주기적으로 각 thread들의 load를 확인하여 overload / underload를 감지
    • overload된 경우 split한 일부 혹은 Q-group 전체를 underload로 옮김
    • overload / underload를 감지하기 위한 parameter로 α, β ( α < β ), ρT 를 사용함
      • 판별 방법 : ρT>β인 경우 overload / ρT<α인 경우 underload
      • 최적의 α=0.8, β=0.95
  • 아래 과정을 반복해가며 overload된 thread가 없도록 조정해나감
    • overload된 thread To를 찾고, 그 중에서 가장 큰 load를 가진 Q-group Gl을 찾음
      • 이것을 underload thread Tu로 옮길 예정
    • 만약 Gl을 옮긴 이후에 Tu의 load가 적정수준인지 먼저 확인 ( ρTu+ρGl<β )
      • 적정수준이라면 group을 통째로 옮김
      • 만약 아니라면 Gl을 split하여 적정 수준으로 조절
        • group을 split한다면, event 또한 split되어지고, 각각 다른 group에 assign될 것
      • 나누었는데도 load를 조절하지 못한다면 재귀적으로 분할
    • underload된 thread의 경우에는 반대로 group을 merge하여 조정 ( locality-aware는 지킬 것 )

Implementation

  • Pluto : 17000줄 정도의 Java 1.8 code로 구현
    • API 중 parameter declare / binding 부분은 Tang을 사용 ( dependency inejction framework )
    • message broker와의 소통을 위해 EMQ 사용 ( broker의 연결 수천개를 지원 )
    • worker에서, 다수의 I/O thread와 하나의 processing thread는 동시에 active Q-group queue에 접근할 수 있음
      • thread 경합을 최소화하기 위해 lock-free atomic 카운터 사용
        • active Q-group queue로의 access 수, thread의 경합 overhead를 최소화함
      • 각 Q-group은 event 수를 나타내는 카운터를 가지고 있음
        • I/O thread는 이 수를 증가시키고, 동시에 processing thread는 이를 감소시킬 것
        • 카운터가 0 -> 1이 되었다면 Q-group을 활성화하게 될 것

Evaluation

  • 아래에 대한 답을 가져왔음
    • Pluto가 기존 분산 stream 처리 혹은 stream database system과 다르게 어떻게 성능을 향상시켰는가?
    • 어떤 요인이 Pluto의 성능향상에 기여하였는가?
    • Pluto가 core들 간에 낮은 latency를 유지하면서 편향된 workload의 균형을 잘 맞추는가?
    • Pluto가 큰 size의 stream에도 효율적인가?
Environment
  • 하나의 machine에서 동작해야하므로 24-core NUMA를 사용
    • 2x Intel Xeon E4-2680 2.5GHz
    • 30M Cache
    • 8x 16GB RDIMM
    • Ubuntu 14.04.5
    • Kernel 4.4.0-124-genric
  • I/O thread와 processing thread는 48개 지정 ( 2 x core 개수 )
  • MQTT 사용 ( data stream과 query result를 전송하는 역할 )
  • EMQ 사용 ( message broker server )
  • 2개의 추가 machine 사용 ( event stream 생성 / message broker )

A. Methodology

  • 많은 수의 data stream, app, IoT stream query를 real-world dataset으로 시뮬레이트

1. Workload

  • 실제 세상에서 필요할 data들을 추출하여 총 9가지 유형의 data 사용 ( stypei )
    • 온도 / 습도
    • 심박수
    • user GPS
    • taxi GPS
    • 가축 GPS
    • 토양 수분
    • 가정용 전력 소비
  • 각 query에 대해 stypei 중 하나를 고르게 선택
    • 새로운 data stream 생성 ( 푸아송 process model에 따라 생성 )
    • 입력 속도 : 평균적으로 초당 1개의 event
  • app 개발자들이 다양한 code를 등록할 것이므로, test에는 실제 IoT app을 참조하는 15개의 base app code ( Ci ) 를 구현했음
    • 이 기본 code들로부터, 수천개의 code를 Pluto에 등록함
    • 각 code들을 n번 복사하여 Ci,x code를 만듬
      • Ci,xCi,yxy인 경우 서로 다른 것으로 취급
      • code manager : 각 Ci,x에 대해 unique code identifier 생성
      • worker : 별도의 Java Classloader 작성 ( 각 등록된 code를 다른 memory에 load )
  • 많은 수의 IoT stream query를 만들기 위해 아래 과정을 반복
    • 등록된 code들 중 하나 ( C ) 를 선택
    • 9가지 유형의 data 중 하나로부터 새로운 data stream ( stream ) 생성
    • C를 참조하는 stream을 subscribe하는 stream query 생성
      • 기본적으로 query들은 C를 균등하게 선택

2. Metrics

  • throughput와 latency를 측정
    • throughput : Pluto가 1초에 처리한 event 개수
    • latency : Pluto가 곧 다가오는 event를 처리하는데 얼마나 반응적인가?
  • 최대 throught 측정 방법 : 아래와 같은 방법을 사용
    • 각 step마다 X개의 stream query 제출
    • 제출된 query에 대한 data stream 생성
    • throughtput / latency 측정
  • 평가에 사용된 많은 stream query는 알림 및 이상 감지를 위해 1초 미만의 latency가 필요
    • latency 중앙값을 100ms, P99 latency를 1s로 바꾸는 point를 찾을 때까지 반복했음

3. Applications

  • 15개의 base IoT app을 만들어서 테스트했음 ( parameter )

    1. A/C control ( 온도 ) : 가정 내 원격 IoT 센서에서 생성된 window 내 온도 데이터를 분석해 에어컨을 제어. user가 목표 온도를 설정 가능
    2. Abnormal heart rate ( 심박수 ) : window 내의 심박 센서에서 생성된 환자의 심박 데이터 스트림에서 비정상적인 심박수를 감지. window 안에서 측정된 심박수의 10% 이상이 threshold를 초과하면 의사에게 통보
    3. Remote child care ( user GPS ) : 자녀가 일정 거리 이상 안전한 장소 목록에서 멀어지면 부모에게 알림
    4. Elderly care ( Motion ) : 일정 시간 내에 움직임 데이터 스트림을 분석하여 노인들의 비활동적인 움직임을 감지, 요양보호사에게 알림
    5. Lightbulb control ( Taxi GPS ) : user가 집에서 일정 거리 떨어져 있을 때 집에서 전구를 끄거나, 집 안이나 근처에 있을 때 전구를 켬
    6. Child mood monitoring ( 심박수 ) : 심장 박동수로부터 아기의 기분을 지속적으로 모니터링하고 아이가 기분이 좋지 않은 경우 부모에게 알림. 평균 심박수가 window에서 지속적으로 높으면 (심박수가 높은 범위에 포함) 아기가 기분이 좋지 않다고 판단
    7. Medication management ( 온도 ) : window에서 투약 보관 온도가 허용 범위 내에 있는지 여부를 탐지
    8. Fire alarm ( 온도 ) : 지속적으로 온도를 모니터링하고 window에 작은 편차로 온도가 빠르고 일정하게 증가하면 가정에서 화재를 감지
    9. Soil moisture monitoring ( 토양 수분 ) : window에서 토양 수분을 모니터링하고 수분이 정상 범위에 있지 않을 때 농부에게 알림
    10. Moisture removal ( 습도 ) : window에 있는 창고의 습도 데이터 스트림을 모니터링하고 습도가 습도 제한보다 높으면 자동으로 탈수기를 켬
    11. Green house ( 온도 ) : 현재 온도가 window에서 이전 온도와 어긋나면 자동으로 온실 내부의 온도 이상을 감지하여 이를 온실 소유자에게 알림
    12. Find gas station ( Taxi GPS ) : 사용자가 설정한 가격대를 제공하는 주유소의 가까운 곳을 자동으로 알림
    13. Find parking lots ( Taxi GPS ) : 자동차가 느려지거나 멈출 때 근처의 주차 장소를 자동으로 알림
    14. Electricity warning ( 전력 소비 ) : 가정용 전기 사용량 트래픽을 window에서 모니터링하고 window 내부의 총 사용량이 지정된 한도보다 클 경우 사용자에게 경고
    15. Animal care ( 가축 GPS ) : 양치기 개가 가축과 일정 거리 떨어져있을 때 양치기 주인에게 알림

B. Performance Comparison

  • Pluto가 다른 기술들보다 훨씬 좋은 성능을 보임 ( IoT 특성 인식 최적화 덕분 )
    • Storm : 각 stream query마다 JVM process를 생성하여 memory bottleneck 발생
    • Flink : query 제출 부분에서의 bottleneck이 있음 ( code 제출 / query 제출이 묶여있음 )
    • PipelineDB : 서로 다른 data stream을 검색하기 위해 query 실행 시 많은 수의 network 연결, thread가 생성되어짐
    • TPQ ( threads per query ) : code와 message broker의 공통점에 무지하여 수천개의 network 연결을 만들 때의 bottleneck 발생
    • TP ( thread pool ) : TPQ와 동일 ( 만들어지는 연결, code가 너무 많아 bottleneck 발생 )

C. Performance Breakdown

  • Pluto의 성능을 분석 ( resource 공유의 성능적 이득을 평가 ) // 처리한 query 개수로 판단
    • Query submission : code 등록 부분과 query 제출 부분을 나눔으로써 query 제출에서의 bottleneck 줄임
    • Network connection sharing : network 연결을 공유함으로써 많은 code를 생성할 때의 bottleneck을 줄일 수 있음 -> 약 2.4배의 성능 이득
    • Code sharing : code를 공유하므로써 훨씬 더 좋은 성능을 보일 수 있었음
      • TP + NC가 TPQ + NC보다 성능이 좋았음 ( thread들의 overhead를 줄이는 기법 )
    • Locality-aware sceduling : code의 지역적 특성을 이용한 스케줄링 방법으로 CPU cache miss를 최소화하여 성능 이득
      • query의 개수가 많아질수록 더 가파르게 차이가 보임

D. Load Rebalancing: Q-group Split and Merging

  • Pluto : 다중 IoT stream query가 편향된 app에서 다중 core에 대한 성능 간섭이 거의 없음
    • 성능 평가 : 몇몇개의 app에서 많은 수의 query가 제출되는 상황을 제작
    • 결과 : rebalance가 있을 때 / 없을 때의 성능평가가 이루어짐
      • rebalance가 없을 때의 latency : 95초 ( P99 )
      • rebalance가 있을 때의 latency : 961ms ( P99 )
    • 여러 thread들 사이에서 load를 잘 분산시키면서 latency를 유지할 수 있다

E. Tradeoff

  • data stream의 크기에 따른 성능 평가를 진행 ( Pluto vs. Flink )
    • 입력 data rate : 각 query에 대한 data stream의 크기
    • 결과 ( 제출된 query의 수 = 최대 throughput/query의 입력 속도 )
      • Pluto : 작은 data stream을 처리하는데 최적화되어있어 더 좋은 성능을 보임
        • IoT-aware 최적화 활용
      • Flink : 큰 data stream을 처리하는데 최적화되어있어 더 좋은 성능을 보임
        • data stream을 병렬로 처리함

Discussion

  • Pluto : code 등록 / IoT query 구축을 위한 새로운 API를 제공했음 -> code 및 query 제출을 위한 추가 단계가 필요
    • 개발자 : parameter를 선언하고, API를 이용하여 UDF에 bind하고 code를 등록해야함
      • main app logic을 수정할 필요는 없음
      • Pluto가 app code와 같은 언어로 만들어졌다면 code snippet을 다시 사용할 수 있을 것
    • query 작성 API가 dataflow API를 사용함
      • 기존 app을 변환하고 다양한 프로그래밍 언어를 지원하도록 하는 컴파일러를 구축한다면 개발자가 이후에 app code를 이식하고 재작성하는 수고를 덜 수 있을 것
  • 이 부분은 추가할 것

Conclusion

  • Pluto : IoT-aware 최적화를 포함한 새로운 IoT stream processing system
    • 효율적으로 resource와 map query를 공유하기 위해, query 제출과 code 등록 부분을 분할했음
    • IoT-aware 실행 model을 가져 code reference의 locality를 활용
    • 새로운 stream query 스케줄링 단위인 Q-group을 이용하여 다중 core에서의 load를 효과적으로 재분배
    • 다양한 app에서의 실험으로 다른 기술들과 비교하여 적은 latency로 엄청난 양의 throughput 향상을 가져올 수 있었음
This post is licensed under CC BY 4.0 by the author.