成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

ZStack源碼剖析之核心庫鑒賞——EventFacade與CloudBus

teren / 1652人閱讀

摘要:在注冊(cè)了相應(yīng)后,該被調(diào)用后則會(huì)向相應(yīng)的發(fā)送。開始我們的源碼閱讀之旅。得到資源所在的在中,很有可能是集群部署的,每個(gè)管控不同的資源。則是將統(tǒng)一經(jīng)過這個(gè)隊(duì)列,同時(shí)根據(jù)返回給原發(fā)送者。則會(huì)根據(jù)需求改變結(jié)果。

本文首發(fā)于泊浮目的簡(jiǎn)書專欄:https://www.jianshu.com/nb/21...
前言

無論是事件和消息驅(qū)動(dòng),都是解耦的有力手段之一。ZStack作為一個(gè)大型軟件項(xiàng)目,也使用了這些方案對(duì)整個(gè)架構(gòu)進(jìn)行了解耦。

EventFacade

EventFacade是一個(gè)很有意思的組件,因?yàn)樗鼛缀跏亲耘e的。這就意味著有興趣的朋友可以copy and paste,然后稍作修改就可以在自己的項(xiàng)目里工作起來了。

如何使用它

在ZStack的repo中,同樣提供了相應(yīng)的case:

package org.zstack.test.core.cloudbus;
/**
 * Created with IntelliJ IDEA.
 * User: frank
 * Time: 12:38 AM
 * To change this template use File | Settings | File Templates.
 */
public class TestCanonicalEvent {
    CLogger logger = Utils.getLogger(TestCanonicalEvent.class);
    ComponentLoader loader;
    EventFacade evtf;
    boolean success;

    @Before
    public void setUp() throws Exception {
        BeanConstructor con = new BeanConstructor();
        loader = con.build();
        evtf = loader.getComponent(EventFacade.class);
        ((EventFacadeImpl) evtf).start();
    }

    @Test
    public void test() throws InterruptedException {
        String path = "/test/event";
        evtf.on(path, new EventRunnable() {
            @Override
            public void run() {
                success = true;
            }
        });

        evtf.fire(path, null);
        TimeUnit.SECONDS.sleep(1);
        Assert.assertTrue(success);
    }
}

使用方法非常簡(jiǎn)單,先注冊(cè)一個(gè)路徑用于接收事件,然后沿著該路徑發(fā)送一個(gè)事件,該事件注冊(cè)的函數(shù)則會(huì)被調(diào)用。

接口定義
package org.zstack.core.cloudbus;

import java.util.Map;

/**
 * Created with IntelliJ IDEA.
 * User: frank
 * Time: 11:29 PM
 * To change this template use File | Settings | File Templates.
 */
public interface EventFacade {
    void on(String path, AutoOffEventCallback cb);

    void on(String path, EventCallback cb);

    void on(String path, EventRunnable runnable);

    void off(AbstractEventFacadeCallback cb);

    void onLocal(String path, AutoOffEventCallback cb);

    void onLocal(String path, EventCallback cb);

    void onLocal(String path, EventRunnable runnable);

    void fire(String path, Object data);

    boolean isFromThisManagementNode(Map tokens);

    String META_DATA_MANAGEMENT_NODE_ID = "metadata::managementNodeId";
    String META_DATA_PATH = "metadata::path";
    String WEBHOOK_TYPE = "CanonicalEvent";
}
源碼解讀 on
   @Override
    public void on(String path, AutoOffEventCallback cb) {
        global.put(cb.uniqueIdentity, new CallbackWrapper(path, cb));
    }

    @Override
    public void on(String path, final EventCallback cb) {
        global.put(cb.uniqueIdentity, new CallbackWrapper(path, cb));
    }

    @Override
    public void on(String path, EventRunnable cb) {
        global.put(cb.uniqueIdentity, new CallbackWrapper(path, cb));
    }

on方法僅僅是將一個(gè)屬于EventRunnable 的uuid作為key,并將Callback作為value放入global這個(gè)map中。為什么要這么做呢?因?yàn)樵贛ap的key是不可重復(fù)的,存path肯定是不妥的。

EventFacadeImpl的方法簽名以及成員變量:

public class EventFacadeImpl implements EventFacade, CloudBusEventListener, Component, GlobalApiMessageInterceptor {
    @Autowired
    private CloudBus bus;

    private final Map global = Collections.synchronizedMap(new HashMap<>());
    private final Map local =  new ConcurrentHashMap<>();

    private EventSubscriberReceipt unsubscriber;
fire

相對(duì)的fire方法:

    @Override
    public void fire(String path, Object data) {
        assert path != null;
        CanonicalEvent evt = new CanonicalEvent();
        evt.setPath(path);
        evt.setManagementNodeId(Platform.getManagementServerId());
        if (data != null) {
            /*
            if (!TypeUtils.isPrimitiveOrWrapper(data.getClass()) && !data.getClass().isAnnotationPresent(NeedJsonSchema.class)) {
                throw new CloudRuntimeException(String.format("data[%s] passed to canonical event is not annotated by @NeedJsonSchema", data.getClass().getName()));
            }
            */

            evt.setContent(data);
        }
        //從local這個(gè)map中找到對(duì)應(yīng)的event并調(diào)用
        fireLocal(evt);
        //將事件發(fā)送給對(duì)應(yīng)的webhook
        callWebhooks(evt);
        //通過cloudBus發(fā)送事件,關(guān)于cloudBus的源碼之后會(huì)講到
        bus.publish(evt);
    }
onLocal和on的區(qū)別

在上面的分析中并沒有看到global的event是如何被觸發(fā)的,如果想完全了解其中的過程,還得從CloudBus說起,我們稍后就會(huì)提到它。但是已經(jīng)可以猜到為何要區(qū)分on和onLocal了。一個(gè)是通過消息總線觸發(fā),一個(gè)是在當(dāng)前JVM進(jìn)程內(nèi)觸發(fā)——這意味著一個(gè)支持ManagerNode集群事件,一個(gè)只支持單個(gè)MN事件。這也是來自于ZStack的業(yè)務(wù)場(chǎng)景——有些事情需要MN一起做,有些事情一個(gè)MN做了其他MN就不用做了。介于篇幅,有興趣的讀者可以自行翻看代碼,這里不再詳舉。

WebHook

WebHook是ZStack向前端主動(dòng)通信的手段之一。在注冊(cè)了相應(yīng)EventPath后,該path被調(diào)用后則會(huì)向相應(yīng)的URL發(fā)送content。從case——CanonicalEventWebhookCaseWebhookCase可以看到它的正確使用姿勢(shì)。

CanonicalEventWebhookCase
class CanonicalEventWebhookCase extends SubCase {
    EnvSpec envSpec

    @Override
    void clean() {
        envSpec.delete()
    }

    @Override
    void setup() {
        INCLUDE_CORE_SERVICES = false
        spring {
            include("webhook.xml")
        }
    }

    String WEBHOOK_PATH = "/canonical-event-webhook"

    void testErrorToCreateWebhookifOpaqueFieldMissing() {
        expect(AssertionError.class) {
            createWebhook {
                name = "webhook1"
                url = "http://127.0.0.1:8989$WEBHOOK_PATH"
                type = EventFacade.WEBHOOK_TYPE
            }
        }
    }

