成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

PHP 程序員也能做的 Java 開(kāi)發(fā) 30分鐘使用 netty 輕松打造一個(gè)高性能 websock

kviccn / 3297人閱讀

摘要:唯一的知識(shí)點(diǎn)就是的基礎(chǔ)使用。可以簡(jiǎn)單的理解下面的代碼就構(gòu)建了一個(gè)服務(wù)器。握手完成之后的消息傳遞則在中處理。實(shí)際情況下,不可能那么多人同時(shí)說(shuō)話廣播,而是說(shuō)話的人少,接受廣播的人多。

硬廣一波

SF 官方首頁(yè)推薦《PHP進(jìn)階之路》(你又多久沒(méi)有投資自己了?先看后買)

我們下面則將一些實(shí)際場(chǎng)景都添加進(jìn)去,比如用戶身份的驗(yàn)證,游客只能瀏覽不能發(fā)言,多房間(頻道)的聊天。
該博客非常適合 Java 新手,非常適合作為學(xué)習(xí) Java 的切入點(diǎn),不需要考慮tomcat、spring、mybatis等。
唯一的知識(shí)點(diǎn)就是 maven 的基礎(chǔ)使用。

完整的代碼地址

https://github.com/zhoumengka...

├── WebSocketServer.java                啟動(dòng)服務(wù)器端口監(jiān)聽(tīng)
├── WebSocketServerInitializer.java     初始化服務(wù)
├── WebSocketServerHandler.java         接管WebSocket數(shù)據(jù)連接
├── dto
│   └── Response.java                   返回給客戶端數(shù)據(jù)對(duì)象
├── entity
│   └── Client.java                     每個(gè)連接到WebSocket服務(wù)的客戶端對(duì)象
└── service
    ├── MessageService.java             完成發(fā)送消息
    └── RequestService.java             WebSocket初始化連接握手時(shí)的數(shù)據(jù)處理
功能設(shè)計(jì)概述 身份認(rèn)證

客戶端將用戶 id 、進(jìn)入的房間的 rid、用戶 token json_encode,例如{id:1;rid:21;token:"43606811c7305ccc6abb2be116579bfd"}。然后在 base64 處理,通過(guò)參數(shù)request傳到服務(wù)器,然后在服務(wù)器做 id 和 token 的驗(yàn)證(我的做法是 token 存放在redis string 5秒的過(guò)期時(shí)間)

房間表

使用一個(gè)Map channelGroupMap 來(lái)存放各個(gè)房間(頻道),以客戶端傳握手時(shí)傳過(guò)來(lái)的 base64 字符串中獲取到定義的房間 ID,然后為該房間 ID 新建一個(gè)ChannelGroupChannelGroup 方便對(duì)該組內(nèi)的所有客戶端廣播消息)

在 pom.xml 中引入netty 5

現(xiàn)在大家都有自己的包管理工具,不需要實(shí)現(xiàn)下載了然后放到本地lib庫(kù)中,和 nodejs 的 npm, php 的 compser 一樣。


    
        io.netty
        netty-all
        5.0.0.Alpha2
    
    
        com.jcraft
        jzlib
        1.1.2
    
    
        org.json
        json
        20141113
    
    
        commons-codec
        commons-codec
        1.10
    
創(chuàng)建服務(wù)器

這段代碼需要理解嗎?這是 netty 的套路,可以先記住 netty 的線程模型是一個(gè) react 的一種變型,這里有兩個(gè)nio線程組,一個(gè)是接受客戶端的請(qǐng)求,一個(gè)是worker組專門處理客戶端的請(qǐng)求。

可以簡(jiǎn)單的理解下面的代碼就構(gòu)建了一個(gè)nginx服務(wù)器。所以不用管。

package net.mengkang;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;


public final class WebSocketServer {

    private static final int PORT = 8083;

    public static void main(String[] args) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new WebSocketServerInitializer());

            Channel ch = b.bind(PORT).sync().channel();
            ch.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
package net.mengkang;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;


public class WebSocketServerInitializer extends ChannelInitializer {

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(65536));
        pipeline.addLast(new WebSocketServerCompressionHandler());
        pipeline.addLast(new WebSocketServerHandler());
    }
}
處理長(zhǎng)連接

下面程序中最的處理在握手階段handleHttpRequest,里面處理參數(shù)的判斷,用戶的認(rèn)證,登錄用戶表的維護(hù),直播房間表維護(hù)。詳細(xì)的請(qǐng)大家對(duì)照代碼來(lái)瀏覽。
握手完成之后的消息傳遞則在handleWebSocketFrame中處理。
整理的執(zhí)行流程,大家可以對(duì)各個(gè)方法打斷點(diǎn)予以調(diào)試,就會(huì)很清楚整個(gè)執(zhí)行的脈絡(luò)啦。

