成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Http請(qǐng)求連接池-HttpClient的AbstractConnPool源碼分析

gself / 3176人閱讀

摘要:若使用連接池的方式,來管理連接對(duì)象,能極大地提高服務(wù)的吞吐量。另外每個(gè)對(duì)應(yīng)一個(gè)連接池,實(shí)現(xiàn)了在級(jí)別的隔離,若下游的某臺(tái)提供服務(wù)的主機(jī)掛了,無效的連接最多只占用該對(duì)應(yīng)的連接池,不會(huì)占用整個(gè)連接池,從而拖垮整個(gè)服務(wù)。

背景

在做服務(wù)化拆分的時(shí)候,若不是性能要求特別高的場(chǎng)景,我們一般對(duì)外暴露Http服務(wù)。Spring里提供了一個(gè)模板類RestTemplate,通過配置RestTemplate,我們可以快速地訪問外部的Http服務(wù)。Http底層是通過Tcp的三次握手建立連接的,若每個(gè)請(qǐng)求都要重新建立連接,那開銷是很大的,特別是對(duì)于消息體非常小的場(chǎng)景,開銷更大。

若使用連接池的方式,來管理連接對(duì)象,能極大地提高服務(wù)的吞吐量。

RestTemplate底層是封裝了HttpClient(筆者的版本是4.3.6),它提供了連接池機(jī)制來處理高并發(fā)網(wǎng)絡(luò)請(qǐng)求。

示例

通常,我們采用如下的樣板代碼來構(gòu)建HttpClient:

    HttpClientBuilder builder = HttpClientBuilder.create();
    builder.setMaxConnTotal(maxConnections).setMaxConnPerRoute(maxConnectionsPerRoute);
    if (!connectionReuse) {
        builder.setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE);
    }
    if (!automaticRetry) {
        builder.disableAutomaticRetries();
    }
    if (!compress) {
        builder.disableContentCompression();
    }
    HttpClient httpClient = builder.build();

從上面的代碼可以看出,HttpClient使用建造者設(shè)計(jì)模式來構(gòu)造對(duì)象,最后一行代碼構(gòu)建對(duì)象,前面的代碼是用來設(shè)置客戶端的最大連接數(shù)、單路由最大連接數(shù)、是否使用長連接、壓縮等特性。

源碼分析

我們進(jìn)入HttpClientBuilder的build()方法,會(huì)看到如下代碼:

    # 構(gòu)造Http連接池管理器
    final PoolingHttpClientConnectionManager poolingmgr = new PoolingHttpClientConnectionManager(
            RegistryBuilder.create()
                .register("http", PlainConnectionSocketFactory.getSocketFactory())
                .register("https", sslSocketFactory)
                .build());
    if (defaultSocketConfig != null) {
        poolingmgr.setDefaultSocketConfig(defaultSocketConfig);
    }
    if (defaultConnectionConfig != null) {
        poolingmgr.setDefaultConnectionConfig(defaultConnectionConfig);
    }
    if (systemProperties) {
        String s = System.getProperty("http.keepAlive", "true");
        if ("true".equalsIgnoreCase(s)) {
            s = System.getProperty("http.maxConnections", "5");
            final int max = Integer.parseInt(s);
            poolingmgr.setDefaultMaxPerRoute(max);
            poolingmgr.setMaxTotal(2 * max);
        }
    }
    if (maxConnTotal > 0) {
        poolingmgr.setMaxTotal(maxConnTotal);
    }
    if (maxConnPerRoute > 0) {
        poolingmgr.setDefaultMaxPerRoute(maxConnPerRoute);
    }
    # Http連接管理器采用連接池的方式實(shí)現(xiàn)
    connManager = poolingmgr;

默認(rèn)情況下構(gòu)造出的Http連接管理器是采用連接池的方式實(shí)現(xiàn)的。

我們進(jìn)入 PoolingHttpClientConnectionManager的代碼,其連接池的核心實(shí)現(xiàn)是依賴于 CPool類,而 CPool又繼承了抽象類AbstractConnPool, AbstractConnPool@ThreadSafe的注解,說明它是線程安全類,所以 HttpClient線程安全地獲取、釋放連接都依賴于 AbstractConnPool。