    void testCanonicalEventWithVariableInPath() {
        String path = "/test/{uuid}/event"

        int count = 0
        WebhookInventory hook1 = createWebhook {
            name = "webhook1"
            url = "http://127.0.0.1:8989$WEBHOOK_PATH"
            type = EventFacade.WEBHOOK_TYPE
            opaque = path
        }

        // this webhook will not be called because path unmatching
        WebhookInventory hook2 = createWebhook {
            name = "webhook1"
            url = "http://127.0.0.1:8989$WEBHOOK_PATH"
            type = EventFacade.WEBHOOK_TYPE
            opaque = "/this-path-does-not-match"
        }

        CanonicalEvent evt
        envSpec.simulator(WEBHOOK_PATH) { HttpEntity e ->
            evt = json(e.getBody(), CanonicalEvent.class)
            count ++
            return [:]
        }

        String content = "hello world"
        String eventPath = "/test/${Platform.uuid}/event"
        bean(EventFacade.class).fire(eventPath, content)

        retryInSecs {
            assert count == 1
            assert evt != null
            assert evt.path == eventPath
            assert evt.content == content
            assert evt.managementNodeId == Platform.getManagementServerId()
        }
    }

    void testCanonicalEventUseWebhook() {
        String path = "/test/event"

        WebhookInventory hook1 = createWebhook {
            name = "webhook1"
            url = "http://127.0.0.1:8989$WEBHOOK_PATH"
            type = EventFacade.WEBHOOK_TYPE
            opaque = path
        }

        WebhookInventory hook2 = createWebhook {
            name = "webhook2"
            url = "http://127.0.0.1:8989$WEBHOOK_PATH"
            type = EventFacade.WEBHOOK_TYPE
            opaque = path
        }

        def testFireTwoEvents = {
            List evts = []
            envSpec.simulator(WEBHOOK_PATH) { HttpEntity e ->
                CanonicalEvent evt = json(e.getBody(), CanonicalEvent.class)
                evts.add(evt)
                return [:]
            }

            String content = "hello world"
            bean(EventFacade.class).fire(path, content)

            retryInSecs {
                assert evts.size() == 2
                CanonicalEvent evt1 = evts[0]
                CanonicalEvent evt2 = evts[1]
                assert evt1.path == path
                assert evt1.content == content
                assert evt1.managementNodeId == Platform.getManagementServerId()
                assert evt2.path == path
                assert evt2.content == content
                assert evt2.managementNodeId == Platform.getManagementServerId()
            }
        }

        def testOneEventsGetAfterDeleteOneHook = {
            deleteWebhook { uuid = hook1.uuid }

            List evts = []
            envSpec.simulator(WEBHOOK_PATH) { HttpEntity e ->
                CanonicalEvent evt = json(e.getBody(), CanonicalEvent.class)
                evts.add(evt)
                return [:]
            }

            String content = "hello world"
            bean(EventFacade.class).fire(path, content)

            retryInSecs {
                assert evts.size() == 1
            }
        }

        def testNoEventGetAfterDeleteAllHooks = {
            deleteWebhook { uuid = hook2.uuid }

            List evts = []
            envSpec.simulator(WEBHOOK_PATH) { HttpEntity e ->
                CanonicalEvent evt = json(e.getBody(), CanonicalEvent.class)
                evts.add(evt)
                return [:]
            }

            String content = "hello world"
            bean(EventFacade.class).fire(path, content)

            retryInSecs {
                assert evts.size() == 0
            }
        }

        testFireTwoEvents()
        testOneEventsGetAfterDeleteOneHook()
        testNoEventGetAfterDeleteAllHooks()
    }

    @Override
    void environment() {
        envSpec = env {
            // nothing
        }
    }

    @Override
    void test() {
        envSpec.create {
            testCanonicalEventUseWebhook()
            testCanonicalEventWithVariableInPath()
            testErrorToCreateWebhookifOpaqueFieldMissing()
        }
    }
}
WebhookCase
class WebhookCase extends SubCase {
    EnvSpec envSpec

    @Override
    void clean() {
        envSpec.delete()
    }

    @Override
    void setup() {
        INCLUDE_CORE_SERVICES = false
        spring {
            include("webhook.xml")
        }
    }

    @Override
    void environment() {
        envSpec = env {
            // nothing
        }
    }

    void testWebhooksCRUD() {
        WebhookInventory hook = null

        def testCreateWebhook = {
            def params = null

            hook = createWebhook {
                name = "webhook"
                type = "custom-type"
                url = "http://127.0.0.1:8080/webhook"
                description = "desc"
                opaque = "test data"

                params = delegate
            }

            assert dbIsExists(hook.uuid, WebhookVO.class)
            assert hook.name == params.name
            assert hook.type == params.type
            assert hook.url == params.url
            assert hook.description == params.description
            assert hook.opaque == params.opaque
        }

        def testQueryWebhook = {
            List invs = queryWebhook {
                conditions = ["name=${hook.name}"]
            }

            assert invs.size() == 1
            assert invs[0].uuid == hook.uuid
        }

        def testDeleteWebhook = {
            deleteWebhook {
                uuid = hook.uuid
            }

            assert !dbIsExists(hook.uuid, WebhookVO.class)
        }

        testCreateWebhook()
        testQueryWebhook()
        testDeleteWebhook()
    }

    void testInvalidUrl() {
        expect(AssertionError.class) {
            createWebhook {
                name = "webhook"
                type = "custom-type"
                url = "this is not a url"
                description = "desc"
                opaque = "test data"
            }
        }
    }

    @Override
    void test() {
        envSpec.create {
            testWebhooksCRUD()
            testInvalidUrl()
        }
    }
}
CloudBus

CloudBus可以說是ZStack中最重要的組件了,ZStack各個(gè)模塊的通信全部是由Message來完成的,而CloudBus就是它們的通信媒介,接下來我們來看它的源碼。

本節(jié)適合對(duì)AMQP有一定了解同學(xué),如果不了解可以先看我的博客MQ學(xué)習(xí)小記
如何使用它

先看一個(gè)相關(guān)的Case:

package org.zstack.test.core.cloudbus;

import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import org.zstack.core.cloudbus.CloudBusIN;
import org.zstack.core.componentloader.ComponentLoader;
import org.zstack.header.AbstractService;
import org.zstack.header.Service;
import org.zstack.header.message.Message;
import org.zstack.header.message.MessageReply;
import org.zstack.header.message.NeedReplyMessage;
import org.zstack.test.BeanConstructor;
import org.zstack.utils.Utils;
import org.zstack.utils.logging.CLogger;

import java.util.concurrent.TimeUnit;

public class TestCloudBusCall {
    CLogger logger = Utils.getLogger(TestCloudBusCall.class);
    ComponentLoader loader;
    CloudBusIN bus;
    Service serv;

    public static class HelloWorldMsg extends NeedReplyMessage {
        private String greet;

        public String getGreet() {
            return greet;
        }

        public void setGreet(String greet) {
            this.greet = greet;
        }

    }

    public static class HelloWorldReply extends MessageReply {
        private String greet;

        public String getGreet() {
            return greet;
        }

        public void setGreet(String greet) {
            this.greet = greet;
        }
    }

    class FakeService extends AbstractService {
        @Override
        public boolean start() {
            bus.registerService(this);
            bus.activeService(this);
            return true;
        }

        @Override
        public boolean stop() {
            bus.deActiveService(this);
            bus.unregisterService(this);
            return true;
        }

        @Override
        public void handleMessage(Message msg) {
            if (msg.getClass() == HelloWorldMsg.class) {
                HelloWorldMsg hmsg = (HelloWorldMsg) msg;
                HelloWorldReply r = new HelloWorldReply();
                r.setGreet(hmsg.getGreet());
                bus.reply(msg, r);
            }
        }

        @Override
        public String getId() {
            return this.getClass().getCanonicalName();
        }

    }