package net.mengkang;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;
import net.mengkang.dto.Response;
import net.mengkang.entity.Client;
import net.mengkang.service.MessageService;
import net.mengkang.service.RequestService;
import org.json.JSONObject;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
import static io.netty.handler.codec.http.HttpMethod.GET;
import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

public class WebSocketServerHandler extends SimpleChannelInboundHandler {

    // websocket 服務(wù)的 uri
    private static final String WEBSOCKET_PATH = "/websocket";

    // 一個(gè) ChannelGroup 代表一個(gè)直播頻道
    private static Map channelGroupMap = new ConcurrentHashMap <>();

    // 本次請(qǐng)求的 code
    private static final String HTTP_REQUEST_STRING = "request";

    private Client client = null;

    private WebSocketServerHandshaker handshaker;

    @Override
    public void messageReceived(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        // Handle a bad request.
        if (!req.decoderResult().isSuccess()) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
            return;
        }

        // Allow only GET methods.
        if (req.method() != GET) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
            return;
        }

        if ("/favicon.ico".equals(req.uri()) || ("/".equals(req.uri()))) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, NOT_FOUND));
            return;
        }

        QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
        Map> parameters = queryStringDecoder.parameters();

        if (parameters.size() == 0 || !parameters.containsKey(HTTP_REQUEST_STRING)) {
            System.err.printf(HTTP_REQUEST_STRING + "參數(shù)不可缺省");
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, NOT_FOUND));
            return;
        }

        client = RequestService.clientRegister(parameters.get(HTTP_REQUEST_STRING).get(0));
        if (client.getRoomId() == 0) {
            System.err.printf("房間號(hào)不可缺省");
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, NOT_FOUND));
            return;
        }

        // 房間列表中如果不存在則為該頻道,則新增一個(gè)頻道 ChannelGroup
        if (!channelGroupMap.containsKey(client.getRoomId())) {
            channelGroupMap.put(client.getRoomId(), new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
        }
        // 確定有房間號(hào),才將客戶端加入到頻道中
        channelGroupMap.get(client.getRoomId()).add(ctx.channel());

        // Handshake
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketLocation(req), null, true);
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            ChannelFuture channelFuture = handshaker.handshake(ctx.channel(), req);

            // 握手成功之后,業(yè)務(wù)邏輯
            if (channelFuture.isSuccess()) {
                if (client.getId() == 0) {
                    System.out.println(ctx.channel() + " 游客");
                    return;
                }

            }
        }
    }

    private void broadcast(ChannelHandlerContext ctx, WebSocketFrame frame) {

        if (client.getId() == 0) {
            Response response = new Response(1001, "沒(méi)登錄不能聊天哦");
            String msg = new JSONObject(response).toString();
            ctx.channel().write(new TextWebSocketFrame(msg));
            return;
        }

        String request = ((TextWebSocketFrame) frame).text();
        System.out.println(" 收到 " + ctx.channel() + request);

        Response response = MessageService.sendMessage(client, request);
        String msg = new JSONObject(response).toString();
        if (channelGroupMap.containsKey(client.getRoomId())) {
            channelGroupMap.get(client.getRoomId()).writeAndFlush(new TextWebSocketFrame(msg));
        }

    }

    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {

        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
        }

        broadcast(ctx, frame);
    }

    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
        if (res.status().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
            HttpHeaderUtil.setContentLength(res, res.content().readableBytes());
        }

        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!HttpHeaderUtil.isKeepAlive(req) || res.status().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("收到" + incoming.remoteAddress() + " 握手請(qǐng)求");
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        if (client != null && channelGroupMap.containsKey(client.getRoomId())) {
            channelGroupMap.get(client.getRoomId()).remove(ctx.channel());
        }
    }

    private static String getWebSocketLocation(FullHttpRequest req) {
        String location = req.headers().get(HOST) + WEBSOCKET_PATH;
        return "ws://" + location;
    }
}

服務(wù)器端就寫(xiě)完啦,還有一些客戶端對(duì)象的構(gòu)想驗(yàn)證什么的就不一一細(xì)說(shuō)了,都很簡(jiǎn)單,都在代碼里。下面是客戶端。

客戶端程序




并發(fā)壓測(cè)

同事 https://github.com/ideal 寫(xiě)的壓測(cè)腳本
https://github.com/zhoumengka...
并測(cè)試為N個(gè)客戶端,每個(gè)客戶端發(fā)送10條消息,服務(wù)器配置2核4G內(nèi)存,廣播給所有的客戶端,我們測(cè)試1500個(gè)并發(fā)的時(shí)候,負(fù)載在后期陡升。
實(shí)際情況下,不可能那么多人同時(shí)說(shuō)話廣播,而是說(shuō)話的人少,接受廣播的人多。

實(shí)際線上之后(業(yè)務(wù)遠(yuǎn)比上面的代碼負(fù)載得多的多),在不限制刷帖頻率大家狂轟濫炸的情況下,1500多人在線,半小時(shí),負(fù)載一直都處于0.5以下。