接下來我來看最核心的AbstractConnPool類,以下是連接池的結(jié)構(gòu)圖:

連接池最重要的兩個(gè)公有方法是 leaserelease,即獲取連接和釋放連接的兩個(gè)方法。

lease 獲取連接
    @Override
    public Future lease(final T route, final Object state, final FutureCallback callback) {
        Args.notNull(route, "Route");
        Asserts.check(!this.isShutDown, "Connection pool shut down");
        return new PoolEntryFuture(this.lock, callback) {

            @Override
            public E getPoolEntry(
                    final long timeout,
                    final TimeUnit tunit)
                        throws InterruptedException, TimeoutException, IOException {
                final E entry = getPoolEntryBlocking(route, state, timeout, tunit, this);
                onLease(entry);
                return entry;
            }

        };
    }

lease方法返回的是一個(gè) Future對(duì)象,即需要調(diào)用 Futureget方法,才可以得到PoolEntry的對(duì)象,它包含了一個(gè)連接的具體信息。

而獲取連接是通過 getPoolEntryBlocking方法實(shí)現(xiàn)的,通過函數(shù)名可以知道,這是一個(gè)阻塞的方法,即該route所對(duì)應(yīng)的連接池中的連接不夠用時(shí),該方法就會(huì)阻塞,直到該 route所對(duì)應(yīng)的連接池有連接釋放,方法才會(huì)被喚醒;或者方法一直等待,直到連接超時(shí)拋出異常。

    private E getPoolEntryBlocking(
            final T route, final Object state,
            final long timeout, final TimeUnit tunit,
            final PoolEntryFuture future)
                throws IOException, InterruptedException, TimeoutException {

        Date deadline = null;
        // 設(shè)置連接超時(shí)時(shí)間戳
        if (timeout > 0) {
            deadline = new Date
                (System.currentTimeMillis() + tunit.toMillis(timeout));
        }
        // 獲取連接,并修改修改連接池,所以加鎖--->線程安全
        this.lock.lock();
        try {
            // 從Map中獲取該route對(duì)應(yīng)的連接池,若Map中沒有,則創(chuàng)建該route對(duì)應(yīng)的連接池
            final RouteSpecificPool pool = getPool(route);
            E entry = null;
            while (entry == null) {
                Asserts.check(!this.isShutDown, "Connection pool shut down");
                for (;;) {
                    // 獲取 同一狀態(tài)的 空閑連接,即從available鏈表的頭部中移除,添加到leased集合中
                    entry = pool.getFree(state);
                    // 若返回連接為空,跳出循環(huán)
                    if (entry == null) {
                        break;
                    }
                    // 若連接已過期,則關(guān)閉連接
                    if (entry.isExpired(System.currentTimeMillis())) {
                        entry.close();
                    } else if (this.validateAfterInactivity > 0) {
                        if (entry.getUpdated() + this.validateAfterInactivity <= System.currentTimeMillis()) {
                            if (!validate(entry)) {
                                entry.close();
                            }
                        }
                    }
                    if (entry.isClosed()) {
                        // 若該連接已關(guān)閉,則總的available鏈表中刪除該連接
                        this.available.remove(entry);
                        // 從該route對(duì)應(yīng)的連接池的leased集合中刪除該連接,并且不回收到available鏈表中                        
                        pool.free(entry, false);
                    } else {
                        break;
                    }
                }
                // 跳出for循環(huán)
                if (entry != null) {
                    // 若獲取的連接不為空,將連接從總的available鏈表移除,并添加到leased集合中
                    // 獲取連接成功,直接返回
                    this.available.remove(entry);
                    this.leased.add(entry);
                    onReuse(entry);
                    return entry;
                }
                // 計(jì)算該route的最大連接數(shù)
                // New connection is needed
                final int maxPerRoute = getMax(route);
                // Shrink the pool prior to allocating a new connection
                  // 計(jì)算該route連接池中的連接數(shù) 是否 大于等于 route最大連接數(shù)
                final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
                // 若大于等于 route最大連接數(shù),則收縮該route的連接池
                if (excess > 0) {
                    for (int i = 0; i < excess; i++) {
                        // 獲取該route連接池中最不常用的空閑連接,即available鏈表末尾的連接
                        // 因?yàn)榛厥者B接時(shí),總是將連接添加到available鏈表的頭部,所以鏈表尾部的連接是最有可能過期的
                        final E lastUsed = pool.getLastUsed();
                        if (lastUsed == null) {
                            break;
                        }
                        // 關(guān)閉連接,并從總的空閑鏈表以及route對(duì)應(yīng)的連接池中刪除
                        lastUsed.close();
                        this.available.remove(lastUsed);
                        pool.remove(lastUsed);
                    }
                }
                // 該route的連接池大小 小于 route最大連接數(shù)
                if (pool.getAllocatedCount() < maxPerRoute) {
                    final int totalUsed = this.leased.size();
                    final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
                    if (freeCapacity > 0) {
                        final int totalAvailable = this.available.size();
                        // 總的空閑連接數(shù) 大于等于 總的連接池剩余容量
                        if (totalAvailable > freeCapacity - 1) {
                            if (!this.available.isEmpty()) {
                                // 從總的available鏈表中 以及 route對(duì)應(yīng)的連接池中 刪除連接,并關(guān)閉連接
                                final E lastUsed = this.available.removeLast();
                                lastUsed.close();
                                final RouteSpecificPool otherpool = getPool(lastUsed.getRoute());
                                otherpool.remove(lastUsed);
                            }
                        }
                        // 創(chuàng)建新連接,并添加到總的leased集合以及route連接池的leased集合中,函數(shù)返回
                        final C conn = this.connFactory.create(route);
                        entry = pool.add(conn);
                        this.leased.add(entry);
                        return entry;
                    }
                }
                
                //route的連接池已滿,無法分配連接
                boolean success = false;
                try {
                    // 將該獲取連接的任務(wù)放入pending隊(duì)列
                    pool.queue(future);
                    this.pending.add(future);
                    // 阻塞等待,若在超時(shí)之前被喚醒,則返回true;若直到超時(shí)才返回,則返回false
                    success = future.await(deadline);
                } finally {
                    // In case of "success", we were woken up by the
                    // connection pool and should now have a connection
                    // waiting for us, or else we"re shutting down.
                    // Just continue in the loop, both cases are checked.
                    // 無論是 被喚醒返回、超時(shí)返回 還是被 中斷異常返回,都會(huì)進(jìn)入finally代碼段
                    // 從pending隊(duì)列中移除
                    pool.unqueue(future);
                    this.pending.remove(future);
                }
                // check for spurious wakeup vs. timeout
                // 判斷是偽喚醒 還是 連接超時(shí)
                // 若是 連接超時(shí),則跳出while循環(huán),并拋出 連接超時(shí)的異常;
                // 若是 偽喚醒,則繼續(xù)循環(huán)獲取連接
                if (!success && (deadline != null) &&
                    (deadline.getTime() <= System.currentTimeMillis())) {
                    break;
                }
            }
            throw new TimeoutException("Timeout waiting for connection");
        } finally {
            // 釋放鎖
            this.lock.unlock();
        }
    }