    @Before
    public void setUp() throws Exception {
        BeanConstructor con = new BeanConstructor();
        loader = con.build();
        bus = loader.getComponent(CloudBusIN.class);
        serv = new FakeService();
        serv.start();
    }

    @Test
    public void test() throws InterruptedException, ClassNotFoundException {
        HelloWorldMsg msg = new HelloWorldMsg();
        msg.setGreet("Hello");
        msg.setServiceId(FakeService.class.getCanonicalName());
        msg.setTimeout(TimeUnit.SECONDS.toMillis(10));
        HelloWorldReply r = (HelloWorldReply) bus.call(msg);
        serv.stop();
        Assert.assertEquals("Hello", r.getGreet());
    }
}
我們注冊(cè)了一個(gè)Service,并覆寫HandleMessage方法,在Case中,我們成功收到了消息并通過了斷言。


再看一個(gè):

package org.zstack.test.core.cloudbus;

import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import org.zstack.core.cloudbus.CloudBusCallBack;
import org.zstack.core.cloudbus.CloudBusIN;
import org.zstack.core.componentloader.ComponentLoader;
import org.zstack.header.AbstractService;
import org.zstack.header.Service;
import org.zstack.header.message.Message;
import org.zstack.header.message.MessageReply;
import org.zstack.header.message.NeedReplyMessage;
import org.zstack.test.BeanConstructor;
import org.zstack.utils.Utils;
import org.zstack.utils.logging.CLogger;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TestCloudBusSendCallback {

CLogger logger = Utils.getLogger(TestCloudBusSendCallback.class);
ComponentLoader loader;
CloudBusIN bus;
CountDownLatch latch = new CountDownLatch(1);
boolean isSuccess = false;
Service serv;

public static class HelloWorldMsg extends NeedReplyMessage {
    private String greet;

    public String getGreet() {
        return greet;
    }

    public void setGreet(String greet) {
        this.greet = greet;
    }

}

public static class HelloWorldReply extends MessageReply {
    private String greet;

    public String getGreet() {
        return greet;
    }

    public void setGreet(String greet) {
        this.greet = greet;
    }
}

class FakeService extends AbstractService {
    @Override
    public boolean start() {
        bus.registerService(this);
        bus.activeService(this);
        return true;
    }

    @Override
    public boolean stop() {
        bus.deActiveService(this);
        bus.unregisterService(this);
        return true;
    }

    @Override
    public void handleMessage(Message msg) {
        if (msg.getClass() == HelloWorldMsg.class) {
            HelloWorldMsg hmsg = (HelloWorldMsg) msg;
            HelloWorldReply r = new HelloWorldReply();
            r.setGreet(hmsg.getGreet());
            bus.reply(msg, r);
        }
    }

    @Override
    public String getId() {
        return this.getClass().getCanonicalName();
    }

}

@Before
public void setUp() throws Exception {
    BeanConstructor con = new BeanConstructor();
    loader = con.build();
    bus = loader.getComponent(CloudBusIN.class);
    serv = new FakeService();
    serv.start();
}

@Test
public void test() throws InterruptedException, ClassNotFoundException {
    HelloWorldMsg msg = new HelloWorldMsg();
    msg.setGreet("Hello");
    msg.setServiceId(FakeService.class.getCanonicalName());
    msg.setTimeout(TimeUnit.SECONDS.toMillis(10));
    bus.send(msg, new CloudBusCallBack(null) {
        @Override
        public void run(MessageReply reply) {
            if (reply instanceof HelloWorldReply) {
                HelloWorldReply hr = (HelloWorldReply) reply;
                if ("Hello".equals(hr.getGreet())) {
                    isSuccess = true;
                }
            }
            latch.countDown();
        }
    });
    latch.await(15, TimeUnit.SECONDS);
    serv.stop();
    Assert.assertEquals(true, isSuccess);
}

}

同樣也是注冊(cè)了一個(gè)Service,然后使用了CallBack,如果運(yùn)行一下發(fā)現(xiàn)斷言是可以通過的——意味著CallBack有被調(diào)用。

綜上,使用CloudBus很簡(jiǎn)單——只需要注冊(cè)你的Service,使用CloudBus指定Service發(fā)送,Service就能收到,如果你需要注冊(cè)你的CallBack,也能很簡(jiǎn)單完成。
接口定義

這么好用的東西,內(nèi)部實(shí)現(xiàn)恐怕不會(huì)太簡(jiǎn)單。我們先從接口開始看:

package org.zstack.core.cloudbus;

import org.zstack.header.Component;
import org.zstack.header.Service;
import org.zstack.header.errorcode.ErrorCode;
import org.zstack.header.exception.CloudConfigureFailException;
import org.zstack.header.message.*;

import java.util.List;

public interface CloudBus extends Component {
    void send(Message msg);
    
     void send(List msgs);
    
    void send(NeedReplyMessage msg, CloudBusCallBack callback);

    @Deprecated
    void send(List msgs, CloudBusListCallBack callBack);

    @Deprecated
    void send(List msgs, int parallelLevel, CloudBusListCallBack callBack);

    @Deprecated
    void send(List msgs, int parallelLevel, CloudBusSteppingCallback callback);

    void route(List msgs);
    
    void route(Message msg);
    
    void reply(Message request, MessageReply reply);
    
    void publish(List events);
    
    void publish(Event event);
    
    MessageReply call(NeedReplyMessage msg);
    
     List call(List msg);
    
    void registerService(Service serv) throws CloudConfigureFailException;
    
    void unregisterService(Service serv);
    
    EventSubscriberReceipt subscribeEvent(CloudBusEventListener listener, Event...events);
    
    void dealWithUnknownMessage(Message msg);
    
    void replyErrorByMessageType(Message msg, Exception e);
    
    void replyErrorByMessageType(Message msg, String err);
    
    void replyErrorByMessageType(Message msg, ErrorCode err);
    
    void logExceptionWithMessageDump(Message msg, Throwable e);
    
    String makeLocalServiceId(String serviceId);

    void makeLocalServiceId(Message msg, String serviceId);

    String makeServiceIdByManagementNodeId(String serviceId, String managementNodeId);

    void makeServiceIdByManagementNodeId(Message msg, String serviceId, String managementNodeId);

    String makeTargetServiceIdByResourceUuid(String serviceId, String resourceUuid);

    void makeTargetServiceIdByResourceUuid(Message msg, String serviceId, String resourceUuid);

    void installBeforeDeliveryMessageInterceptor(BeforeDeliveryMessageInterceptor interceptor, Class...classes);

    void installBeforeSendMessageInterceptor(BeforeSendMessageInterceptor interceptor, Class...classes);

    void installBeforePublishEventInterceptor(BeforePublishEventInterceptor interceptor, Class...classes);
}

接口的命名語義較為清晰,在這里不多做解釋。開始我們的源碼閱讀之旅。

源碼解讀 CloudBus在ZStack Starting的時(shí)候做了什么? init

init是在bean處于加載器,Spring提供的一個(gè)鉤子。在xml中我們可以看到聲明:




    

    
        
            
        
    

    
        
        
        
    
    
    
    
    
    
        
            
        
    

init方法:

