成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專(zhuān)欄INFORMATION COLUMN

源碼分析Kafka之Producer

BDEEFE / 2502人閱讀

摘要:核心實(shí)現(xiàn)是這個(gè)方法通過(guò)不同的模式可以實(shí)現(xiàn)發(fā)送即忘忽略返回結(jié)果同步發(fā)送獲取返回的對(duì)象,回調(diào)函數(shù)置為異步發(fā)送設(shè)置回調(diào)函數(shù)三種消息模式。

Kafka是一款很棒的消息系統(tǒng),可以看看我之前寫(xiě)的 后端好書(shū)閱讀與推薦來(lái)了解一下它的整體設(shè)計(jì)。今天我們就來(lái)深入了解一下它的實(shí)現(xiàn)細(xì)節(jié)(我fork了一份代碼),首先關(guān)注Producer這一方。

要使用kafka首先要實(shí)例化一個(gè)KafkaProducer,需要有brokerIP、序列化器必要Properties以及acks(0、1、n)、compression、retries、batch.size非必要Properties,通過(guò)這個(gè)簡(jiǎn)單的接口可以控制Producer大部分行為,實(shí)例化后就可以調(diào)用send方法發(fā)送消息了。

核心實(shí)現(xiàn)是這個(gè)方法:

public Future send(ProducerRecord record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord interceptedRecord = this.interceptors.onSend(record);//①
    return doSend(interceptedRecord, callback);//②
}

通過(guò)不同的模式可以實(shí)現(xiàn)發(fā)送即忘(忽略返回結(jié)果)、同步發(fā)送(獲取返回的future對(duì)象,回調(diào)函數(shù)置為null)、異步發(fā)送(設(shè)置回調(diào)函數(shù))三種消息模式。

我們來(lái)看看消息類(lèi)ProducerRecord有哪些屬性:

private final String topic;//主題
private final Integer partition;//分區(qū)
private final Headers headers;//頭
private final K key;//鍵
private final V value;//值
private final Long timestamp;//時(shí)間戳

它有多個(gè)構(gòu)造函數(shù),可以適應(yīng)不同的消息類(lèi)型:比如有無(wú)分區(qū)、有無(wú)key等。

①中ProducerInterceptors(有0 ~ 無(wú)窮多個(gè),形成一個(gè)攔截鏈)對(duì)ProducerRecord進(jìn)行攔截處理(比如打上時(shí)間戳,進(jìn)行審計(jì)與統(tǒng)計(jì)等操作)

public ProducerRecord onSend(ProducerRecord record) {
    ProducerRecord interceptRecord = record;
    for (ProducerInterceptor interceptor : this.interceptors) {
        try {
            interceptRecord = interceptor.onSend(interceptRecord);
        } catch (Exception e) {
            // 不拋出異常,繼續(xù)執(zhí)行下一個(gè)攔截器
            if (record != null)
                log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
            else
                log.warn("Error executing interceptor onSend callback", e);
        }
    }
    return interceptRecord;
}

如果用戶(hù)有定義就進(jìn)行處理并返回處理后的ProducerRecord,否則直接返回本身。
然后②中doSend真正發(fā)送消息,并且是異步的(源碼太長(zhǎng)只保留關(guān)鍵):

private Future doSend(ProducerRecord record, Callback callback) {
    TopicPartition tp = null;
    try {
        // 序列化 key 和 value
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
        }
        // 計(jì)算分區(qū)獲得主題與分區(qū)
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);
        // 回調(diào)與事務(wù)處理省略。
        Header[] headers = record.headers().toArray();
        // 消息追加到RecordAccumulator中
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs);
        // 該批次滿(mǎn)了或者創(chuàng)建了新的批次就要喚醒IO線(xiàn)程發(fā)送該批次了,也就是sender的wakeup方法
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
    } catch (Exception e) {
        // 攔截異常并拋出
        this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}

下面是計(jì)算分區(qū)的方法:

private int partition(ProducerRecord record, 
byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    // 消息有分區(qū)就直接使用,否則就使用分區(qū)器計(jì)算
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey,
                     record.value(), serializedValue, cluster);
}

默認(rèn)的分區(qū)器DefaultPartitioner實(shí)現(xiàn)方式是如果partition存在就直接使用,否則根據(jù)key計(jì)算partition,如果key也不存在就使用round robin算法分配partition。

/**
 * The default partitioning strategy:
 * 
    *
  • If a partition is specified in the record, use it *
  • If no partition is specified but a key is present choose a partition based on a hash of the key *
  • If no partition or key is present choose a partition in a round-robin fashion */ public class DefaultPartitioner implements Partitioner { private final ConcurrentMap topicCounterMap = new ConcurrentHashMap<>(); public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) {//key為空 int nextValue = nextValue(topic); List availablePartitions = cluster.availablePartitionsForTopic(topic);//可用的分區(qū) if (availablePartitions.size() > 0) {//有分區(qū),取模就行 int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else {// 無(wú)分區(qū), return Utils.toPositive(nextValue) % numPartitions; } } else {// key 不為空,計(jì)算key的hash并取模獲得分區(qū) return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } private int nextValue(String topic) { AtomicInteger counter = topicCounterMap.get(topic); if (null == counter) { counter = new AtomicInteger(ThreadLocalRandom.current().nextInt()); AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter); if (currentCounter != null) { counter = currentCounter; } } return counter.getAndIncrement();//返回并加一,在取模的配合下就是round robin } }

以上就是發(fā)送消息的邏輯處理,接下來(lái)我們?cè)倏纯聪l(fā)送的物理處理。