release 釋放連接
    @Override
    public void release(final E entry, final boolean reusable) {
        // 獲取鎖
        this.lock.lock();
        try {
            // 從總的leased集合中移除連接
            if (this.leased.remove(entry)) {
                final RouteSpecificPool pool = getPool(entry.getRoute());
                // 回收連接
                pool.free(entry, reusable);
                if (reusable && !this.isShutDown) {
                    this.available.addFirst(entry);
                    onRelease(entry);
                } else {
                    entry.close();
                }
                // 獲取pending隊(duì)列隊(duì)頭的任務(wù)(先進(jìn)先出原則),喚醒該阻塞的任務(wù)
                PoolEntryFuture future = pool.nextPending();
                if (future != null) {
                    this.pending.remove(future);
                } else {
                    future = this.pending.poll();
                }
                if (future != null) {
                    future.wakeup();
                }
            }
        } finally {
            // 釋放鎖
            this.lock.unlock();
        }
    }
總結(jié)

AbstractConnPool其實(shí)就是通過在獲取連接、釋放連接時(shí)加鎖,來實(shí)現(xiàn)線程安全,思路非常簡(jiǎn)單,但它沒有在route對(duì)應(yīng)的連接池中加鎖對(duì)象,即 RouteSpecificPool的獲取連接、釋放連接操作是不加鎖的,因?yàn)橐呀?jīng)在 AbstractConnPool的外部調(diào)用中加鎖,所以是線程安全的,簡(jiǎn)化了設(shè)計(jì)。