    void init() {
        trackerClose = CloudBusGlobalProperty.CLOSE_TRACKER;
        serverIps = CloudBusGlobalProperty.SERVER_IPS;
        tracker = new MessageTracker();

        ConnectionFactory connFactory = new ConnectionFactory();
        List
addresses = CollectionUtils.transformToList(serverIps, new Function() { @Override public Address call(String arg) { return Address.parseAddress(arg); } }); connFactory.setAutomaticRecoveryEnabled(true); connFactory.setRequestedHeartbeat(CloudBusGlobalProperty.RABBITMQ_HEART_BEAT_TIMEOUT); connFactory.setNetworkRecoveryInterval((int) TimeUnit.SECONDS.toMillis(CloudBusGlobalProperty.RABBITMQ_NETWORK_RECOVER_INTERVAL)); connFactory.setConnectionTimeout((int) TimeUnit.SECONDS.toMillis(CloudBusGlobalProperty.RABBITMQ_CONNECTION_TIMEOUT)); logger.info(String.format("use RabbitMQ server IPs: %s", serverIps)); try { if (CloudBusGlobalProperty.RABBITMQ_USERNAME != null) { connFactory.setUsername(CloudBusGlobalProperty.RABBITMQ_USERNAME); logger.info(String.format("use RabbitMQ username: %s", CloudBusGlobalProperty.RABBITMQ_USERNAME)); } if (CloudBusGlobalProperty.RABBITMQ_PASSWORD != null) { connFactory.setPassword(CloudBusGlobalProperty.RABBITMQ_PASSWORD); logger.info("use RabbitMQ password: ******"); } if (CloudBusGlobalProperty.RABBITMQ_VIRTUAL_HOST != null) { connFactory.setVirtualHost(CloudBusGlobalProperty.RABBITMQ_VIRTUAL_HOST); logger.info(String.format("use RabbitMQ virtual host: %s", CloudBusGlobalProperty.RABBITMQ_VIRTUAL_HOST)); } conn = connFactory.newConnection(addresses.toArray(new Address[]{})); logger.debug(String.format("rabbitmq connection is established on %s", conn.getAddress())); ((Recoverable)conn).addRecoveryListener(new RecoveryListener() { @Override public void handleRecovery(Recoverable recoverable) { logger.info(String.format("rabbitmq connection is recovering on %s", conn.getAddress().toString())); } }); channelPool = new ChannelPool(CloudBusGlobalProperty.CHANNEL_POOL_SIZE, conn); createExchanges(); outboundQueue = new BusQueue(makeMessageQueueName(SERVICE_ID), BusExchange.P2P); Channel chan = channelPool.acquire(); chan.queueDeclare(outboundQueue.getName(), false, false, true, queueArguments()); chan.basicConsume(outboundQueue.getName(), true, consumer); chan.queueBind(outboundQueue.getName(), outboundQueue.getBusExchange().toString(), outboundQueue.getBindingKey()); channelPool.returnChannel(chan); maid.construct(); noRouteEndPoint.construct(); tracker.construct(); tracker.trackService(SERVICE_ID); } catch (Exception e) { throw new CloudRuntimeException(e); } }

簡(jiǎn)單來說,該函數(shù)嘗試獲取配置文件中與RabbitMQ中相關(guān)的配置,并初始化Connection,并以此為基礎(chǔ)創(chuàng)建了channel poll。然后將一個(gè)channel和一個(gè)messageQueue綁定在了一起。同時(shí)構(gòu)造了EventMaid和noRouteEndPoint和tracker,后二者都是Message的消費(fèi)者,看名字就可以看出來,一個(gè)用于訂閱/發(fā)布模型(綁定此交換器的隊(duì)列都會(huì)收到消息),一個(gè)用于track。

start

start則是ZStack定義的一個(gè)鉤子,當(dāng)ManagerNode起來的時(shí)候,start會(huì)被調(diào)用到。

   @Override
    public boolean start() {
        populateExtension();
        prepareStatistics();

        for (Service serv : services) {
            assert serv.getId() != null : String.format("service id can not be null[%s]", serv.getClass().getName());
            registerService(serv);
        }

        jmxf.registerBean("CloudBus", this);

        return true;
    }

一個(gè)個(gè)看:

    private void populateExtension() {
        services = pluginRgty.getExtensionList(Service.class);
        for (ReplyMessagePreSendingExtensionPoint extp : pluginRgty.getExtensionList(ReplyMessagePreSendingExtensionPoint.class)) {
            List clazzs = extp.getReplyMessageClassForPreSendingExtensionPoint();
            if (clazzs == null || clazzs.isEmpty()) {
                continue;
            }

            for (Class clz : clazzs) {
                if (!(APIEvent.class.isAssignableFrom(clz)) && !(MessageReply.class.isAssignableFrom(clz))) {
                    throw new CloudRuntimeException(String.format("ReplyMessagePreSendingExtensionPoint can only marshal APIEvent or MessageReply. %s claimed by %s is neither APIEvent nor MessageReply",
                            clz.getName(), extp.getClass().getName()));
                }

                List exts = replyMessageMarshaller.get(clz);
                if (exts == null) {
                    exts = new ArrayList();
                    replyMessageMarshaller.put(clz, exts);
                }
                exts.add(extp);
            }
        }
    }

首先收集了所有繼承于Service的類,然后加載會(huì)改變msg reply的extensionPoint。

 private void prepareStatistics() {
        List needReplyMsgs = BeanUtils.scanClassByType("org.zstack", NeedReplyMessage.class);
        needReplyMsgs = CollectionUtils.transformToList(needReplyMsgs, new Function() {
            @Override
            public Class call(Class arg) {
                return !APIMessage.class.isAssignableFrom(arg) || APISyncCallMessage.class.isAssignableFrom(arg) ? arg : null;
            }
        });

        for (Class clz : needReplyMsgs) {
            MessageStatistic stat = new MessageStatistic();
            stat.setMessageClassName(clz.getName());
            statistics.put(stat.getMessageClassName(), stat);
        }
    }

為需要回復(fù)的msg設(shè)置統(tǒng)計(jì)信息。

之后就是把所有的Service收集起來,方便Msg的分發(fā)。

常用方法 CloudBus.makeLocalServiceId
    @Override
    public String makeLocalServiceId(String serviceId) {
        return serviceId + "." + Platform.getManagementServerId();
    }

    @Override
    public void makeLocalServiceId(Message msg, String serviceId) {
        msg.setServiceId(makeLocalServiceId(serviceId));
    }

如ZStack的伸縮性秘密武器:無狀態(tài)服務(wù)中所說一般,每個(gè)管理節(jié)點(diǎn)都會(huì)注冊(cè)一堆服務(wù)隊(duì)列。因此我們要按照其格式組裝,這樣消息才能被服務(wù)所接收。

CloudBus.makeTargetServiceIdByResourceUuid
    @Override
    public String makeTargetServiceIdByResourceUuid(String serviceId, String resourceUuid) {
        DebugUtils.Assert(serviceId!=null, "serviceId cannot be null");
        DebugUtils.Assert(resourceUuid!=null, "resourceUuid cannot be null");
        //得到資源所在的MN UUID
        String mgmtUuid = destMaker.makeDestination(resourceUuid);
        return serviceId + "." + mgmtUuid;
    }

