成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Kafka源碼閱讀日記——ProducerInterceptor

Genng / 1601人閱讀

摘要:發(fā)送消息在上述示例中我們使用了接口傳入并發(fā)送,在實(shí)際實(shí)現(xiàn)中該方法使用另一個(gè)接口并傳入了回調(diào)函數(shù)。需要注意的是,如果攔截器拋出異常,程序不會(huì)停止,只會(huì)寫入一個(gè)級(jí)別的日志。如果下一個(gè)攔截器依賴于上一個(gè)的結(jié)果,那么最終得到的數(shù)據(jù)可能不正確。

Kafka作為當(dāng)前流行的消息中間件,在消息隊(duì)列、微服務(wù)架構(gòu)、大數(shù)據(jù)平臺(tái)等方面有著廣泛的應(yīng)用。如果將平臺(tái)比作人體,Kafka便是神經(jīng)系統(tǒng),負(fù)責(zé)傳遞消息。本系列利用碎片化的時(shí)間,閱讀Kafka源碼深入了解各個(gè)模塊的原理和實(shí)現(xiàn),不定期更新。文中所有代碼均來自https://github.com/apache/kafka

Kafka Producer簡單使用示例

KafkaProducer用于將事件從客戶端應(yīng)用發(fā)送至Kafka集群。Producer本身是線程安全的,并且多個(gè)線程共享單個(gè)實(shí)例時(shí)也會(huì)有性能上的提升。以下示例來自org.apache.kafka.clients.producer.KafkaProducer類:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
    producer.send(new ProducerRecord("my-topic", Integer.toString(i), Integer.toString(i)));

producer.close();

props變量定義了Producer的屬性和基本配置信息:

bootstrap.servers:kafka服務(wù)器地址,事件會(huì)發(fā)送至該服務(wù)器。在生產(chǎn)環(huán)境中通常定義多個(gè)服務(wù)器并用逗號(hào)分隔,以防止單個(gè)服務(wù)器突然崩潰。

acks:當(dāng)事件發(fā)送至Kafka集群時(shí),數(shù)據(jù)在集群內(nèi)部會(huì)有主從備份,acks定義了何時(shí)可以判定消息發(fā)送成功。

acks = 0時(shí),Producer在消息發(fā)送后不會(huì)等待服務(wù)器返回結(jié)果,立刻返回成功。

acks = 1時(shí),消息在主(leader)服務(wù)器寫入后返回成功,不會(huì)等待從(follower)服務(wù)器備份完成。

acks = all時(shí),消息在主從服務(wù)器都寫入成功后才告知Producer發(fā)送成功。

retries: 當(dāng)發(fā)送失敗時(shí),producer自動(dòng)重發(fā)的次數(shù),并不是所有的錯(cuò)誤都可以觸發(fā)自動(dòng)重發(fā),并且自動(dòng)重發(fā)可能導(dǎo)致消息發(fā)送順序錯(cuò)亂,具體信息將在以后的章節(jié)介紹

key.serializer/value.serializer: 所有發(fā)送至kafka的數(shù)據(jù)都是以byte形式存在的,key/value serializer負(fù)責(zé)將Java實(shí)例轉(zhuǎn)化為字節(jié)。

使用上述配置初始化proudcer后,我們可以構(gòu)建ProducerRecord,這里使用topic,key,value構(gòu)建消息并調(diào)用producer.send方法發(fā)送至kafka集群。在程序結(jié)束前務(wù)必調(diào)用producer.close方法,因?yàn)槟J(rèn)情況下producer會(huì)在內(nèi)存中batch多個(gè)事件,并一起發(fā)送以增加性能,close方法會(huì)強(qiáng)制發(fā)送當(dāng)前內(nèi)存中未發(fā)送的事件。

發(fā)送消息

在上述示例中我們使用了send接口傳入并發(fā)送ProducerRecord,在實(shí)際實(shí)現(xiàn)中該方法使用另一個(gè)send接口并傳入了null回調(diào)函數(shù)。Kafka發(fā)送消息是異步的,回調(diào)函數(shù)可以獲得發(fā)送結(jié)果,若發(fā)送成功,回調(diào)函數(shù)可以得到消息的元數(shù)據(jù)包括topic,partition,offset等。若失敗可獲得錯(cuò)誤信息。

/**
* Asynchronously send a record to a topic. Equivalent to send(record, null).
* See {@link #send(ProducerRecord, Callback)} for details.
*/
@Override
public Future send(ProducerRecord record) {
    return send(record, null);
}

/**
* Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.
* 

* The send is asynchronous and this method will return immediately once the record has been stored in the buffer of * records waiting to be sent. This allows sending many records in parallel without blocking to wait for the response after each one. */ @Override public Future send(ProducerRecord record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions ProducerRecord interceptedRecord = this.interceptors.onSend(record); return doSend(interceptedRecord, callback); }

一個(gè)回調(diào)函數(shù)的例子:

producer.send(myRecord,
                   new Callback() {
                       public void onCompletion(RecordMetadata metadata, Exception e) {
                           if(e != null) {
                              e.printStackTrace();
                           } else {
                              System.out.println("The offset of the record we just sent is: " + metadata.offset());
                           }
                       }
                   });
攔截器(ProducerInterceptor)
public interface ProducerInterceptor extends Configurable {
  /**
  * 消息發(fā)送前調(diào)用
  */
  public ProducerRecord onSend(ProducerRecord record);
  /**
  * 消息發(fā)送后,服務(wù)器返回結(jié)果(成功或錯(cuò)誤)時(shí)調(diào)用
  */
  public void onAcknowledgement(RecordMetadata metadata, Exception exception);

