《千億級eBay平臺的Kafka深度實踐》要點:
本文介紹了千億級eBay平臺的Kafka深度實踐,希望對您有用。如果有疑問,可以聯系我們。
方偉
eBay中國研發中心 資深軟件工程師2010年加入eBay,一直在系統平臺部負責設計和開發工作.最初負責整個eBay的數據庫應用層的開發和優化;接著從事用戶行為數據的收集,數據管道的建立以及部分數據分析工作,是開源項目PulsarIO的主要貢獻者之一;
目前致力于 eBay 的實時數據傳輸和計算平臺,基于 Kafka 和 Storm 等開源軟件.對建設高可用,高擴展性,并且可自動化運維的分布式系統有豐富經驗.
本文主題是我們最近一年做的,基于 Kafka 做的企業級數據傳輸平臺,我們實現這個平臺,以及這個平臺最終上線并在對它運維過程中,得到的心得體會和經驗教訓.
這是本文主要分四部分:
首先是平臺的概述,什么是數據傳輸平臺,我們為什么要搭建這個平臺.
其實在互聯網企業當中有很多系統,一般來講我們可以把互聯網當中各個系統分成兩類:
一類是在線系統,在線系統是直接跟用戶打交道的站點系統,比如對于電子商務網站來說,我們有商品瀏覽,商品搜索.我們有賣家發布商品,我們有商品發布系統.
另一類是離線系統,離線系統最主要的比如我們做BI分析,站點報表,包括財務方面的報表,以及用戶行為的分析,離線系統里我們一般有一些產品,比如大數據,最近我們會用 hadoop 做一些數據挖掘.
其實離線系統這些數據最終來源也是從在線系統來的,那我們怎么樣從龐大在線系統中,把數據傳輸到離線系統,這就是我們需要解決的問題.
對于 eBay 來講,它的在線系統肯定有好幾萬臺,比如我們要采集用戶行為,系統里有好幾個 PB 的關系型數據庫存儲,我們要從數據庫當中,把數據庫變化傳輸到后臺離線系統當中,怎么做呢?
近年來,對實時計算的需求越來越多,很多比如像欺詐檢測,像用戶個性推薦,這種類型的系統,都要求實時性,我不想等到一夜過后這個數據才過來,我希望馬上拿到數據,能夠進行計算,得到計算結果,然后反饋到其他在線系統.
在這種情況下,我們需要一個實時平臺,把數據從在線系統傳輸到離線系統,所以這就是我們為什么要搭建這個傳輸平臺.
我們為什么要用 Kafka ,這里不對 Kafka 做過多介紹,說一下我們看中了 Kafka 哪些點,我們為什么會最終選用 Kafka 做數據傳輸平臺.
其實我們數據傳輸平臺是很典型消息中間件,消息中間件市面上有很多產品,為什么要用 Kafka 呢?
除此之外它也有很好的伸縮性,你增加 Kafka 節點,處理是直線性增長的,同時也可以保證高可用性,像 Kafka 一些節點宕掉的話,不會影響到數據完整性.
既然選用了 Kafka ,那么 eBay 在 Kafka 上有多少數據呢?我們大概放了哪些數據在 Kafka 上呢?
目前我們大概有三十多個 Kafka 集群,這些 ?Kafka ?集群主要建立在 eBay 自己的私有云上面,我們基于 OpenStark 搭建的自己私有云,所以我們 Kafka 的節點都是虛擬機.
我們總共有800多臺虛擬機,在這些集群里我們總共有1200多個應用跑在上面,這些應用總數加起來超過2.5萬個,每天消息達到1000億次以上.所以這個可以看出這是很典型的大數據實時傳輸的例子.
那我們為什么要分這么多 Kafka 集群?
這個非常類似數據庫的分庫,我們基于業務垂直劃分 Kafka 集群,比如所有用戶行為,用戶點擊、搜索、對商品的瀏覽等等用戶行為,我們同樣會放到 Kafka 集群里,對于所有數據庫變化,比如當一個賣價要修改一個價格,這個數據變化我們會放在 Kafka 集群里.
比如還有一些站點,本身自己想發出一些業務事件,比如當一個商品成交了,這些業務事件我們會放在另外一個基于 Kafka 的業務集群了.
我們知道 LinkedIn 是最初提出 Kafka 的,他們應該有60多個 Kafka .
那我們有 Kafka 這樣的開源產品,我們是不是可以直接拿來用呢?不需要做任何事情呢?
當然不是,其實 Kafka 提供的是單元功能,作為企業應用來講,你要做企業實時傳輸平臺解決方案,需要基于 Kafka 做很多額外服務,每個企業總歸該是有一些自身需求,比如企業對安全考慮,每個企業實現方式不一樣,企業數據中心分布也是不一樣,對于不同企業自身的需求,我們需要做一些額外服務支持它.
eBay 做了哪些服務呢?舉一些很簡單的例子,比如我們想讓一個用戶在集群上創建他自己的 Kafka ?topic,你不是直接讓他到一個節點上,這顯然是不夠安全,同時也不夠方便.
這樣我們必然要有一個提供管理功能的服務器,我們希望提供一個統一的入口,以及統一的 topic 名稱空間,那么我們就需要引入原數據中心的服務.
比如我們在上海、北京都有數據中心.我們怎么把數據從上海遷到北京,這時候就需要有數據鏡像服務.
再比如我們剛剛講到,整個 Kafka 集群是在 openstack 云上面,當我們需要建立一個新集群的時候,或者一片集群需要修復,或者為這個集群需要新增節點,或者廢棄節點的時候.
我們需要怎樣調用 openstack 功能完成?同時我們還有很多監控功能的服務,系統日志服務.
我們都是把所有服務通過界面形式暴露出來,同時把它的下端用戶直接到界面上做一些事情,不用非要找到系統管理員才能做,他們可以自己直接做.
以上是我們這個系統擁有的一些服務,下面我會就這個系統里面的服務做稍微詳細一點的介紹.
首先是元數據服務,為什么要提出元數據服務,因為我們希望給大家邏輯上提供一個統一的 topic 名稱空間,比如我希望訪問到用戶行為數據,如果我們沒有這個服務的話,我必須要讓用戶知道你的用戶行為的 Kafka 集群在哪里,還要知道你連到哪一個.
然后 topic 名稱是什么?比如我們從服務里面直接查詢一個用戶行為,你直接找到這個 topic,服務后面會找到真正的 Kafka 集群在哪里,然后再返回給客戶端,讓它連上來.
除了提供統一名稱空間之外,我們還提出了叫做 topic 分裝的概念,為什么會有這種東西呢?
因為我們剛才說到自服務,我們希望用戶自己創建,如果說讓它無限制創建的話,對系統資源肯定是傷害,因為它不知道你有多少資源還在那邊,如果他創建太多了,會把 Kafka 集群宕掉.
這時候就需要有配額管理,我們這邊引入的單位就是 topic 組,我們創建 topic 的時候需要系統管理員做審批.
一旦審批通過了,我會給 topic 組分配一些配額,比如我在上面創建多少 topic,在上面發生的網絡帶寬是多少都可以配置.所以這個也是便于后面的運維管理.
剛才我說了這些,都是要由元數據服務提供,那元數據服務怎么工作的呢?
在 Kafka 集群里,你不可能用這些集群本身所帶的管理 topic 去管理元數據,所以我必須要有元數據存儲.
我們引入了邏輯層對三十多個集群看起來就像一個集群,在這種情況下,用戶是不是在使用 Kafka API 的時候有問題,因為 API 并不能知道你要用哪一個.
我們看這個 Kafka 代理完全實現了 Kafka 的協議,這個協議定義了很多操作,這些操作是基于 TCP 層的.
我們去這樣一個代理,可以完全模擬 Kafka 本身 group 的協議.對于客戶端來說,是可以用原本 Kafka 的 API 訪問,這個 API 連接代理就像連接到單獨的 Kafka 集群一樣,這其實不是一個真正的 Kafka 集群,而是后面帶了三個 Kafka 集群.
下面講一下我們鏡像服務,其實我們有多個數據中心, Kafka 數據來源本身也是來自數據中心.那么我們大家怎樣搭建 Kafka 集群呢?
這里我們有一個模式 Tier—Aggregation,比如上海和北京都有用戶行為.我們希望做數據分析的時候,要能夠同時分析到上海、北京的數據,我需要把兩個地區的數據 run 起來.
比如我們只有兩個數據中心,我們創建四個 Kafka 集群,其中有兩個 location 的數據.
我們同時也做到跨數據中心的數據冗余,比如北京數據中心燒掉了,我們上海數據中心依然可以把所有數據拿出來.
這個其實最是也是由 LinkedIn 提出的比較推薦的方式,雖然引入了很多數據冗余,但是它保證了它的運行.
因為 Kafka 本身有自己的 location,每個數據來了以后會引起三份網絡流量,這個網絡流量是為了讓 Kafka 集群高可用.
如果 Kafka 集群跨數據中心的話,所謂的網絡流量就會是跨數據中心,我們怎么把數據一個數據中心傳輸到另一個數據中心,這就是需要用到鏡像服務.
鏡像服務這方面其實我們會有很多管理,比如我多少開多少節點,多少線程,怎么啟動,怎么截止,所有管理工作我們都需要有具體服務做這樣的事情,也就是我們所說的鏡像服務,要實現具體的服務,但是它暴露出一個服務器,讓上層應用再去做數據鏡像的管理.
除此之外我們還有 Schema 注冊服務,對于普通平臺來講,所有經過平臺的數據都是可以進行管理的,我要求數據格式所有人都認識,所以我們定義了統一數據模式在平臺里.
Kafka 本身提供了 Schema 組件,背后用 Kafka 做存儲,而且高可用也做到了,我們是直接把它拿過來用,但是沒有百分百拿過來用.
因為它有一定的局限性,比如它不支持健全,所有人都可以來改,所有人都可以進行版本增加.
剛才所講的服務,不管是對用戶來講,還是管理員來講,我們都需要有一個界面操作它,因為不可能所有人都通過 SSH 去連服務器.
所以我們有一個用戶自服務 portal,從 consumer 注冊,producer 注冊,topicgroup 注冊,schema 的注冊.
剛才說到了要創建一個集群,對集群地面的一些節點進行替換,我們要新增新的節點等等,我們都需要調 openstack 的功能,但是這個地方我們需要一個很迷你的 PaaS 完成這個系統.
我們其實是基于 openstack 搭建了一個小的迷你 PaaS,除了提供功能工作流之外,還提供的運行工作流管理的功能.
openstack 提供了一套接口做這種事情,但是接口后面必須要選擇一個基于 ALQP 的協議,同樣對于配置也是一樣, Kafka ?默認配置是什么樣的,我們也有一些配置優化,改了一些配置讓它對所有節點優化,怎么管理這些配置也是在 Prism 服務器里做的.
那么說了這么多,平臺最終上線的時候,我們要對它進行運維,運維里最重要的是我們要把系統監控好,并且當它出現問題的時候我們要及時修復它.對于這個系統監控是非常重要的課題.
可以看出來,我們在這個系統中,其實是涉及到很多節點,對所有的我們打包起來,讓它完成一個業務語義.
在監控方面我們肯定要有統一視角看到一系列集群運行狀況,對于所有集群節點來說,并不是說宕一個節點就不行了,因為 Kafka 有數據冗余,宕一兩個節點是沒有問題的.
所以這里我就列出運營的節點是宕的,還是說健康的,我們運維人員對宕掉的節點進行修復.我們目前還是用人工的方式進行修復,因為我們需要分析這些也點宕掉的原因.
目前來講,系統運行時間并沒有超過一年,所以我們目前采用了人工的方式.以后我們會考慮當任何一個節點出現問題的時候,進行自動替換,自動替換的時候必然要引入一些規則,什么情況下可以自動停,什么情況下不能自動停.
對于 Kafka 來講,我們對每個節點,還有 Kafka 本身狀態的監控.對于 Kafka 系統運維人員來說,這個節點的系統資源也是需要監控很重要的內容,對于管理員來講系統狀態是很重要的.
還有一種情況不僅對管理員重要,對用戶也是很重要的,比如 Kafka 狀態監控,對于用戶來講,比如我想知道我昨天進入 Kafka 集群有多少數據,所以這個方面的監控,除了給運維人員,也提供給系統的用戶.
對于 consumer 也是一樣,對于 Kafka 來講很重要的監控是我要知道我的 consumer 到底有一個 leg.
如果這個 leg 一直增加的話,就說明 consumer 的應用肯定有問題的,我必須要對它進行一定處理,所以這個地方相當于我們幫助用戶把這些問題嚴控起來.
發生問題的時候,不僅 consumer 管理員要知道,它的用戶也要知道,所以報警系統也是通知到用戶.另外我們有些應用是有端到端要求的,我們必須知道數據從應用發出來,到進到 Kafka ,到底有多長時間.
那么在 Kafka 里面,在運維過程我們發現有一個很重要的課題,對于慢節點怎么處理,什么叫慢節點?
其實 Kafka 能夠很好的處理節點壞掉的情況,因為宕掉一兩個節點對于它來說不要緊,它可以很快把壞的節點拿掉,它可以從別的節點上迅速選過來.
但是這種慢節點并沒有死掉,只是比較慢的工作,比如正常情況下,它的吞吐率只有原來的1/10,那 Kafka 就不能把它拿掉,我們發現大部分系統都是存在這種問題.
為什么說一個節點出現性能問題,會影響整個 Kafka 集群呢?我們只有 Kafka 是做數據集群的.一旦這個節點性能出現問題,你到所有其他節點網絡連接的數據都會有問題.
相當于它會拖累很多其他節點,出去 Kafka 集群在這個時候把這個節點干掉了,如果沒有干掉,這個拖累會一直存在.
所以這就是我們為什么要在慢節點檢測,慢節點比死掉的節點處理更麻煩,因為這種也點還在.比如我們可以看你的 CPU 在多長時間達到多少以上,我們要建立一些規則.
同時我們也要對磁盤進行監控,IO 的性能是不是出現了大的問題.同時要對系統日志異常進行分析,最后還有一個更為直接的方式,我們創建一些 footprint topic,我定期對 topic 進行測試,首先看看通不通,再看看速率有沒有問題,這樣我就可以知道節點有沒有問題.
如果我檢測出來慢節點之后怎么處理?
最簡單的處理方式,就是把它停掉,其實把它停掉還是安全的,如果你覺得停掉對系統吞吐率造成影響的話,我們可以采取重啟的方式,但是對于很多慢節點來說,你把它重啟了它還是會慢.
如果一個節點,一個盤占用需非常高,這就說明資源分配很不均勻,這個我們引入 partition ?reassignrnert 來處理就不太好做,還是需要人工去處理.
剛才我們說了我們的 Kafka 集群是在每個數據中心有獨立拷貝的,比如北京數據中心整個 Kafka 宕掉了,那我能不能切到上海數據中心繼續 consumer,那怎么做到這一點,不用人為干預情況下,會自動的切過去?
這就需要用到 Kafka 的代理,如果代理知道數據中心出問題了,把 Kafka 返回的信息發回到另一個數據中心,這時候就可以進行很好的處理.
對于 consumer 需要做另外一個事情,因為對于同一個信息在不同數據中心是不一樣的.
比如上海數據中心比北京數據中心的 Kafka 早上一個月,那這個就肯定會有索引量偏移,所以我們就需要一個工具能夠找到這個偏移量多大.好在后續版本 Kafka 就提升了一條索引機制在 Kafka 服務器里,但是我們在實現這個功能的時候,那個版本還沒有出來.
先說一下為什么 Kafka 有這么大的吞吐率和性能.
首先它是保證了磁盤順序讀寫,因為我們知道磁盤順序讀寫是很好的,只要你不引入文件讀寫.我們知道它這個特性之后,怎么樣保證磁盤讀寫正常運行呢?我們不要讓它的磁頭經常跳,什么情況下磁頭經常跳呢?
受到一個 Kafka 里節點太多了,不同文件切換的時候,就會引起順序讀寫的方式.還有比如應用程序里,也會影響大數據讀寫的性能.如果要保證它是順序讀寫的話,盡量避免剛才說的操作.
還有一個是 Kafka 通過 Page Cache 保持很高的性能,對于 Kafka 端,如果你跟上的話,它永遠是從內部讀數據,不是從磁盤讀數據.
因為現在操作系統里我們會有 Page ?Cache 來監控數據處理,我們在運維當中,發現對于 Page ?Cache 來說,千萬不能有 Swap,一旦發生 Swap,節點的性能馬上會下降.
如果你是基于云的,特別是基于公有云的,這是很麻煩的事情,你需要在 Hypervisor 上進行設置.我們在使用當中也發現了 NUMA 不平衡的問題,這個會導致其他虛擬機所在的 CPU 內存沒有很好的運營起來,它在后續版本的 OpenStack 里,會讓你在分配 CPU 的時候,強行 PIN 的操作.
除此之外 Linux 也對 Page Cache 有一些優化的設置.有人可能會問我寫盤了異步的,如果數據來了之后,在數據真正寫到盤子之前,我的也點宕掉怎么辦?
Kafka 是除了在本節點之外,他會在其他節點上也有,它先進行保存,然后異步寫盤過程才會把這個消息真正落到盤里面去.
最后一點, Kafka 用到 linux 操作系統之后,Zero Copy 很簡單,當我的磁盤來的時候,可以不用直接進用戶內存,而是直接丟到網絡端口去,這里不是優化的,我們一定要小心 Zero Copy 會失效.
我們做 Kafka 升級的時候,有可能數據格式會變,比如從0.9變到1.0,如果說你直接強行切過來的話,就會破壞 Kafka .
那怎么解決這個問題,其實在1.0升級的說明里有一個很復雜很詳盡的說明,如果有人要做升級的話,一定要詳細看清楚這些說明.
同時另外一個導致 Zero Copy 失效就是可能引入 SSL/TLS,這個怎么處理呢?那你只能權衡一下安全性和性能.
另外還有一些其他參數也可以對 Kafka 進行優化,有一個是 File ?Descriptor 一定要很大.
包括在高吞吐率節點上,一定要增加 Max ?socket ?buffer ?size.權衡 unclean.leader.electionbenable,增加 fetch 線程數量 num.replica.fetchers,處理 leader 選舉 outo.leaderrebalance.enoble.
原文來自微信公眾號:高效運維