    @Override
    public void makeTargetServiceIdByResourceUuid(Message msg, String serviceId, String resourceUuid) {
        String targetService = makeTargetServiceIdByResourceUuid(serviceId, resourceUuid);
        msg.setServiceId(targetService);
    }

在ZStack中,ManagerNode很有可能是集群部署的,每個(gè)MN管控不同的資源。那么就需要一致性哈希環(huán)來確定資源所在哪個(gè)MN。

CloudBus.send
    @Override
    public void send(final NeedReplyMessage msg, final CloudBusCallBack callback) {
        //給msg一個(gè)超時(shí)時(shí)間
        evaluateMessageTimeout(msg);
        //new繼承于Envelope的匿名內(nèi)部類
        Envelope e = new Envelope() {
            //用來判斷這個(gè)msg是否已經(jīng)發(fā)出去了
            AtomicBoolean called = new AtomicBoolean(false);

            final Envelope self = this;
            //計(jì)算超時(shí),往線程池提交一個(gè)任務(wù)
            TimeoutTaskReceipt timeoutTaskReceipt = thdf.submitTimeoutTask(new Runnable() {
                @Override
                public void run() {
                    self.timeout();
                }
            }, TimeUnit.MILLISECONDS, msg.getTimeout());

            @Override
            //msg 發(fā)送成功時(shí)候調(diào)用這個(gè)方法
            public void ack(MessageReply reply) {
                //計(jì)算該msg耗時(shí)
                count(msg);
                //根據(jù)msg的唯一UUID移除在這個(gè)map中的記錄
                envelopes.remove(msg.getId());
                //如果更新失敗,說明這個(gè)消息已經(jīng)被發(fā)送過了。返回
                if (!called.compareAndSet(false, true)) {
                    return;
                }
                //取消一個(gè)計(jì)算超時(shí)的任務(wù)
                timeoutTaskReceipt.cancel();
                //調(diào)用注冊(cè)的callback
                callback.run(reply);
            }

            //消息超時(shí)時(shí)調(diào)用的邏輯
            @Override
            public void timeout() {
                // 根據(jù)msg的唯一UUID移除在這個(gè)map中的記錄
                envelopes.remove(msg.getId());
                 // 如何已經(jīng)被調(diào)用過則返回
                if (!called.compareAndSet(false, true)) {
                    return;
                }
                //內(nèi)部構(gòu)造一個(gè)超時(shí)reply返回給callback
                callback.run(createTimeoutReply(msg));
            }
            //用于getWaitingReplyMessageStatistic
            @Override
            List getRequests() {
                List requests = new ArrayList();
                requests.add(msg);
                return requests;
            }
        };
        
        //往envelopes這個(gè)map里放入msg的唯一UUID和剛剛構(gòu)造的envelope
        envelopes.put(msg.getId(), e);
        //發(fā)送消息
        send(msg, false);
    }
私有方法:send
    private void send(Message msg, Boolean noNeedReply) {
        //msg的serviceID不允許為空,不然不能
        if (msg.getServiceId() == null) {
            throw new IllegalArgumentException(String.format("service id cannot be null: %s", msg.getClass().getName()));
        }
        //為msg構(gòu)建基本屬性
        basicProperty(msg);
        //設(shè)置msg header屬性
        msg.putHeaderEntry(CORRELATION_ID, msg.getId());
        //消息的回復(fù)隊(duì)列設(shè)置
        msg.putHeaderEntry(REPLY_TO, outboundQueue.getBindingKey());
        if (msg instanceof APIMessage) {
            // API always need reply
            msg.putHeaderEntry(NO_NEED_REPLY_MSG, Boolean.FALSE.toString());
        } else if (msg instanceof NeedReplyMessage) {
            // for NeedReplyMessage sent without requiring receiver to reply,
            // mark it, then it will not be tracked and replied
            msg.putHeaderEntry(NO_NEED_REPLY_MSG, noNeedReply.toString());
        }

        buildRequestMessageMetaData(msg);
        wire.send(msg);
    }

該函數(shù)是一段公用邏輯。所有的消息都是從這里進(jìn)去然后由rabbitMQ發(fā)出去的。所以在這里需要多說幾句。

    protected void basicProperty(Message msg) {
        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        msg.setAMQPProperties(builder.deliveryMode(1).expiration(String.valueOf(TimeUnit.SECONDS.toMillis(CloudBusGlobalProperty.MESSAGE_TTL))).build());
    }

這個(gè)函數(shù)設(shè)置了msg基礎(chǔ)屬性——持久化策略(否)和超時(shí)。

那么再看buildRequestMessageMetaData方法

    private void buildRequestMessageMetaData(Message msg) {
        if (msg instanceof APIMessage || (msg instanceof NeedReplyMessage && !Boolean.valueOf((String)msg.getHeaderEntry(NO_NEED_REPLY_MSG)))) {
            RequestMessageMetaData metaData;
            if (msg instanceof LockResourceMessage) {
                LockResourceMessage lmsg = (LockResourceMessage) msg;
                LockMessageMetaData lmetaData = new LockMessageMetaData();
                lmetaData.unlockKey = lmsg.getUnlockKey();
                lmetaData.reason = lmsg.getReason();
                lmetaData.senderManagementUuid = Platform.getManagementServerId();
                metaData = lmetaData;
            } else {
                metaData = new RequestMessageMetaData();
            }

            metaData.needApiEvent = msg instanceof APIMessage && !(msg instanceof APISyncCallMessage);
            metaData.msgId = msg.getId();
            metaData.replyTo = msg.getHeaderEntry(REPLY_TO);
            metaData.timeout = msg instanceof NeedReplyMessage ? ((NeedReplyMessage) msg).getTimeout() : null;
            metaData.serviceId = msg.getServiceId();
            metaData.messageName = msg.getClass().getName();
            metaData.className = metaData.getClass().getName();
            msg.getAMQPHeaders().put(MESSAGE_META_DATA, JSONObjectUtil.toJsonString(metaData));
        }
    }

方法buildRequestMessageMetaData將消息所需的metaData從msg里取了出來并放入了msg的真正Header中。

然后是wire.send:

        public void send(Message msg) {
            // for unit test finding invocation chain
            MessageCommandRecorder.record(msg.getClass());

            List interceptors = beforeSendMessageInterceptors.get(msg.getClass());
            if (interceptors != null) {
                for (BeforeSendMessageInterceptor interceptor : interceptors) {
                    interceptor.intercept(msg);

                    /*
                    if (logger.isTraceEnabled()) {
                        logger.trace(String.format("called %s for message[%s]", interceptor.getClass(), msg.getClass()));
                    }
                    */
                }
            }

            for (BeforeSendMessageInterceptor interceptor : beforeSendMessageInterceptorsForAll) {
                interceptor.intercept(msg);

                /*
                if (logger.isTraceEnabled()) {
                    logger.trace(String.format("called %s for message[%s]", interceptor.getClass(), msg.getClass()));
                }
                */
            }

            send(msg, true);
        }

邏輯一目了然:

記錄它的調(diào)用鏈

調(diào)用它專屬的發(fā)送前攔截器進(jìn)行攔截

調(diào)用所有msg的發(fā)送前攔截器進(jìn)行攔截

send(msg, true);:

       public void send(final Message msg, boolean makeQueueName) {
            /*
            StopWatch watch = new StopWatch();
            watch.start();
            */
            String serviceId = msg.getServiceId();
            if (makeQueueName) { 
                //獲取真正的隊(duì)列名
                serviceId = makeMessageQueueName(serviceId);
            }
            // build json schema
            buildSchema(msg);
            //當(dāng)前的thread Context中獲取必要信息。每個(gè)api調(diào)用所攜帶的uuid就是這樣傳遞下去的
            evalThreadContextToMessage(msg);

            if (logger.isTraceEnabled() && logMessage(msg)) {
                logger.trace(String.format("[msg send]: %s", wire.dumpMessage(msg)));
            }

            //從channel poll 中取出一個(gè)channel 
            Channel chan = channelPool.acquire();
            try {
                //接下來多帶帶解釋
                new RecoverableSend(chan, msg, serviceId, outboundQueue.getBusExchange()).send();
                /*
                watch.stop();
                logger.debug(String.mediaType("sending %s cost %sms", msg.getClass().getName(), watch.getTime()));
                */
            } catch (IOException e) {
                throw new CloudRuntimeException(e);
            } finally {
                //返回給channel poll
                channelPool.returnChannel(chan);
            }
        }

多帶帶分析 new RecoverableSend(chan, msg, serviceId, outboundQueue.getBusExchange()).send();

