Bài toán tối ưu performance và memory cùng với Akka
Đây là bài toán thực tế trong project của mình Có lẽ cũng nhiều bạn cũng đã gặp trường hợp tương tự Có khi chúng ta thường tặc lưỡi bỏ qua, hay đổ tội cho “con server”,.. but NOT TODAY Problem (đã được lược giản cho phù hợp) 1 con batch chạy hàng ngày để crawl dữ ...
Đây là bài toán thực tế trong project của mình
Có lẽ cũng nhiều bạn cũng đã gặp trường hợp tương tự
Có khi chúng ta thường tặc lưỡi bỏ qua, hay đổ tội cho “con server”,.. but NOT TODAY
Problem
(đã được lược giản cho phù hợp)
1 con batch chạy hàng ngày để crawl dữ liệu rồi insert vào Database
Đây là data lược giản khi crawl
Code hiện tại của batch
accounts foreach(account => { // something cool ... campaigns.foreach(campaign => { // something cool ... println("Insert campaign") keywords.foreach(keyword => { // something cool ... println("Insert keyword") // }) }) })
Điểm qua các vấn để của code hiện tại:
- Blocking code
- Chỉ sử dụng 1 Thread
- Không có cơ chế khi Failure
=> Performance thấp, không tận dụng được tài nguyên của server, độ ổn định không cao
Và quan trọng nhất là số lượng accounts sắp tới sẽ tăng lên gấp nhiều lần…
Solution
Akka — Chiếc Ferrari 812 động cơ V12
Nhưng tại sao….?
- Akka dựa trên Actor model cung cấp cơ chế xử lý trên Multi-thread với bộ sậu Actor, Dispatcher, Routing, Mailboxes,… mà không cần quan tâm đến các vấn đề thường gặp khi xử lý Multi-thread
- A clustered, high-availability architecture
Đây là cách Akka hoạt động:
First design
Let The Code Speak For Itself
package com.lightbend.akka.sample | |
import akka.actor.{Actor, ActorLogging, ActorSystem, Props} | |
import akka.routing.RoundRobinPool | |
object SampleData { | |
case class Account(id: Int, name: String) | |
case class Campaign(id: Int, accountId: Int, name: String) | |
case class Keyword(id: Int, campaignId: Int, name: String) | |
val accounts: List[Account] = List( | |
Account(1, "Maria"), | |
Account(2, "Jon"), | |
Account(3, "Snow") | |
) | |
val campaigns: List[Campaign] = List( | |
Campaign(1, 1, "Maria-campaign-1"), | |
Campaign(2, 1, "Maria-campaign-2"), | |
Campaign(3, 2, "Jon-campaign-1"), | |
Campaign(4, 3, "Snow-campaign-1"), | |
) | |
val keywords: List[Keyword] = List( | |
Keyword(1, 1, "A"), | |
Keyword(2, 1, "B"), | |
Keyword(3, 1, "C"), | |
Keyword(4, 1, "D"), | |
Keyword(5, 1, "E"), | |
Keyword(6, 2, "F"), | |
Keyword(7, 2, "G"), | |
Keyword(8, 2, "H"), | |
Keyword(9, 3, "I"), | |
) | |
} | |
object Test extends App { | |
import com.lightbend.akka.sample.Test.AccountActor._ | |
import SampleData._ | |
object AccountActor { | |
def props = Props[AccountActor] | |
case class ProcessAccount(account: Account) | |
} | |
class AccountActor extends Actor with ActorLogging { | |
import AccountActor._ | |
import CampaignActor._ | |
val campaignActor = context.actorOf(CampaignActor.props, "adGroup") | |
override def receive: Receive = { | |
case ProcessAccount(account) => | |
val campaignIn = for (campaign <- campaigns; if campaign.accountId == account.id) yield campaign | |
campaignIn foreach (campaign => campaignActor ! ProcessCampaign(campaign)) | |
} | |
} | |
object CampaignActor { | |
def props: Props = Props[CampaignActor] | |
case class ProcessCampaign(campaign: Campaign) | |
} | |
class CampaignActor extends Actor with ActorLogging { | |
import CampaignActor._ | |
import KeywordActor._ | |
val keywordActor = context.actorOf(RoundRobinPool(5).props(KeywordActor.props), "keyword") | |
override def receive: Receive = { | |
case ProcessCampaign(campaign) => | |
log.info(s"Insert campaign ${campaign.name}") | |
val keywordsIn = for (keyword <- keywords; if keyword.campaignId == campaign.id) yield keyword | |
keywordsIn foreach (keyword => keywordActor ! ProcessKeyword(keyword)) | |
} | |
} | |
object KeywordActor { | |
def props = Props[KeywordActor] | |
case class ProcessKeyword(keyword: Keyword) | |
} | |
class KeywordActor extends Actor with ActorLogging { | |
import KeywordActor._ | |
override def receive: Receive = { | |
case ProcessKeyword(keyword) => | |
log.info(s"Insert keyword ${keyword.name}") | |
} | |
} | |
val system: ActorSystem = ActorSystem("BatchAkka") | |
val accountActor = system.actorOf(AccountActor.props, "account") | |
accounts.foreach(accountActor ! ProcessAccount(_)) | |
} |
Kết quả
- Tất cả đều là non-blocking code, xử lý liên tục mà không cần đoạn trên hoàn thành
- Tận dụng tối đa tài nguyên server, Dispatcher sẽ phân các actor vào từng thread, càng thêm CPU càng nhanh
- Dễ dàng điều chỉnh số lượng actor
- Code decoupling, mỗi Actor gói gọn 1 business
Nhược điểm
Tuy nhiên, với design này lộ ra 1 nhược điểm: out of memory
Tưởng tượng cứ như việc tắc đường ở hiện tượng thắt cổ chai:
Lý do bởi vì Actor xử lý message thông của Mailbox của chính nó một cách tuần tự.
Account process xử lý rất nhanh do không có IO process, nó đổ messages liên tục vào Mailbox của Campaign Actor, tượng tự với Campaign actor đổ messages vào mailbox của Keyword actor.
Không cần quan tâm tình trạng của process sau mình.
Với việc Mailbox tăng nhanh, mà actor ở process đấy có hạn(đang busy xử lý) => memory dùng cho Mailboxs tăng cao một cách không cần thiết.
Cũng như con đường 5 làn oto đổ vào con đường chỉ chịu được 2 làn oto! Cuối con đường thì chỉ có từng ấy oto ra được mà thôi!
Vậy thì sao nhỉ?
Có 2 cách xử lý trong trường hợp này
- Work pulling pattern
- Akka streams
Trong phần tiếp theo mình sẽ phân tích 2 cách làm trên và cách project mình implement như nào!
Link bài viết trên blog của mình: Blog