另外每個(gè)route對(duì)應(yīng)一個(gè)連接池,實(shí)現(xiàn)了在host級(jí)別的隔離,若下游的某臺(tái)提供服務(wù)的主機(jī)掛了,無效的連接最多只占用該route對(duì)應(yīng)的連接池,不會(huì)占用整個(gè)連接池,從而拖垮整個(gè)服務(wù)。

以上。

原文鏈接

https://segmentfault.com/a/11...

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/68005.html

相關(guān)文章

  • 記JVM堆外內(nèi)存泄漏Bug查找

    摘要:服務(wù)本身是一個(gè),開起的線程數(shù)為,再加上一些其他線程,總的線程數(shù)不會(huì)超過服務(wù)內(nèi)自己沒有顯示創(chuàng)建線程或者使用線程池。問題解決找到所在后,結(jié)局方案很簡(jiǎn)單,只需將的通過單例的方式注入到服務(wù)中,即可解決堆外內(nèi)存泄漏的問題。 內(nèi)存泄漏Bug現(xiàn)場(chǎng) 一個(gè)做BI數(shù)據(jù)展示的服務(wù)在一個(gè)晚上重啟了5次,由于是通過k8s容器編排,服務(wù)掛了以后會(huì)自動(dòng)重啟,所以服務(wù)還能繼續(xù)提供服務(wù)。 第一時(shí)間先上日志系統(tǒng)查看錯(cuò)誤日...

    hiYoHoo 評(píng)論0 收藏0
  • Apache HttpClient源碼分析連接

    摘要:對(duì)連接數(shù)的管理則有兩個(gè)維度,分別是全局最大數(shù)和單最大數(shù)。當(dāng)請(qǐng)求一個(gè)連接時(shí),會(huì)返回。而會(huì)維護(hù)與及存活時(shí)間等。最終用戶得到的是里封裝而成的連接對(duì)象。連接數(shù)達(dá)到閾值時(shí)對(duì)請(qǐng)求進(jìn)行堵塞,并且將放入。 showImg(https://segmentfault.com/img/bVZW77?w=1217&h=886); 上圖時(shí)連接池類圖關(guān)系。PoolingHttpConnectionManager:...

    YFan 評(píng)論0 收藏0
  • Android網(wǎng)絡(luò)編程2HttpUrlConnection和HttpClient

    摘要:壓縮和緩存機(jī)制可以有效地減少網(wǎng)絡(luò)訪問的流量,在提升速度和省電方面也起到了較大的作用。打開來分析一下,不了解和協(xié)議原理的請(qǐng)查看網(wǎng)絡(luò)編程一協(xié)議原理這篇文章。當(dāng)然這次錯(cuò)誤是正常的,百度沒理由處理我們的這次請(qǐng)求。 前言 上一篇我們了解了HTTP協(xié)議原理,這一篇我們來講講Apache的HttpClient和Java的HttpURLConnection,這兩種都是我們平常請(qǐng)求網(wǎng)絡(luò)會(huì)用到的。無論我們...

    cfanr 評(píng)論0 收藏0
  • 淺析 jdk11 中 HttpClient 使用

    摘要:在中也可以直接使用返回的是,然后通過來獲取結(jié)果阻塞線程,從中獲取結(jié)果四一點(diǎn)嘮叨非常的年輕,網(wǎng)絡(luò)資料不多,且代碼非常精細(xì)和復(fù)雜,目前來看底層應(yīng)該是使用了線程池搭配進(jìn)行異步通訊。 零 前期準(zhǔn)備 0 版本 JDK 版本 : OpenJDK 11.0.1 IDE : idea 2018.3 1 HttpClient 簡(jiǎn)介 java.net.http.HttpClient 是 jdk11 中正式...

    Eminjannn 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

gself

|高級(jí)講師

TA的文章

閱讀更多
最新活動(dòng)
閱讀需要支付1元查看
<