        private class RecoverableSend {
            Channel chan;
            byte[] data;
            String serviceId;
            Message msg;
            BusExchange exchange;

            RecoverableSend(Channel chan, Message msg, String serviceId, BusExchange exchange) throws IOException {
                data = compressMessageIfNeeded(msg);
                this.chan = chan;
                this.serviceId = serviceId;
                this.msg = msg;
                this.exchange = exchange;
            }

            void send() throws IOException {
                try {
                    chan.basicPublish(exchange.toString(), serviceId,
                            true, msg.getAMQPProperties(), data);
                } catch (ShutdownSignalException e) {
                    if (!(conn instanceof AutorecoveringConnection) || serverIps.size() <= 1 || !Platform.IS_RUNNING) {
                        // the connection is not recoverable
                        throw e;
                    }

                    logger.warn(String.format("failed to send a message because %s; as the connection is recoverable," +
                            "we are doing recoverable send right now", e.getMessage()));

                    if (!recoverSend()) {
                        throw e;
                    }
                }
            }

            private byte[] compressMessageIfNeeded(Message msg) throws IOException {
                if (!CloudBusGlobalProperty.COMPRESS_NON_API_MESSAGE || msg instanceof APIEvent || msg instanceof APIMessage) {
                    return gson.toJson(msg, Message.class).getBytes();
                }

                msg.getAMQPHeaders().put(AMQP_PROPERTY_HEADER__COMPRESSED, "true");
                return Compresser.deflate(gson.toJson(msg, Message.class).getBytes());
            }

            private boolean recoverSend() throws IOException {
                int interval = conn.getHeartbeat() / 2;
                interval = interval > 0 ? interval : 1;
                int count = 0;

                // as the connection is lost, there is no need to wait heart beat missing 8 times
                // so we use reflection to fast the process
                RecoveryAwareAMQConnection delegate = FieldUtils.getFieldValue("delegate", conn);
                DebugUtils.Assert(delegate != null, "cannot get RecoveryAwareAMQConnection");
                Field _missedHeartbeats = FieldUtils.getField("_missedHeartbeats", RecoveryAwareAMQConnection.class);
                DebugUtils.Assert(_missedHeartbeats!=null, "cannot find _missedHeartbeats");
                _missedHeartbeats.setAccessible(true);
                try {
                    _missedHeartbeats.set(delegate, 100);
                } catch (IllegalAccessException e) {
                    throw new CloudRuntimeException(e);
                }

                while (count < CloudBusGlobalProperty.RABBITMQ_RECOVERABLE_SEND_TIMES) {
                    try {
                        TimeUnit.SECONDS.sleep(interval);
                    } catch (InterruptedException e1) {
                        logger.warn(e1.getMessage());
                    }

                    try {
                        chan.basicPublish(exchange.toString(), serviceId,
                                true, msg.getAMQPProperties(), data);
                        return true;
                    } catch (ShutdownSignalException e) {
                        logger.warn(String.format("recoverable send fails %s times, will continue to retry %s times; %s",
                                count, CloudBusGlobalProperty.RABBITMQ_RECOVERABLE_SEND_TIMES-count, e.getMessage()));
                        count ++;
                    }
                }

                return false;
            }
        }

最核心的代碼即是:

                    chan.basicPublish(exchange.toString(), serviceId,
                            true, msg.getAMQPProperties(), data);

根據(jù)交換器、綁定器的key和msg的基本屬性還有已經(jīng)序列化的msg在RabbitMQ中發(fā)送消息。

我們可以看一下該方法簽名:

    /**
     * Publish a message
     * @see com.rabbitmq.client.AMQP.Basic.Publish
     * @param exchange the exchange to publish the message to
     * @param routingKey the routing key
     * @param mandatory true if the "mandatory" flag is to be set
     * @param props other properties for the message - routing headers etc
     * @param body the message body
     * @throws java.io.IOException if an error is encountered
     */
    void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
            throws IOException;

當(dāng)mandatory標(biāo)志位設(shè)置為true時(shí),如果exchange根據(jù)自身類型和消息routeKey無法找到一個(gè)符合條件的queue,那么會(huì)調(diào)用basic.return方法將消息返還給生產(chǎn)者;當(dāng)mandatory設(shè)為false時(shí),出現(xiàn)上述情形broker會(huì)直接將消息扔掉。

還有一個(gè)附有immediate的方法:

    /**
     * Publish a message
     * @see com.rabbitmq.client.AMQP.Basic.Publish
     * @param exchange the exchange to publish the message to
     * @param routingKey the routing key
     * @param mandatory true if the "mandatory" flag is to be set
     * @param immediate true if the "immediate" flag is to be
     * set. Note that the RabbitMQ server does not support this flag.
     * @param props other properties for the message - routing headers etc
     * @param body the message body
     * @throws java.io.IOException if an error is encountered
     */
    void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
            throws IOException;

當(dāng)immediate標(biāo)志位設(shè)置為true時(shí),如果exchange在將消息route到queue(s)時(shí)發(fā)現(xiàn)對(duì)應(yīng)的queue上沒有消費(fèi)者,那么這條消息不會(huì)放入隊(duì)列中。當(dāng)與消息routeKey關(guān)聯(lián)的所有queue(一個(gè)或多個(gè))都沒有消費(fèi)者時(shí),該消息會(huì)通過basic.return方法返還給生產(chǎn)者。

CloudBus.reply
    @Override
    public void reply(Message request, MessageReply reply) {
        if (Boolean.valueOf((String) request.getHeaderEntry(NO_NEED_REPLY_MSG))) {
            if (logger.isTraceEnabled()) {
                logger.trace(String.format("%s in message%s is set, drop reply%s", NO_NEED_REPLY_MSG,
                        wire.dumpMessage(request), wire.dumpMessage(reply)));
            }

            return;
        }

        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        reply.setAMQPProperties(builder.deliveryMode(1).build());
        reply.getHeaders().put(IS_MESSAGE_REPLY, Boolean.TRUE.toString());
        reply.putHeaderEntry(CORRELATION_ID, request.getId());
        reply.setServiceId((String) request.getHeaderEntry(REPLY_TO));

        buildResponseMessageMetaData(reply);
        if (request instanceof NeedReplyMessage) {
            callReplyPreSendingExtensions(reply, (NeedReplyMessage) request);
        }
        wire.send(reply, false);
    }

其他屬性之前都有提到。 reply.setServiceId((String) request.getHeaderEntry(REPLY_TO));則是將reply統(tǒng)一經(jīng)過outboundQueue這個(gè)隊(duì)列,同時(shí)根據(jù)correlationId返回給原發(fā)送者。

callReplyPreSendingExtensions則會(huì)根據(jù)需求改變r(jià)eply結(jié)果。之后就是wire.send,之前已經(jīng)分析過了。

CloudBus.publish
    @Override
    public void publish(Event event) {
        if (event instanceof APIEvent) {
            APIEvent aevt = (APIEvent) event;
            DebugUtils.Assert(aevt.getApiId() != null, String.format("apiId of %s cannot be null", aevt.getClass().getName()));
        }
        //和前面的msgProperty一樣
        eventProperty(event);
        //構(gòu)建metaData
        buildResponseMessageMetaData(event);
        //前面分析過了
        callReplyPreSendingExtensions(event, null);
        //調(diào)用beforeEventPublishInterceptors。為了拋出異常的時(shí)候方便track,聲明了這樣的一個(gè)變量。
        BeforePublishEventInterceptor c = null;
        try {
            List is = beforeEventPublishInterceptors.get(event.getClass());
            if (is != null) {
                for (BeforePublishEventInterceptor i : is) {
                    c = i;
                    i.beforePublishEvent(event);
                }
            }

            for (BeforePublishEventInterceptor i : beforeEventPublishInterceptorsForAll)  {
                c = i;
                i.beforePublishEvent(event);
            }
        } catch (StopRoutingException e) {
            if (logger.isTraceEnabled()) {
                logger.trace(String.format("BeforePublishEventInterceptor[%s] stop publishing event: %s",
                        c == null ? "null" : c.getClass().getName(), JSONObjectUtil.toJsonString(event)));
            }

            return;
        }

        wire.publish(event);
    }

接下來看wire.publish方法