Sender(是一個(gè)Runnable,被包含在一個(gè)IO線(xiàn)程ioThread中,該線(xiàn)程不斷從RecordAccumulator隊(duì)列中的讀取消息并通過(guò)Selector將數(shù)據(jù)發(fā)送給Broker)的wakeup方法,實(shí)際上是KafkaClient接口的wakeup方法,由NetworkClient類(lèi)實(shí)現(xiàn),采用了NIO,也就是java.nio.channels.Selector.wakeup()方法實(shí)現(xiàn)。

Senderrun中主要邏輯是不停執(zhí)行準(zhǔn)備消息和等待消息:

long pollTimeout = sendProducerData(now);//③
client.poll(pollTimeout, now);//④

③完成消息設(shè)置并保存到信道中,然后監(jiān)聽(tīng)感興趣的key,由KafkaChannel實(shí)現(xiàn)。

public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

// transportLayer的一種實(shí)現(xiàn)中的相關(guān)方法
public void addInterestOps(int ops) {
    key.interestOps(key.interestOps() | ops);
}

④主要是Selectorpoll,其select被wakeup喚醒:

public void poll(long timeout) throws IOException {
    /* check ready keys */
    long startSelect = time.nanoseconds();
    int numReadyKeys = select(timeout);//wakeup使其停止阻塞
    long endSelect = time.nanoseconds();
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
        Set readyKeys = this.nioSelector.selectedKeys();

        // Poll from channels that have buffered data (but nothing more from the underlying socket)
        if (dataInBuffers) {
            keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
            Set toPoll = keysWithBufferedRead;
            keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
            pollSelectionKeys(toPoll, false, endSelect);
        }

        // Poll from channels where the underlying socket has more data
        pollSelectionKeys(readyKeys, false, endSelect);
        // Clear all selected keys so that they are included in the ready count for the next select
        readyKeys.clear();

        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        immediatelyConnectedKeys.clear();
    } else {
        madeReadProgressLastPoll = true; //no work is also "progress"
    }

    long endIo = time.nanoseconds();
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
}

其中pollSelectionKeys方法會(huì)調(diào)用如下方法完成消息發(fā)送:

public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}

private boolean send(Send send) throws IOException {
    send.writeTo(transportLayer);
    if (send.completed())
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    return send.completed();
}

Send是一次數(shù)據(jù)發(fā)包,一般由ByteBufferSend或者MultiRecordsSend實(shí)現(xiàn),其writeTo調(diào)用transportLayerwrite方法,一般由PlaintextTransportLayer或者SslTransportLayer實(shí)現(xiàn),區(qū)分是否使用ssl

public long writeTo(GatheringByteChannel channel) throws IOException {
    long written = channel.write(buffers);
    if (written < 0)
        throw new EOFException("Wrote negative bytes to channel. This shouldn"t happen.");
    remaining -= written;
    pending = TransportLayers.hasPendingWrites(channel);
    return written;
}

public int write(ByteBuffer src) throws IOException {
    return socketChannel.write(src);
}

到此就把Producer業(yè)務(wù)相關(guān)邏輯處理非業(yè)務(wù)相關(guān)的網(wǎng)絡(luò) 2方面的主要流程梳理清楚了。其他額外的功能是通過(guò)一些配置保證的。

比如順序保證就是max.in.flight.requests.per.connectionInFlightRequestsdoSend會(huì)進(jìn)行判斷(由NetworkClientcanSendRequest調(diào)用),只要該參數(shù)設(shè)為1即可保證當(dāng)前包未確認(rèn)就不能發(fā)送下一個(gè)包從而實(shí)現(xiàn)有序性

public boolean canSendMore(String node) {
    Deque queue = requests.get(node);
    return queue == null || queue.isEmpty() ||
           (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}

再比如可靠性,通過(guò)設(shè)置acks,SendersendProduceRequestclientRequest加入了回調(diào)函數(shù):

    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());//調(diào)用completeBatch
        }
    };
    
     /**
     * 完成或者重試投遞,這里如果acks不對(duì)就會(huì)重試
     *
     * @param batch The record batch
     * @param response The produce response
     * @param correlationId The correlation id for the request
     * @param now The current POSIX timestamp in milliseconds
     */
    private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                               long now, long throttleUntilTimeMs) {
    }
    
    public class ProduceResponse extends AbstractResponse {
      /**
         * Possible error code:
         * INVALID_REQUIRED_ACKS (21)
         */
    }
    

kafka源碼一層一層包裝很多,錯(cuò)綜復(fù)雜,如有錯(cuò)誤請(qǐng)大家不吝賜教。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/76861.html

相關(guān)文章

  • Kafka學(xué)習(xí)筆記掃盲

    摘要:相關(guān)概念協(xié)議高級(jí)消息隊(duì)列協(xié)議是一個(gè)標(biāo)準(zhǔn)開(kāi)放的應(yīng)用層的消息中間件協(xié)議??梢杂妹钆c不同,不是線(xiàn)程安全的。手動(dòng)提交執(zhí)行相關(guān)邏輯提交注意點(diǎn)將寫(xiě)成單例模式,有助于減少端占用的資源。自身是線(xiàn)程安全的類(lèi),只要封裝得當(dāng)就能最恰當(dāng)?shù)陌l(fā)揮好的作用。 本文使用的Kafka版本0.11 先思考些問(wèn)題: 我想分析一下用戶(hù)行為(pageviews),以便我能設(shè)計(jì)出更好的廣告位 我想對(duì)用戶(hù)的搜索關(guān)鍵詞進(jìn)行統(tǒng)計(jì),...

    GT 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<