  /**
  * 攔截器關(guān)閉時(shí)調(diào)用
  */
  public void close();
}

每一個(gè)Producer都可以設(shè)置一個(gè)或多個(gè)攔截器,攔截器允許客戶端攔截或修改要發(fā)送的消息,通過Properties進(jìn)行設(shè)置:

Properties props = new Properties();
...
props.put("interceptor.classes", "your.interceptor.class.name");
public class KafkaProducer implements Producer {
  // ... other class members
  private final ProducerInterceptors interceptors;

  // Producer構(gòu)造函數(shù)
  KafkaProducer(ProducerConfig config,
                  Serializer keySerializer,
                  Serializer valueSerializer,
                  Metadata metadata,
                  KafkaClient kafkaClient) {
      // ...其他步驟省略
      // 從config中獲取攔截器實(shí)例,config從properties中構(gòu)造
      List> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                    ProducerInterceptor.class);
      this.interceptors = new ProducerInterceptors<>(interceptorList);   
  }           
}

攔截器設(shè)置完成后,在send方法中進(jìn)行調(diào)用:

@Override
public Future send(ProducerRecord record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}
ProducerInterceptors

ProducerInterceptors是一個(gè)容器類,封裝了多個(gè)攔截器,onSend方法被producer的send方法調(diào)用。

/**
* A container that holds the list {@link org.apache.kafka.clients.producer.ProducerInterceptor}
* and wraps calls to the chain of custom interceptors.
*/
public class ProducerInterceptors implements Closeable {
    private final List> interceptors;

    public ProducerInterceptors(List> interceptors) {
        this.interceptors = interceptors;
    }
    
    public ProducerRecord onSend(ProducerRecord record) {
        ProducerRecord interceptRecord = record;
        // 按順序執(zhí)行每一個(gè)攔截器的onSend方法
        for (ProducerInterceptor interceptor : this.interceptors) {
            try {
                interceptRecord = interceptor.onSend(interceptRecord);
            } catch (Exception e) {
                // do not propagate interceptor exception, log and continue calling other interceptors
                // be careful not to throw exception from here
                if (record != null)
                    log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
                else
                    log.warn("Error executing interceptor onSend callback", e);
            }
        }
        return interceptRecord;
    }
    
    /**
    * 1. 當(dāng)發(fā)送的消息被服務(wù)器接受并返回時(shí)調(diào)用
    * 2. 當(dāng)發(fā)送的消息未到達(dá)服務(wù)器之前就失敗時(shí)調(diào)用(見下方onSendError方法)
    **/
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        for (ProducerInterceptor interceptor : this.interceptors) {
            try {
                interceptor.onAcknowledgement(metadata, exception);
            } catch (Exception e) {
                // do not propagate interceptor exceptions, just log
                log.warn("Error executing interceptor onAcknowledgement callback", e);
            }
        }
    }
    
    /**
    * Producer在發(fā)送數(shù)據(jù)前要構(gòu)建多種不同的信息,每一步都有可能拋出異常,本方法由producer在遇到異常時(shí)調(diào)用,
    * TopicPartition記錄了topic和partition信息,由producer構(gòu)建,但若異常發(fā)生在其構(gòu)建之前,該參數(shù)為空,因此從record里提取topic和partition數(shù)據(jù)構(gòu)建。
    **/
    public void onSendError(ProducerRecord record, TopicPartition interceptTopicPartition, Exception exception) {
        for (ProducerInterceptor interceptor : this.interceptors) {
            try {
                if (record == null && interceptTopicPartition == null) {
                    interceptor.onAcknowledgement(null, exception);
                } else {
                    if (interceptTopicPartition == null) {
                        interceptTopicPartition = new TopicPartition(record.topic(),
                                record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition());
                    }
                    interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1,
                                    RecordBatch.NO_TIMESTAMP, Long.valueOf(-1L), -1, -1), exception);
                }
            } catch (Exception e) {
                // do not propagate interceptor exceptions, just log
                log.warn("Error executing interceptor onAcknowledgement callback", e);
            }
        }
    }
}

需要注意的是,如果攔截器拋出異常,程序不會(huì)停止,只會(huì)寫入一個(gè)warn級(jí)別的日志。并且攔截器鏈也不會(huì)停止執(zhí)行,而是繼續(xù)執(zhí)行下一個(gè)攔截器。如果下一個(gè)攔截器依賴于上一個(gè)的結(jié)果,那么最終得到的數(shù)據(jù)可能不正確。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/68744.html

相關(guān)文章

  • 源碼分析Kafka之Producer

    摘要:核心實(shí)現(xiàn)是這個(gè)方法通過不同的模式可以實(shí)現(xiàn)發(fā)送即忘忽略返回結(jié)果同步發(fā)送獲取返回的對(duì)象,回調(diào)函數(shù)置為異步發(fā)送設(shè)置回調(diào)函數(shù)三種消息模式。 Kafka是一款很棒的消息系統(tǒng),可以看看我之前寫的 后端好書閱讀與推薦來了解一下它的整體設(shè)計(jì)。今天我們就來深入了解一下它的實(shí)現(xiàn)細(xì)節(jié)(我fork了一份代碼),首先關(guān)注Producer這一方。 要使用kafka首先要實(shí)例化一個(gè)KafkaProducer,需要有...

    BDEEFE 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

閱讀需要支付1元查看
<