        public void publish(Event evt) {
            /*
            StopWatch watch = new StopWatch();
            watch.start();
            */

            buildSchema(evt);

            evalThreadContextToMessage(evt);

            if (logger.isTraceEnabled() && logMessage(evt)) {
                logger.trace(String.format("[event publish]: %s", wire.dumpMessage(evt)));
            }

            Channel chan = channelPool.acquire();
            try {
                new RecoverableSend(chan, evt, evt.getType().toString(), BusExchange.BROADCAST).send();
                /*
                watch.stop();
                logger.debug(String.mediaType("sending %s cost %sms", evt.getClass().getName(), watch.getTime()));
                */
            } catch (IOException e) {
                throw new CloudRuntimeException(e);
            } finally {
                channelPool.returnChannel(chan);
            }
        }

大部分方法和send無異。但是在Event的類中定義了兩種Type:

package org.zstack.header.message;

import org.zstack.header.rest.APINoSee;

public abstract class Event extends Message {
    /**
     * @ignore
     */
    @APINoSee
    private String avoidKey;

    public String getAvoidKey() {
        return avoidKey;
    }

    public void setAvoidKey(String avoidKey) {
        this.avoidKey = avoidKey;
    }

    public abstract Type getType();

    public abstract String getSubCategory();

    public static final String BINDING_KEY_PERFIX = "key.event.";

    public static enum Category {
        LOCAL,
        API,
    }

    public static class Type {
        private final String _name;

        public Type(Category ctg, String subCtg) {
            _name = BINDING_KEY_PERFIX + ctg.toString() + "." + subCtg;
        }

        @Override
        public String toString() {
            return _name;
        }

        @Override
        public int hashCode() {
            return _name.hashCode();
        }

        @Override
        public boolean equals(Object t) {
            if (!(t instanceof Type)) {
                return false;
            }

            Type type = (Type) t;
            return _name.equals(type.toString());
        }
    }
}

即Local和API。從名字上很好看出來,一個(gè)用來回復(fù)APIMsg的,一個(gè)用來發(fā)布本地消息。不過要了解這里面的細(xì)節(jié),就得看EventMaid了。

EventMaid
    private class EventMaid extends AbstractConsumer {
        Map> listeners = new ConcurrentHashMap>();
        Channel eventChan;
        String queueName = makeEventQueueName(String.format("eventMaid.%s", Platform.getUuid()));

        public void construct() {
            try {
                eventChan = conn.createChannel();
                eventChan.queueDeclare(queueName, false, false, true, queueArguments());
                eventChan.basicConsume(queueName, true, this);
            } catch (IOException e) {
                throw new CloudRuntimeException(e);
            }
        }

        public void destruct() {
            try {
                eventChan.close();
            } catch (IOException e) {
                throw new CloudRuntimeException(e);
            }
        }


        public void listen(Event evt, EventListenerWrapper l) {
            String type = evt.getType().toString();
            try {
                synchronized (listeners) {
                    List lst = listeners.get(type);
                    if (lst == null) {
                        lst = new CopyOnWriteArrayList();
                        listeners.put(type, lst);
                        eventChan.queueBind(queueName, BusExchange.BROADCAST.toString(), type);
                        logger.debug(String.format("[listening event]: %s", type));
                    }

                    if (!lst.contains(l)) {
                        lst.add(l);
                    }
                }
            } catch (IOException e) {
                throw new CloudRuntimeException(e);
            }
        }

        public void unlisten(Event evt, EventListenerWrapper l) {
            String type = evt.getType().toString();
            try {
                synchronized (listeners) {
                    List lst = listeners.get(type);
                    if (lst == null) {
                        return;
                    }

                    lst.remove(l);
                    if (lst.isEmpty()) {
                        listeners.remove(type);
                        eventChan.queueUnbind(queueName, BusExchange.BROADCAST.toString(), type);
                        logger.debug(String.format("[unlistening event]: %s", type));
                    }
                }
            } catch (IOException e) {
                throw new CloudRuntimeException(e);
            }
        }

        @SyncThread(level = 10)
        @MessageSafe
        private void dispatch(Event evt, EventListenerWrapper l) {
            setThreadLoggingContext(evt);

            l.callEventListener(evt);
        }


        private void handle(Event evt) {
            String type = evt.getType().toString();
            List lst = listeners.get(type);
            if (lst == null) {
                return;
            }

            if (logger.isTraceEnabled()) {
                logger.trace(String.format("[event received]: %s", wire.dumpMessage(evt)));
            }

            for (EventListenerWrapper l : lst) {
                dispatch(evt, l);
            }
        }

        @Override
        public void handleDelivery(String s, com.rabbitmq.client.Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
            Event evt = null;
            try {
                evt = (Event) wire.toMessage(bytes, basicProperties);
                handle(evt);
            } catch (final Throwable t) {
                final Event fevt = evt;
                throwableSafe(new Runnable() {
                    @Override
                    public void run() {
                        if (fevt != null) {
                            logger.warn(String.format("unhandled throwable when handling event[%s], dump: %s", fevt.getClass().getName(), wire.dumpMessage(fevt)), t);
                        } else {
                            logger.warn(String.format("unhandled throwable"), t);
                        }
                    }
                });
            }
        }
    }

這段代碼得先從handleDelivery開始看:

        @Override
        public void handleDelivery(String s, com.rabbitmq.client.Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
            Event evt = null;
            try {
                evt = (Event) wire.toMessage(bytes, basicProperties);
                handle(evt);
            } catch (final Throwable t) {
                final Event fevt = evt;
                throwableSafe(new Runnable() {
                    @Override
                    public void run() {
                        if (fevt != null) {
                            logger.warn(String.format("unhandled throwable when handling event[%s], dump: %s", fevt.getClass().getName(), wire.dumpMessage(fevt)), t);
                        } else {
                            logger.warn(String.format("unhandled throwable"), t);
                        }
                    }
                });
            }
        }

可以看到,這里是重載了Consumer接口的handleDelivery,我們看一下它的方法注釋:

   /**
     * Called when a basic.deliver is received for this consumer.
     * @param consumerTag the consumer tag associated with the consumer
     * @param envelope packaging data for the message
     * @param properties content header data for the message
     * @param body the message body (opaque, client-specific byte array)
     * @throws IOException if the consumer encounters an I/O error while processing the message
     * @see Envelope
     */
    void handleDelivery(String consumerTag,
                        Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte[] body)
        throws IOException;

這樣保證EventMaid的對(duì)象能夠接收到Msg。在try代碼塊中,從byte轉(zhuǎn)換出了Event,然后走向了handle邏輯。

