摘要:安裝模塊基本使用生產(chǎn)者簡單封裝初始化實例連接地址設(shè)置獲取當(dāng)前所有獲取當(dāng)前生產(chǎn)者對象發(fā)送數(shù)據(jù)需要傳入的可迭代對象連接切換設(shè)置新的獲取當(dāng)前設(shè)置的獲取所有要發(fā)送的可迭代對象引用來源博客園測試集群知乎使用生成器把寫入效率提高倍
1.1安裝模塊
pip install pykafka1.2基本使用
# -* coding:utf8 *- from pykafka import KafkaClient host = "IP:9092, IP:9092, IP:9092" client = KafkaClient(hosts = host) # 生產(chǎn)者 topicdocu = client.topics["my-topic"] producer = topicdocu.get_producer() for i in range(100): print i producer.produce("test message " + str(i ** 2)) producer.stop()1.3簡單封裝
class KafkaProduct(): def __init__(self,hosts,topic): """ 初始化實例 :param hosts: 連接地址 :param topic: """ self.__client = KafkaClient(hosts=hosts) self.__topic = self.__client.topics[topic.encode()] def __set_topic(self, topic): self.__topic = self.__client.topics[topic.encode()] def set_topic(self, topic): """ 設(shè)置topic :param topic: :return: """ self.__set_topic(topic) def get_topics(self): """ 獲取當(dāng)前所有topic :return: """ return self.__client.topics def get_topic(self): """ 獲取當(dāng)前topic :return: """ return self.__topic def Producer(self): """ 生產(chǎn)者對象 :return: """ with self.__topic.get_producer(delivery_reports=True) as producer: next_data = "" while True: if next_data: producer.produce(str(next_data).encode()) next_data = yield True def send_data(self,datas): """ 發(fā)送數(shù)據(jù) :param datas:需要傳入的可迭代對象 :return: """ c = self.Producer() next(c) for i in datas: c.send(i) if __name__ == "__main__": hosts = "1.2.3.4:9999,2.3.4.5:9090" #連接hosts topic = "test_523" K = KafkaProduct(hosts=hosts, topic=topic) # #K.set_topic("test") #切換設(shè)置新的topic K.get_topic() #獲取當(dāng)前設(shè)置的topic #K.get_topics() #獲取所有topic data = range(10000) #要發(fā)送的可迭代對象 K.send_data(data)1.4引用來源
博客園:Python測試Kafka集群(pykafka)
知乎:使用生成器把Kafka寫入效率提高1000倍
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/43910.html
摘要:大量的和分區(qū)會嚴(yán)重影響集群性能。介紹可參考收到離線分區(qū)總數(shù)異常告警一般是某個節(jié)點(diǎn)宕機(jī)或者服務(wù)異常導(dǎo)致。若服務(wù)卡住,可在評估后在控制臺重啟該節(jié)點(diǎn)服務(wù)。若想了解當(dāng)前請求延時情況,建議關(guān)注平均請求延時監(jiān)控項。 FAQs本篇目錄一個UKafka集群可以創(chuàng)建多少個Topic?如何增加Topic的副本數(shù)量(ReplicationFactor)?收到離線分區(qū)總數(shù)>=10.0個告警,離線分區(qū)總數(shù)是什么,怎么...
摘要:相關(guān)概念協(xié)議高級消息隊列協(xié)議是一個標(biāo)準(zhǔn)開放的應(yīng)用層的消息中間件協(xié)議??梢杂妹钆c不同,不是線程安全的。手動提交執(zhí)行相關(guān)邏輯提交注意點(diǎn)將寫成單例模式,有助于減少端占用的資源。自身是線程安全的類,只要封裝得當(dāng)就能最恰當(dāng)?shù)陌l(fā)揮好的作用。 本文使用的Kafka版本0.11 先思考些問題: 我想分析一下用戶行為(pageviews),以便我能設(shè)計出更好的廣告位 我想對用戶的搜索關(guān)鍵詞進(jìn)行統(tǒng)計,...
摘要:主題和分區(qū)的悄息通過主題進(jìn)行分類。在給定的分區(qū)里,每個悄息的偏移量都是唯一的。消費(fèi)者把每個分區(qū)最后讀取的悄息偏移量保存在或上,如果悄費(fèi)者關(guān)閉或重啟,它的讀取狀態(tài)不會丟失。主題可以配置自己的保留策略,可以將悄息保留到不再使用它們?yōu)橹埂0l(fā)布與訂閱消息系統(tǒng) 在正式討論Apache Kafka (以下簡稱Kafka)之前,先來了解發(fā)布與訂閱消息系統(tǒng)的概念, 并認(rèn)識這個系統(tǒng)的重要性。數(shù)據(jù)(消息)的發(fā)送...
閱讀 966·2019-08-30 14:24
閱讀 1005·2019-08-30 14:13
閱讀 1810·2019-08-29 17:21
閱讀 2707·2019-08-29 13:44
閱讀 1672·2019-08-29 11:04
閱讀 456·2019-08-26 10:44
閱讀 2579·2019-08-23 14:04
閱讀 918·2019-08-23 12:08