最近老鐵開(kāi)了直播,歡迎來(lái)捧場(chǎng)!

PHP 進(jìn)階之路 - 億級(jí) pv 網(wǎng)站架構(gòu)的技術(shù)細(xì)節(jié)與套路

PHP 進(jìn)階之路 - 億級(jí) pv 網(wǎng)站架構(gòu)實(shí)戰(zhàn)之性能壓榨

PHP 進(jìn)階之路 - 后端多元化之快速切入 Java 開(kāi)發(fā)

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/67319.html

相關(guān)文章

  • 三年前舊代碼的重構(gòu)、總結(jié)與反思

    摘要:最近在維護(hù)一個(gè)三年前的舊代碼,用的是框架。單元測(cè)試和語(yǔ)言并發(fā)控制實(shí)際上是個(gè)蛋疼的問(wèn)題,夸張一點(diǎn)說(shuō),當(dāng)時(shí)的并不能特別輕松地實(shí)現(xiàn)并發(fā),甚至不能實(shí)現(xiàn)并發(fā)。語(yǔ)言的功能之一就是自帶單元測(cè)試。用語(yǔ)言之前,我的習(xí)慣是不寫(xiě)單元測(cè)試。 最近在維護(hù)一個(gè)三年前的舊代碼,用的是laravel框架。 從某些方面來(lái)講,這個(gè)代碼算是比較標(biāo)準(zhǔn)為了實(shí)現(xiàn)在規(guī)定的時(shí)間內(nèi)完成相關(guān)功能,同時(shí)程序員水平不高、經(jīng)過(guò)大量?jī)?yōu)化之后,變...

    Shihira 評(píng)論0 收藏0
  • polarphp一個(gè)新的 PHP 語(yǔ)言運(yùn)行時(shí)環(huán)境

    摘要:項(xiàng)目介紹是一個(gè)全新的語(yǔ)言的運(yùn)行時(shí)環(huán)境,基于目前最新的進(jìn)行打造,支持最新的語(yǔ)言規(guī)范,同時(shí)提供了自己的運(yùn)行時(shí)標(biāo)準(zhǔn)庫(kù)。同樣也在的基礎(chǔ)上進(jìn)行打造,實(shí)現(xiàn)了一個(gè)除開(kāi)發(fā)之外的一個(gè)全新的運(yùn)行環(huán)境。發(fā)布核心虛擬機(jī)的鏡像。整合運(yùn)行時(shí)框架。 showImg(https://segmentfault.com/img/bVbnQXK); polarphp 項(xiàng)目介紹 polarphp是一個(gè)全新的PHP語(yǔ)言的運(yùn)行時(shí)...

    宋華 評(píng)論0 收藏0
  • 少啰嗦!一分鐘帶你讀懂Java的NIO和經(jīng)典IO的區(qū)別

    摘要:的選擇器允許單個(gè)線程監(jiān)視多個(gè)輸入通道。一旦執(zhí)行的線程已經(jīng)超過(guò)讀取代碼中的某個(gè)數(shù)據(jù)片段,該線程就不會(huì)在數(shù)據(jù)中向后移動(dòng)通常不會(huì)。 1、引言 很多初涉網(wǎng)絡(luò)編程的程序員,在研究Java NIO(即異步IO)和經(jīng)典IO(也就是常說(shuō)的阻塞式IO)的API時(shí),很快就會(huì)發(fā)現(xiàn)一個(gè)問(wèn)題:我什么時(shí)候應(yīng)該使用經(jīng)典IO,什么時(shí)候應(yīng)該使用NIO? 在本文中,將嘗試用簡(jiǎn)明扼要的文字,闡明Java NIO和經(jīng)典IO之...

    Meils 評(píng)論0 收藏0
  • 資源集 - 收藏集 - 掘金

    摘要:行爬取頂點(diǎn)全網(wǎng)任意小說(shuō)掘金之前連續(xù)多篇文章介紹客戶端爬取平臺(tái),今天我們從零開(kāi)始,實(shí)現(xiàn)爬取頂點(diǎn)小說(shuō)網(wǎng)任意一本小說(shuō)的功能。文件標(biāo)記所有文件我的后端書(shū)架后端掘金我的后端書(shū)架月前本書(shū)架主要針對(duì)后端開(kāi)發(fā)與架構(gòu)。 30行js爬取頂點(diǎn)全網(wǎng)任意小說(shuō) - 掘金之前連續(xù)多篇文章介紹客戶端爬取平臺(tái)(dspider),今天我們從零開(kāi)始,實(shí)現(xiàn)爬取頂點(diǎn)小說(shuō)網(wǎng)任意一本小說(shuō)的功能。 如果你還不知道客戶端爬取,可以先看...

    stdying 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<