        private void handle(Event evt) {
            //前面提過,有兩種Type,API和Local
            String type = evt.getType().toString();
            //所以只會(huì)取出兩種List
            List lst = listeners.get(type);
            if (lst == null) {
                return;
            }

            if (logger.isTraceEnabled()) {
                logger.trace(String.format("[event received]: %s", wire.dumpMessage(evt)));
            }

            for (EventListenerWrapper l : lst) {
                //跳到下一個(gè)邏輯
                dispatch(evt, l);
            }
        }
        @SyncThread(level = 10)
        @MessageSafe
        private void dispatch(Event evt, EventListenerWrapper l) {
            setThreadLoggingContext(evt);
            //跳至下一段邏輯
            l.callEventListener(evt);
        }
    @Override
    public EventSubscriberReceipt subscribeEvent(final CloudBusEventListener listener, final Event... events) {
        final EventListenerWrapper wrapper = new EventListenerWrapper() {
            @Override
            public void callEventListener(Event e) {
                //走到各自的handle邏輯,如果返回true則unlisten
                if (listener.handleEvent(e)) {
                    maid.unlisten(e, this);
                }
            }
        };
        // 一個(gè)event對(duì)應(yīng)一個(gè)ListenWrapper
        for (Event e : events) {
            maid.listen(e, wrapper);
        }

        return new EventSubscriberReceipt() {
            @Override
            public void unsubscribe(Event e) {
                maid.unlisten(e, wrapper);
            }

            @Override
            public void unsubscribeAll() {
                for (Event e : events) {
                    maid.unlisten(e, wrapper);
                }
            }
        };
    }

再看listen:

        public void listen(Event evt, EventListenerWrapper l) {
            String type = evt.getType().toString();
            try {
                synchronized (listeners) {
                    List lst = listeners.get(type);
                    if (lst == null) {
                        lst = new CopyOnWriteArrayList();
                        listeners.put(type, lst);
                        eventChan.queueBind(queueName, BusExchange.BROADCAST.toString(), type);
                        logger.debug(String.format("[listening event]: %s", type));
                    }

                    if (!lst.contains(l)) {
                        lst.add(l);
                    }
                }
            } catch (IOException e) {
                throw new CloudRuntimeException(e);
            }
        }

首先加鎖了listeners這個(gè)put,并根據(jù)type取出相應(yīng)的list。同時(shí)將這個(gè)list轉(zhuǎn)換為CopyOnWriteArrayList,這樣這個(gè)list的引用就不會(huì)泄露出去了。然后綁定一個(gè)channel作為通道。另外,如果EventListenerWrapper List中不存在提交的EventListenerWrapper,則添加進(jìn)去。

相信講了這么多,有一部分讀者可能已經(jīng)繞暈了。這邊寫一個(gè)關(guān)于EventMaid的邏輯調(diào)用小結(jié):

在ZStack的每個(gè)Component啟動(dòng)時(shí),會(huì)向CloudBus訂閱event。

當(dāng)CloudBus收到需要publish的event,會(huì)向所有實(shí)現(xiàn)CloudBusEventListener接口的對(duì)象發(fā)送事件,由他們自己選擇是否處理這些事件。

CloudBus和EventFascade就是這樣協(xié)同工作的。
小結(jié)

在本文,我們一起瀏覽的ZStack中提供消息驅(qū)動(dòng)特性組件的源碼——顯然,這兩個(gè)組件的API非常好用,簡(jiǎn)潔明了。但在具體邏輯中有幾個(gè)可以改進(jìn)的點(diǎn):

handleEvent返回boolean的判斷為ture則取消listen,語義上不是很好理解

listen方法中的listeners可以用并發(fā)容器——ConcurrentHashMap代替,以增加吞吐量。

listeners的v完全可以用Set來代替。CopyOnWriteArrayList也可以用CopyOnWriteArraySet來代替。我們?cè)趌isten方法中可以看到,如果lst不包含l,則add。這說明lst是不應(yīng)該重復(fù)的。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/68521.html

相關(guān)文章

  • ZStack源碼剖析設(shè)計(jì)模式鑒賞——策略模式

    摘要:能夠整體地替換算法,能讓我們輕松地以不同的算法去解決一個(gè)問題,這種模式就是模式。這個(gè)類是在發(fā)布前常在中被使用的一個(gè)類,代碼如下以為例,從語義上來說就是為了中的每個(gè)元素調(diào)用函數(shù)。 本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog... 前言 無論什么程序,其目的都是解決問題。而為了解決問題,我們又需要編寫特定的算法。使用Strategy模式可以整體地替...

    Eric 評(píng)論0 收藏0
  • ZStack源碼剖析模塊鑒賞——LongJob

    摘要:因?yàn)檫@個(gè)狀態(tài)下,是交給一個(gè)線程在執(zhí)行的,見源碼剖析之核心庫鑒賞中的分析。并且允許等行為。上面提到過,允許運(yùn)行暫停取消等行為。維護(hù)和相應(yīng)的之間的關(guān)系。則停止執(zhí)行并觸發(fā)之前的所有。 本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog... 前言 在ZStack中,當(dāng)用戶在UI上發(fā)起操作時(shí),前端會(huì)調(diào)用后端的API對(duì)實(shí)際的資源發(fā)起操作請(qǐng)求。但在一個(gè)分布式系統(tǒng)中...

    cheukyin 評(píng)論0 收藏0
  • ZStack源碼剖析核心鑒賞——Defer

    摘要:本文首發(fā)于泊浮目的專欄在語言中,有一個(gè)關(guān)鍵字叫做其作用是在函數(shù)前執(zhí)行。一般有兩種用法在該函數(shù)拋出異常時(shí)執(zhí)行。在該函數(shù)返回前執(zhí)行。這里的放入來自系統(tǒng)啟動(dòng)時(shí)利用反射所做的一個(gè)行為。因此并不會(huì)影響使用時(shí)的性能。 本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog... 在Go語言中,有一個(gè)關(guān)鍵字叫做defer——其作用是在函數(shù)return前執(zhí)行。在ZStac...

    DevWiki 評(píng)論0 收藏0
  • ZStack源碼剖析核心鑒賞——Defer

    摘要:本文首發(fā)于泊浮目的專欄在語言中,有一個(gè)關(guān)鍵字叫做其作用是在函數(shù)前執(zhí)行。一般有兩種用法在該函數(shù)拋出異常時(shí)執(zhí)行。在該函數(shù)返回前執(zhí)行。這里的放入來自系統(tǒng)啟動(dòng)時(shí)利用反射所做的一個(gè)行為。因此并不會(huì)影響使用時(shí)的性能。 本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog... 在Go語言中,有一個(gè)關(guān)鍵字叫做defer——其作用是在函數(shù)return前執(zhí)行。在ZStac...

    ymyang 評(píng)論0 收藏0
  • ZStack源碼剖析核心鑒賞——ThreadFacade

    摘要:每個(gè)消息都會(huì)被一個(gè)線程消費(fèi),同時(shí)最大并發(fā)量為。然后提交一個(gè)任務(wù)到線程池中,這個(gè)任務(wù)的內(nèi)容是從等待隊(duì)列中取出一個(gè),如果等待隊(duì)列為空,則刪除這個(gè)等待隊(duì)列的。小結(jié)本文分析了的久經(jīng)生產(chǎn)考驗(yàn)的核心組件線程池。 本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog... 前言 在ZStack中,最基本的執(zhí)行單位不僅僅是一個(gè)函數(shù),也可以是一個(gè)任務(wù)(Task。其本質(zhì)實(shí)現(xiàn)...

    enali 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<