成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

nodejs微服務解決方案

canger / 2446人閱讀

摘要:前言是一個微服務工具集,它賦予系統(tǒng)易于連續(xù)構(gòu)建和更新的能力。這個對象既包含某個微服務所需要調(diào)取另一個微服務的特征,同時也包含傳參。和微服務發(fā)現(xiàn)有些類似不過是用模式代替,目前為止模式是完全可以實現(xiàn)服務發(fā)現(xiàn)功能,但是否更加靈活還有待去挖掘。

前言

seneca是一個nodejs微服務工具集,它賦予系統(tǒng)易于連續(xù)構(gòu)建和更新的能力。下面會逐一和大家一起了解相關(guān)技術(shù)入門以及實踐。

這里插入一段硬廣。小子再進行簡單整合之后擼了個vastify框架 ---- 輕量級nodejs微服務框架,有興趣的同學過目一下,歡迎順手star一波,另外有疑問或者代碼有毛病歡迎在博文下方留言。

環(huán)境

基礎(chǔ)環(huán)境

"node": "^10.0.0"
"npm": "^6.0.0"
"pm2": "^2.10.3"
"rabbitmq": "^3.7.5"
"consul": "^1.1.0"
"mongodb": "^3.6"

微服務工程

"bluebird": "^3.5.1"
"koa": "^2.5.1"
"koa-router": "^7.4.0"
"seneca": "^3.4.3"
"seneca-web": "^2.2.0"
"seneca-web-adapter-koa2": "^1.1.0"
"amqplib": "^0.5.2"
"winston": "^2.4.2"
"mongoose": "^5.1.2"
FEATURES

模式匹配做服務間調(diào)用:略微不同于SpringCloud服務發(fā)現(xiàn)(http協(xié)議、IP + PORT模式),它使用更加靈活的模式匹配(Patrun模塊)原則去進行微服務間的調(diào)用

接入koa2對C端提供RESTFUl API

插件:更靈活編寫小而微的可復用模塊

seneca內(nèi)置日志輸出

第三方日志庫比較winston(選用)、bunyan、log4js

RabbitMQ消息隊列

PM2:node服務部署(服務集群)、管理與監(jiān)控

PM2:自動化部署

PM2集成docker

請求追蹤(重建用戶請求流程)

梳理Consul 服務注冊與發(fā)現(xiàn)基本邏輯

框架集成node-consul

mongodb持久化存儲

結(jié)合seneca與consul的路由服務中間件(可支持多個相同名字服務集群路由,通過$$version區(qū)別)

支持流處理(文件上傳/下載等)

jenkins自動化部署

nginx負載均衡

持續(xù)集成方案

redis緩存

prisma提供GraphQL接口

模式匹配(Patrun模塊)

index.js(accout-server/src/index.js)

const seneca = require("seneca")()

seneca.use("cmd:login", (msg, done) => {
    const { username, pass } = msg
    if (username === "asd" && pass === "123") {
        return done(null, { code: 1000 })
    }
    return done(null, { code: 2100 })
})

const Promise = require("bluebird")

const act = Promise.promisify(seneca.act, { context: "seneca" })

act({
    cmd: "login",
    username: "asd",
    pass: "123"
}).then(res => {
    console.log(res)
}).catch(err => {
    console.log(err)
})

執(zhí)行后

{ code: 1000 }
{"kind":"notice","notice":"hello seneca k5i8j1cvw96h/1525589223364/10992/3.4.3/-","level":"info","seneca":"k5i8j1cvw96h/1525589223364/10992/3.4.3/-","when":1525589223563}

seneca.add方法,添加一個action pattern到Seneca實例中,它有三個參數(shù):

pattern: 用于Seneca中JSON的消息匹配模式,對象或格式化字符串

sub_pattern: 子模式,優(yōu)先級低于主模式(可選)

action: 當匹配成功后的動作函數(shù)

seneca.act方法,執(zhí)行Seneca實例中匹配成功的動作,它也有兩個參數(shù):

msg: JSON消息

sub_pattern: 子消息,優(yōu)先級低于主消息(可選)

response: 用于接收服務調(diào)用結(jié)果

seneca.use方法,為Seneca實例添加一個插件,它有兩個參數(shù):(此處插件的原理和中間件有一些不同)

func: 插件執(zhí)行方法

options: 插件所需options(可選)

核心是利用JSON對象進行模式匹配。這個JSON對象既包含某個微服務所需要調(diào)取另一個微服務的特征,同時也包含傳參。和Java微服務發(fā)現(xiàn)有些類似不過是用模式代替ip+port,目前為止模式是完全可以實現(xiàn)服務發(fā)現(xiàn)功能,但是否更加靈活還有待去挖掘。

所需注意的點

各微服務之間模式需通過設(shè)計來區(qū)分

啟動第一個微服務

index.js(config-server/src/index.js)

const seneca = require("seneca")()
const config = {
SUCCESS_NORMAL_RES: {
    code: 1000,
    desc: "服務端正常響應"
}}

seneca.add("$target$:config-server", (msg, done) => {
  return done(null, config)
}).listen(10011)

運行此腳本后可在瀏覽器中輸入http://localhost:10011/act?cmd=config發(fā)起請求獲取全局配置信息

OR

const seneca = require("seneca")()
const Promise = require("bluebird")

const act = Promise.promisify(seneca.act, { context: seneca })

seneca.client(10011)
act("$$target:config-server, default$:{msg:404}").then(res => {
  console.log(res)
}).catch(err => {
  console.log(err)
})
對內(nèi):多個微服務相互調(diào)用(關(guān)鍵)

noname-server

const seneca = require("seneca")()
seneca.add("$$target:account-server", (msg, done) => {
    done(null, { seneca: "666" })
})
seneca.listen(10015)

config-server(同上)

call

const seneca = require("seneca")()
const Promise = require("blurebird")

const act = Promise.promisify(seneca.act, { context: seneca })

seneca.client({
    port: "10011",
    pin: "$$target:account-server"
})
seneca.client({
    port: "10015",
    pin: "$$target:noname-server"
})

act("$$target:account-server").then(res => {
    console.log(res)
}).catch(err => {
    console.log(err)
})

act("$$target:noname-server").then(res => {
    console.log(res)
}).catch(err => {
    console.log(err)
})
對外:提供REST服務(關(guān)鍵)

集成koa

const seneca = require("seneca")()
const Promise = require("bluebird")
const SenecaWeb = require("seneca-web")
const Koa = require("koa")
const Router = require("koa-router")
const app = new Koa()
const userModule = require("./modules/user.js")

// 初始化用戶模塊
seneca.use(userModule.init)

// 初始化seneca-web插件,并適配koa
seneca.use(SenecaWeb, {
  context: Router(),
  adapter: require("seneca-web-adapter-koa2"),
  routes: [...userModule.routes]
})

// 將routes導出給koa app
seneca.ready(() => {
  app.use(seneca.export("web/context")().routes())
})

app.listen(3333)

user模塊

const $module = "module:user"
let userCount = 3

const REST_Routes = [
  {
    prefix: "/user",
    pin: `${$module},if:*`,
    map: {
      list: {
        GET: true,
        name: ""
      },
      load: {
        GET: true,
        name: "",
        suffix: "/:id"
      },
      edit: {
        PUT: true,
        name: "",
        suffix: "/:id"
      },
      create: {
        POST: true,
        name: ""
      },
      delete: {
        DELETE: true,
        name: "",
        suffix: "/:id"
      }
    }
  }
]

const db = {
  users: [{
    id: 1,
    name: "甲"
  }, {
    id: 2,
    name: "乙"
  }, {
    id: 3,
    name: "丙"
  }]
}

function user(options) {
  this.add(`${$module},if:list`, (msg, done) => {
    done(null, db.users)
  })
  this.add(`${$module},if:load`, (msg, done) => {
    const { id } = msg.args.params
    done(null, db.users.find(v => Number(id) === v.id))
  })
  this.add(`${$module},if:edit`, (msg, done) => {
    let { id } = msg.args.params
    id = +id
    const { name } = msg.args.body
    const index = db.users.findIndex(v => v.id === id)
    if (index !== -1) {
      db.users.splice(index, 1, {
        id,
        name
      })
      done(null, db.users)
    } else {
      done(null, { success: false })
    }
  })
  this.add(`${$module},if:create`, (msg, done) => {
    const { name } = msg.args.body
    db.users.push({
      id: ++userCount,
      name
    })
    done(null, db.users)
  })
  this.add(`${$module},if:delete`, (msg, done) => {
    let { id } = msg.args.params
    id = +id
    const index = db.users.findIndex(v => v.id === id)
    if (index !== -1) {
      db.users.splice(index, 1)
      done(null, db.users)
    } else {
      done(null, { success: false })
    }
  })
}

module.exports = {
  init: user,
  routes: REST_Routes
}

vscode-restclient(vscode的restclient插件,用于發(fā)起RESTFUL請求)

### 1
POST http://localhost:3333/user HTTP/1.1
Content-Type: application/json

{
  "name": "測試添加用戶"
}

### delete
DELETE http://localhost:3333/user/2 HTTP/1.1

### PUT
PUT http://localhost:3333/user/2 HTTP/1.1
Content-Type: application/json

{
  "name": "測試修改用戶信息"
}

### GET
GET http://localhost:3333/user HTTP/1.1

### GET
GET http://localhost:3333/user/3 HTTP/1.1
seneca內(nèi)置日志輸出

可在構(gòu)造函數(shù)中傳入配置,log屬性可以控制日志級別

例1:傳字符串

require("seneca")({
    // quiet silent any all print standard test
    log: "all"
})

例2:傳對象

require("seneca")({
    log: {
        // none debug+ info+ warn+
        level: "debug+"
    },
    // 設(shè)置為true時,seneca日志功能會encapsulate senecaId,senecaTag,actId等字段后輸出(一般為兩字符)
    short: true
})

建議例2代碼,因為seneca-web-adapter-koa2插件打印的日志level為debug,利于做web接口訪問日志記錄。

winston日志模塊

傳送門

Logger.js

const { createLogger, format, transports } = require("winston")
const { combine, timestamp, label, printf } = format

const logger = createLogger({
  level: "info",
  format: combine(
    label({label: "microservices"}),
    timestamp(),
    printf(info => {
      return `${info.timestamp} [${info.label}] ${info.level}: ${info.message}`
    })
  ),
  transports: [ new transports.Console() ]
})

// highest to lowest
const levels = {
  error: 0,
  warn: 1,
  info: 2,
  verbose: 3,
  debug: 4,
  silly: 5
}

module.exports = logger

日志輸出格式

2018-05-17T14:43:28.330Z [microservices] info: 接收到rpc客戶端的調(diào)用請求
2018-05-17T14:43:28.331Z [microservices] warn: warn message
2018-05-17T14:43:28.331Z [microservices] error: error message
RabbitMQ消息隊列服務

安裝

1. 單任務單consumer,生產(chǎn)者消費者模式

producer.js

// 創(chuàng)建一個amqp對等體
const amqp = require("amqplib/callback_api")

amqp.connect("amqp://localhost", (err, conn) => {
  conn.createChannel((err, ch) => {
    const q = "taskQueue1"
    const msg = process.argv.slice(2).join(" ") || "hello world"

    // 為方式RabbitMQ退出或者崩潰時重啟后丟失隊列信息,這里配置durable:true(同時在消費者腳本中也要配置durable:true)后,
    ch.assertQueue(q, { durable: true })
    // 這里配置persistent:true,通過閱讀官方文檔,我理解為當程序重啟后,會斷點續(xù)傳之前未send完成的數(shù)據(jù)消息。(但此功能并不可靠,因為不會為所有消息執(zhí)行同步IO,會緩存在cache并在某個恰當時機write到disk)
    ch.sendToQueue(q, Buffer.from(msg), { persistent: true })
    setTimeout(() => {
      conn.close(); process.exit(0)
    }, 100)
  })
})
// 創(chuàng)建一個amqp對等體
const amqp = require("amqplib/callback_api")

amqp.connect("amqp://localhost", (err, conn) => {
  conn.createChannel((err, ch) => {
    const q = "taskQueue1"

    // 為方式RabbitMQ退出或者崩潰時重啟后丟失隊列信息,這里配置durable:true(同時在消費者腳本中也要定義durable:true)后,
    ch.assertQueue(q, { durable: true })
    ch.prefetch(1)
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q)
    ch.consume(q, msg => {
      const secs = msg.content.toString().split(".").length - 1
      console.log(" [x] Received %s", msg.content.toString())
      setTimeout(() => {
        console.log(" [x] Done")
        ch.ack(msg)
      }, secs * 1000)
    })
    // noAck配置(默認為false)表明consumer是否需要在處理完后反饋ack給producer,如果設(shè)置為true,則RabbitMQ服務如果將任務send至此consumer后不關(guān)心任務實際處理結(jié)果,send任務后直接標記已完成;否則,RabbiMQ得到ack反饋后才標記為已完成,如果一直未收到ack默認會一直等待ack然后標記,另外如果接收到nack或者該consumer進程退出則繼續(xù)dispatcher任務
  })
})

檢驗過程

執(zhí)行rabbitmqctl list_queues查看當前隊列

Timeout: 60.0 seconds ...
Listing queues for vhost / ...

node producer.js(rabbitMQ執(zhí)行過程為會先創(chuàng)建一個匿名exchange,一個指定queue然后將queue與該匿名exchange綁定)

rabbitmqctl list_bindings

Listing bindings for vhost /...
        exchange        taskQueue1      queue   taskQueue1      []

rabbitmqctl list_queues

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
taskQueue1      1

node consumer.js

Waiting for messages in taskQueue1. To exit press CTRL+C
[x] Received hello world
[x] Done

rabbitmqctl list_queues

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
taskQueue1      0

知識點

生產(chǎn)者消費者模式(一個生產(chǎn)者的消息在同一時間只交由一個消費者處理)

ACK機制(rabbitmq的確認機制)

創(chuàng)建隊列{durable:true}以及向隊列發(fā)送消息{persistent:true}(消息持久化存儲,但不完全能保證,比如當某消息未從緩存中寫到磁盤中而程序崩潰時則會丟失)

Round-robin Dispatch(公平分發(fā))

處理窗口控制(prefetch來控制分發(fā)窗口)

異步多任務處理機制(比如一個大任務分解,分而治之)

整個消息流流程(某個生產(chǎn)者進程 -> 匿名exchange -> 通過binding -> 指定queue -> 某一個消費者進程)

2. 單任務多consumer,發(fā)布/訂閱模式(全消息模型)

publisher.js

const amqp = require("amqplib/callback_api")

amqp.connect("amqp://localhost", (err, conn) => {
  conn.createChannel((err, ch) => {
    const ex = "logs"
    const msg = process.argv.slice(2).join(" ") || "Hello World!"

    // ex為exchange名稱(唯一)
    // 模式為fanout
    // 不對消息持久化存儲
    ch.assertExchange(ex, "fanout", { durable: false })
    // 第二個參數(shù)為指定某一個binding,如為空則由RabbitMQ隨機指定
    ch.publish(ex, "", Buffer.from(msg))
    console.log(" [x] Send %s", msg)
  })

  setTimeout(() => {
    conn.close()
    process.exit(0)
  }, 100)
})

subscriber.js

const amqp = require("amqplib/callback_api")

amqp.connect("amqp://localhost", (err, conn) => {
  conn.createChannel((err, ch) => {
    const ex = "logs"

    // ex -> exchange是發(fā)布/訂閱消息的載體,
    // fanout -> 分發(fā)消息的模式,fanout,direct,topic,headers
    // durable設(shè)置為false降低一些可靠性,提高性能,因為不需要磁盤IO持久化存儲消息,另外
    ch.assertExchange(ex, "fanout", { durable: false })
    // 使用匿名(也就是RabbitMQ自動生成隨機名的queue)隊列
    // exclusive設(shè)置為true,即可以當其寄生的connection被close的時候自動deleted
    ch.assertQueue("", { exclusive: true }, (err, q) => {
      console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue)
      // 綁定隊列到某個exchange載體(監(jiān)聽某個exchange的消息)
      // 第三個入?yún)閎inding key
      ch.bindQueue(q.queue, ex, "")
      // 消費即訂閱某個exchange的消息并設(shè)置處理句柄
      // 因為發(fā)布/訂閱消息的模式就是非可靠性,只有當訂閱者訂閱才能收到相關(guān)的消息而且發(fā)布者不關(guān)心該消息的訂閱者是誰以及處理結(jié)果如何,所以這里noAck會置為true
      ch.consume(q.queue, (msg) => {
        console.log(" [x] %s", msg.content.toString())
      }, { noAck: true })
    })
  })
})

檢驗過程

rabbitmqctl stop_app;rabbitmqctl reset;rabbitmqctl start_app(清空之前測試使用的queues、echanges、bindings)

node subscriber.js

[*] Waiting for messages in amq.gen-lgNW51IeEfj9vt1yjMUuaw. To exit press CTRL+C

rabbitmqctl list_exchanges

Listing exchanges for vhost / ...
logs    fanout

rabbitmqctl list_bindings

Listing bindings for vhost /...
        exchange        amq.gen-jDbfwJR8TbSNJT2a2a83Og  queue   amq.gen-jDbfwJR8TbSNJT2a2a83Og  []
logs    exchange        amq.gen-jDbfwJR8TbSNJT2a2a83Og  queue           []

node publisher.js tasks.........

[x] Send tasks......... // publiser.js

[x] tasks......... // subscriber.js

知識點

發(fā)布/訂閱模式(發(fā)布者將消息以一對多的形式發(fā)送給訂閱者處理)

noAck(此模式下推薦用非Ack機制,因為發(fā)布者往往不需要訂閱者如何處理消息以及其結(jié)果)

durable:false(此模式下推薦不需要做數(shù)據(jù)持久化存儲,原因如上)

exchange的工作模式(即路由類型,fanout,direct,topic,headers等,下節(jié)會講解到)

整個消息流流程(某個發(fā)布者進程 -> 指定exchange -> 通過binding以及工作模式 -> 某個或多個匿名queue即訂閱者進程)

3. Direct Routing

exchange.js

module.exports = {
  name: "ex1",
  type: "direct",
  option: {
    durable: false
  },
  ranks: ["info", "error", "warning", "severity"]
}

direct-routing.js

const amqp = require("amqplib/callback_api")
const ex = require("./exchange")

amqp.connect("amqp://localhost", (err, conn) => {
  conn.createChannel((err, ch) => {

    ch.assertExchange(ex.name, ex.type, ex.options)
    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 0)
  })
})

subscriber.js

const amqp = require("amqplib/callback_api")
const ex = require("./exchange")

amqp.connect("amqp://localhost", (err, conn) => {
  conn.createChannel((err, ch) => {
    const ranks = ex.ranks

    ranks.forEach(rank => {
      // 聲明一個非匿名queue
      ch.assertQueue(`${rank}-queue`, { exclusive: false }, (err, q) => {
        ch.bindQueue(q.queue, ex.name, rank)
        ch.consume(q.queue, msg => {

          console.log(" [x] %s: "%s"", msg.fields.routingKey, msg.content.toString());
        }, { noAck: true })
      })
    })
  })
})

publisher.js

const amqp = require("amqplib/callback_api")
const ex = require("./exchange")

amqp.connect("amqp://localhost", (err, conn) => {
  conn.createChannel((err, ch) => {
    const ranks = ex.ranks

    ranks.forEach(rank => {
      ch.publish(ex.name, rank, Buffer.from(`${rank} logs...`))
    })

    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 0)
  })
})

檢驗過程

rabbitmqctl stop_app;rabbitmqctl reset;rabbitmqctl start_app(清空之前測試使用的queues、echanges、bindings)

node direct-routing.js
rabbitmqctl list_exchanges

Listing exchanges for vhost / ...
amq.headers    headers
ex1    direct
amq.fanout    fanout
amq.rabbitmq.trace    topic
amq.topic    topic
    direct
amq.direct    direct
amq.match    headers

node subscriber.js
rabbitmqctl list_queues

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
severity-queue    0
error-queue    0
info-queue    0
warning-queue    0

Listing bindings for vhost /...
    exchange    error-queue    queue    error-queue    []
    exchange    info-queue    queue    info-queue    []
    exchange    severity-queue    queue    severity-queue    []
    exchange    warning-queue    queue    warning-queue    []
ex1    exchange    error-queue    queue    error    []
ex1    exchange    info-queue    queue    info    []
ex1    exchange    severity-queue    queue    severity    []
ex1    exchange    warning-queue    queue    warning    []

node publisher.js

 [x] info: "info logs..."
 [x] error: "error logs..."
 [x] severity: "severity logs..."
 [x] warning: "warning logs..."

知識點

路由key,用于exchange的direct工作模式下消息的路由

每當assertQueue時,該queue會在以queue名稱當作路由key綁定到匿名exchange

可用于日志不同級別的log處理

4. Topic Routing

exchange.js

module.exports = {
  name: "ex2",
  type: "topic",
  option: {
    durable: false
  },
  ranks: ["info", "error", "warning", "severity"]
}

topic-routing.js

const amqp = require("amqplib/callback_api")
const exchangeConfig = require("./exchange")

amqp.connect("amqp://localhost", (err, conn) => {
  conn.createChannel((err, ch) => {
    ch.assertExchange(exchangeConfig.name, exchangeConfig.type, exchangeConfig.option)

    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 0)
  })
})

subscriber.js

const amqp = require("amqplib/callback_api")
const exchangeConfig = require("./exchange")

amqp.connect("amqp://localhost", (err, conn) => {
  conn.createChannel((err, ch) => {
    const args = process.argv.slice(2)
    const keys = (args.length > 0) ? args : ["anonymous.info"]

    console.log(" [*] Waiting for logs. To exit press CTRL+C");
    keys.forEach(key => {
      ch.assertQueue("", { exclusive: true }, (err, q) => {
        console.log(` [x] Listen by routingKey ${key}`)
        ch.bindQueue(q.queue, exchangeConfig.name, key)

        ch.consume(q.queue, msg => {
          console.log(" [x] %s:"%s"", msg.fields.routingKey, msg.content.toString());
        }, { noAck: true })
      })
    })
  })
})

publisher.js

const amqp = require("amqplib/callback_api")
const exchangeConfig = require("./exchange")

amqp.connect("amqp://localhost", (err, conn) => {
  conn.createChannel((err, ch) => {
    const args = process.argv.slice(2)
    const key = (args.length > 1) ? args[0] : "anonymous.info"
    const msg = args.slice(1).join(" ") || "hello world"

    ch.publish(exchangeConfig.name, key, Buffer.from(msg))

    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 0)
  })
})

檢驗過程

rabbitmqctl stop_app;rabbitmqctl reset;rabbitmqctl start_app(清空之前測試使用的queues、echanges、bindings)

node topic-routing.js

Listing exchanges for vhost / ...
amq.fanout    fanout
amq.rabbitmq.trace    topic
amq.headers    headers
amq.match    headers
ex2    topic
    direct
amq.topic    topic
amq.direct    direct

node subscriber.js "#.info" "*.error"

[*] Waiting for logs. To exit press CTRL+C
[x] Listen by routingKey #.info
[x] Listen by routingKey *.error

node publisher.js "account-server.info" "用戶服務測試"

node publisher.js "config-server.info" "配置服務測試"

node publisher.js "config-server.error" "配置服務出錯"

[x] account-server.info:"用戶服務測試"
[x] config-server.info:"配置服務測試"
[x] config-server.error:"配置服務出錯"

知識點

key最長為255字節(jié)

#可匹配0或多個單詞,*可精確匹配1個單詞

5. RPC

rpc_server.js

const amqp = require("amqplib/callback_api")
const logger = require("./Logger")

let connection = null

amqp.connect("amqp://localhost", (err, conn) => {
  connection = conn
  conn.createChannel((err, ch) => {
    const q = "account_rpc_queue"

    ch.assertQueue(q, { durable: true })
    ch.prefetch(2)

    ch.consume(q, msg => {
      let data = {}
      let primitiveContent = msg.content.toString()
      try {
        data = JSON.parse(primitiveContent)
      } catch (e) {
        logger.error(new Error(e))
      }
      logger.info("接收到rpc客戶端的調(diào)用請求")
      if (msg.properties.correlationId === "10abc") {
        logger.info(primitiveContent)
        const uid = Number(data.uid) || -1
        let r = getUserById(uid)
        ch.sendToQueue(msg.properties.replyTo, Buffer.from(JSON.stringify(r)), { persistent: true })
        ch.ack(msg)
      } else {
        logger.info("不匹配的調(diào)用請求")
      }
    })
  })
})

function getUserById (uid) {
  let result = ""

  if (uid === +uid && uid > 0) {
    result = {
      state: 1000,
      msg: "成功",
      data: {
        uid: uid,
        name: "小強",
        sex: 1
      }
    }
  } else {
    result = {
      state: 2000,
      msg: "傳參格式錯誤"
    }
  }

  return result
}

process.on("SIGINT", () => {
  logger.warn("SIGINT")
  connection && connection.close()
  process.exit(0)
})

rpc_client.js

const amqp = require("amqplib/callback_api")

amqp.connect("amqp://localhost", (err, conn) => {
  conn.createChannel((err, ch) => {
    const q = "account_rpc_queue"
    const callback = "callback_queue"

    ch.assertQueue(callback, { durable: true })
    ch.consume(callback, msg => {
      const result = msg.content.toString()
      console.log(`接收到回調(diào)的消息啦!`)
      console.log(result)
      ch.ack(msg)
      setTimeout(() => {
        conn.close()
        process.exit(0)
      }, 0)
    })

    ch.assertQueue(q, { durable: true })
    const msg = {
      uid: 2
    }
    ch.sendToQueue(q, Buffer.from(JSON.stringify(msg)), {
      persistent: true,
      correlationId: "10abc",
      replyTo: "callback_queue"
    })
  })
})

檢驗過程

node rpc_server.js

rabbitmqctl list_queues

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
account_rpc_queue    0

node rpc_client.js

rpc_client的CLI打印

接收到回調(diào)的消息啦!
{"state":1000,"msg":"成功","data":{"uid":2,"name":"小強","sex":1}}

rpc_server的CLI打印

接收到rpc客戶端的調(diào)用請求
{ uid: 2 }
PM2:node服務部署(服務集群)、管理與監(jiān)控

pm2官網(wǎng)

啟動

pm2 start app.js

-w --watch:監(jiān)聽目錄變化,如變化則自動重啟應用

--ignore-file:監(jiān)聽目錄變化時忽略的文件。如pm2 start rpc_server.js --watch --ignore-watch="rpc_client.js"

-n --name:設(shè)置應用名字,可用于區(qū)分應用

-i --instances:設(shè)置應用實例個數(shù),0與max相同

-f --force: 強制啟動某應用,常常用于有相同應用在運行的情況

-o --output :標準輸出日志文件的路徑

-e --error :錯誤輸出日志文件的路徑

--env :配置環(huán)境變量

pm2 start rpc_server.js -w -i max -n s1 --ignore-watch="rpc_client.js" -e ./server_error.log -o ./server_info.log

在cluster-mode,也就是-i max下,日志文件會自動在后面追加-${index}保證不重復
其他簡單且常用命令

pm2 stop app_name|app_id
pm2 restart app_name|app_id
pm2 delete app_name|app_id
pm2 show app_name|app_id OR pm2 describe app_name|app_id
pm2 list
pm2 monit
pm2 logs app_name|app_id --lines --err

Graceful Stop

pm2 stop app_name|app_id

process.on("SIGINT", () => {
  logger.warn("SIGINT")
  connection && connection.close()
  process.exit(0)
})

當進程結(jié)束前,程序會攔截SIGINT信號從而在進程即將被殺掉前去斷開數(shù)據(jù)庫連接等等占用內(nèi)存的操作后再執(zhí)行process.exit()從而優(yōu)雅的退出進程。(如在1.6s后進程還未結(jié)束則繼續(xù)發(fā)送SIGKILL信號強制進程結(jié)束)

Process File

ecosystem.config.js

const appCfg = {
  args: "",
  max_memory_restart: "150M",
  env: {
    NODE_ENV: "development"
  },
  env_production: {
    NODE_ENV: "production"
  },
  // source map
  source_map_support: true,
  // 不合并日志輸出,用于集群服務
  merge_logs: false,
  // 常用于啟動應用時異常,超時時間限制
  listen_timeout: 5000,
  // 進程SIGINT命令時間限制,即進程必須在監(jiān)聽到SIGINT信號后必須在以下設(shè)置時間結(jié)束進程
  kill_timeout: 2000,
  // 當啟動異常后不嘗試重啟,運維人員嘗試找原因后重試
  autorestart: false,
  // 不允許以相同腳本啟動進程
  force: false,
  // 在Keymetrics dashboard中執(zhí)行pull/upgrade操作后執(zhí)行的命令隊列
  post_update: ["npm install"],
  // 監(jiān)聽文件變化
  watch: false,
  // 忽略監(jiān)聽文件變化
  ignore_watch: ["node_modules"]
}

function GeneratePM2AppConfig({ name = "", script = "", error_file = "", out_file = "", exec_mode = "fork", instances = 1, args = "" }) {
  if (name) {
    return Object.assign({
      name,
      script: script || `${name}.js`,
      error_file: error_file || `${name}-err.log`,
      out_file: out_file|| `${name}-out.log`,
      instances,
      exec_mode: instances > 1 ? "cluster" : "fork",
      args
    }, appCfg)
  } else {
    return null
  }
}

module.exports = {
  apps: [
    GeneratePM2AppConfig({
      name: "client",
      script: "./rpc_client.js"
    }),

    GeneratePM2AppConfig({
      name: "server",
      script: "./rpc_server.js",
      instances: 1
    })
  ]
}

pm2 start ecosystem.config.js

避坑指南:processFile文件命名建議為*.config.js格式。否則后果自負。
監(jiān)控

請移步app.keymetrics.io

PM2:自動化部署 ssh準備

ssh-keygen -t rsa -C "qingf deployment" -b 4096

如果有多密鑰、多用戶情況,建議配置~/.ssh/config文件,格式類似如下

// 用不同用戶對不同遠程主機發(fā)起ssh請求時指定私鑰
Host qingf.me
  User deploy
  IdentityFile ~/.ssh/qf_deployment_rsa
  // 設(shè)置為no可去掉首次登陸(y/n)的選擇
  StrictHostKeyChecking no
// 別名用法
Host deployment
  User deploy
  Hostname qingf.me
  IdentityFile ~/.ssh/qingf_deployment_rsa
  StrictHostKeyChecking no

將公鑰復制到遠程(一般為部署服務器)對應用戶目錄,比如/home/deploy/.ssh/authorized_keys文件(authorized_keys文件權(quán)限設(shè)置為600)

配置ecosystem.config.js

與上述apps同級增加deploy屬性,如下

deploy: {
    production: {
        "user": "deploy",
        "host": "qingf.me",
        "ref": "remotes/origin/master",
        "repo": "https://github.com/Cecil0o0/account-server.git",
        "path": "/home/deploy/apps/account-server",
        // 生命周期鉤子,在ssh到遠端之后setup操作之前執(zhí)行
        "pre-setup": "",
        // 生命周期鉤子,在初始化設(shè)置即git pull之后執(zhí)行
        "post-setup": "ls -la",
        // 生命周期鉤子,在遠端git fetch origin之前執(zhí)行
        "pre-setup": "",
        // 生命周期鉤子,在遠端git修改HEAD指針到指定ref之后執(zhí)行
        "post-deploy": "npm install && pm2 startOrRestart deploy/ecosystem.config.js --env production",
        // 以下這個環(huán)境變量將注入到所有app中
        "env"  : {
          "NODE_ENV": "test"
        }
    }
}
tip:please make git working directory clean first!

此處如果不懂或者有疑問,請查閱Demo

然后先后執(zhí)行以下兩條命令(注意config文件路徑)

pm2 deploy deploy/ecosystem.config.js production setup

pm2 deploy deploy/ecosystem.config.js production

其他命令

pm2 deploy

  Commands:
    setup                run remote setup commands
    update               update deploy to the latest release
    revert [n]           revert to [n]th last deployment or 1
    curr[ent]            output current release commit
    prev[ious]           output previous release commit
    exec|run        execute the given 
    list                 list previous deploy commits
    [ref]                deploy to [ref], the "ref" setting, or latest tag
推薦shell toolkit

oh my zsh

請求追蹤 如何?

在seneca.add以及seneca.act中使用seneca.fixedargs["tx$"]值作為traceID標識處于某一條請求流程。另外seneca內(nèi)置log系統(tǒng)會打印此值。

疑問?

seneca內(nèi)置log系統(tǒng)如何做自定義日志打?。?/p>

溫馨提示:請以正常的http請求開始,因為經(jīng)過測試如果微服務自主發(fā)起act,其seneca.fixedargs["tx$"]值不同。
Consul 服務注冊與發(fā)現(xiàn)

Consul是一個分布式集群服務注冊發(fā)現(xiàn)工具,并具有健康檢查、分級式KV存儲、多數(shù)據(jù)中心等高級特性。

安裝

可選擇使用預編譯的安裝包

也可選擇克隆源碼后編譯安裝

基礎(chǔ)使用

以開發(fā)模式快速啟動服務模式代理并開啟web界面訪問http://localhost:8500

consul agent -dev -ui

編寫服務定義文件

{
  "service": {
    // 服務名,稍后用于query服務
    "name": "account-server",
    // 服務標簽
    "tags": ["account-server"],
    // 服務元信息
    "meta": {
      "meta": "for my service"
    },
    // 服務端口
    "port": 3333,
    // 不允許標簽覆蓋
    "enable_tag_override": false,
    // 腳本檢測做health checks 與-enable-script-checks=true配合使用,有腳本模式、TCP模式、HTTP模式、TTL模式
    "checks": [
      {
        "http": "http://localhost:3333/user",
        "interval": "10s"
      }
    ]
  }
}

query定義的account-server服務

curl http://localhost:8500/v1/catalog/service/account-server

[
    {
        "ID": "e66eb1ff-460c-e63f-b4ac-0cb42daed19c",
        "Node": "haojiechen.local",
        "Address": "127.0.0.1",
        "Datacenter": "dc1",
        "TaggedAddresses": {
            "lan": "127.0.0.1",
            "wan": "127.0.0.1"
        },
        "NodeMeta": {
            "consul-network-segment": ""
        },
        "ServiceID": "account-server",
        "ServiceName": "account-server",
        "ServiceTags": [
            "account-server"
        ],
        "ServiceAddress": "",
        "ServiceMeta": {
            "meta": "for my service"
        },
        "ServicePort": 3333,
        "ServiceEnableTagOverride": false,
        "CreateIndex": 6,
        "ModifyIndex": 6
    }
]
生產(chǎn)級別使用(分布式集群)

某一個結(jié)點啟動一個server模式代理,如下

consul agent -server -bootstrap-expect=1 
    -data-dir=/tmp/consul -node=agent-one -bind=valid extranet IP 
    -enable-script-checks=true -config-dir=/usr/local/etc/consul.d

查看集群成員

consul members

Node       Address         Status  Type    Build  Protocol  DC   Segment
agent-one  valid extranet IP:8301  alive   server  1.1.0  2         dc1  

另一個結(jié)點啟動一個client模式代理,如下

consul agent 
    -data-dir=/tmp/consul -node=agent-two -bind=139.129.5.228 
    -enable-script-checks=true -config-dir=/usr/local/etc/consul.d

查看集群成員

consul members

Node       Address         Status  Type    Build  Protocol  DC   Segment
agent-two  139.129.5.228:8301  alive   server  1.1.0  2         dc1  

加入Cluster

consul join 139.129.5.228
consul members

Node       Address         Status  Type    Build  Protocol  DC   Segment
agent-one  valid extranet IP:8301  alive   server  1.1.0  2         dc1  
agent-two  139.129.5.228:8301  alive   server  1.1.0  2         dc1  
集成node-consul

config.js

// 服務注冊與發(fā)現(xiàn)
// https://github.com/silas/node-consul#catalog-node-services
  "serverR&D": {
    consulServer: {
      type: "consul",
      host: "127.0.0.1",
      port: 8500,
      secure: false,
      ca: [],
      defaults: {
        token: ""
      },
      promisify: true
    },
    bizService: {
      name: "defaultName",
      id: "defaultId",
      address: "127.0.0.1",
      port: 1000,
      tags: [],
      meta: {
        version: "",
        description: "注冊集群"
      },
      check: {
        http: "",
        // check間隔時間(ex: 15s)
        interval: "10s",
        // check超時時間(ex: 10s)
        timeout: "2s",
        // 處于臨界狀態(tài)后自動注銷服務的超時時間
        deregistercriticalserviceafter: "30s",
        // 初始化狀態(tài)值為成功
        status: "passing",
        // 備注
        notes: "{"version":"111","microservice-port":1115}"
      }
    }
  }

server-register.js

/*
 * @Author: Cecil
 * @Last Modified by: Cecil
 * @Last Modified time: 2018-06-02 11:26:49
 * @Description 微服務注冊方法
 */
const defaultConf = require("../config")["serverR&D"]
const { ObjectDeepSet, isString } = require("../helper/utils")
const Consul = require("consul")
const { generateServiceName, generateCheckHttp } = require("../helper/consul")

// 注冊服務

function register({ consulServer = {}, bizService = {} } = {}) {
  if (!bizService.name && isString(bizService.name)) throw new Error("name is invalid!")
  if (bizService.port !== +bizService.port) throw new Error("port is invalid!")
  if (!bizService.host && isString(bizService.host)) throw new Error("host is invalid!")
  if (!bizService.meta.$$version) throw new Error("meta.$$version is invalid!")
  if (!bizService.meta.$$microservicePort) throw new Error("meta.$$microservicePort is invalid!")
  const consul = Consul(ObjectDeepSet(defaultConf.consulServer, consulServer))
  const service = defaultConf.bizService
  service.name = generateServiceName(bizService.name)
  service.id = service.name
  service.address = bizService.host
  service.port = bizService.port
  service.check.http = generateCheckHttp(bizService.host, bizService.port)
  service.check.notes = JSON.stringify(bizService.meta)

  return new Promise((resolve, reject) => {
    consul.agent.service.list().then(services => {
      // 檢查主機+端口是否已被占用
      Object.keys(services).some(key => {
        if (services[key].Address === service.address && services[key].Port === service.port) {
          throw new Error(`該服務集群endpoint[${service.address}, ${service.port}]已被占用!`)
        }
      })
      // 注冊集群服務
      consul.agent.service.register(service).then(() => {
        logger.info(`${bizService.name}服務已注冊`)
        resolve(services)
      }).catch(err => {
        console.log(err)
      })
    }).catch(err => {
      throw new Error(err)
    })
  })
}

module.exports = class ServerRegister {
  constructor() {
    this.register = register
  }
}
驗證

保證runtime中存在consul和mongodb服務后,clone該倉庫Demo,cd到工程根目錄下,運行node src即可。

框架集成node-consul

server-register.js

/*
 * @Author: Cecil
 * @Last Modified by: Cecil
 * @Last Modified time: 2018-06-02 13:58:22
 * @Description 微服務注冊方法
 */
const defaultConf = require("../config")["serverR&D"]
const { ObjectDeepSet, isString } = require("../helper/utils")
const Consul = require("consul")
const { generateServiceName, generateCheckHttp } = require("../helper/consul")
const logger = new (require("./logger"))().generateLogger()

// 注冊服務方法定義

function register({ consulServer = {}, bizService = {} } = {}) {
  if (!bizService.name && isString(bizService.name)) throw new Error("name is invalid!")
  if (bizService.port !== +bizService.port) throw new Error("port is invalid!")
  if (!bizService.host && isString(bizService.host)) throw new Error("host is invalid!")
  if (!bizService.meta.$$version) throw new Error("meta.$$version is invalid!")
  if (!bizService.meta.$$microservicePort) throw new Error("meta.$$microservicePort is invalid!")
  const consul = Consul(ObjectDeepSet(defaultConf.consulServer, consulServer))
  const service = defaultConf.bizService
  service.name = generateServiceName(bizService.name)
  service.id = service.name
  service.address = bizService.host
  service.port = bizService.port
  service.check.http = generateCheckHttp(bizService.host, bizService.port)
  service.check.notes = JSON.stringify(bizService.meta)

  return new Promise((resolve, reject) => {
    consul.agent.service.list().then(services => {
      // 檢查主機+端口是否已被占用
      Object.keys(services).some(key => {
        if (services[key].Address === service.address && services[key].Port === service.port) {
          throw new Error(`該服務集群endpoint[${service.address}, ${service.port}]已被占用!`)
        }
      })
      // 注冊集群服務
      consul.agent.service.register(service).then(() => {
        logger.info(`${bizService.name}服務注冊成功`)
        resolve(services)
      }).catch(err => {
        console.log(err)
      })
    }).catch(err => {
      throw new Error(err)
    })
  })
}

module.exports = class ServerRegister {
  constructor() {
    this.register = register
  }
}

account-server/src/index.js

const vastify = require("vastify")
const version = require("../package.json").version
const microservicePort = 10015
const httpPort = 3333

// 注冊服務
vastify.ServerRegister.register({
  bizService: {
    name: "account-server",
    host: "127.0.0.1",
    port: httpPort,
    meta: {
      $$version: version,
      $$microservicePort: microservicePort
    }
  }
})
Mongodb持久化存儲

框架使用mongoose做mongoClient,當然你也可以選用原生nodejs mongoClient。

改造之前的user模塊,偷個懶就不貼代碼了,具體請查看Demo

結(jié)合seneca以及consul的路由服務中間件

microRouting.js

/*
 * @Author: Cecil
 * @Last Modified by: Cecil
 * @Last Modified time: 2018-06-02 16:22:02
 * @Description 微服務內(nèi)部路由中間件,暫不支持自定義路由匹配策略
 */

"use strict"

const Consul = require("consul")
const defaultConf = require("../config")
const { ObjectDeepSet, isNumber } = require("../helper/utils")
const { getServiceNameByServiceKey, getServiceIdByServiceKey } = require("../helper/consul")
const logger = new (require("../tools/logger"))().generateLogger()
const { IPV4_REGEX } = require("../helper/regex")

let services = {}
let consul = null

/**
 * @author Cecil0o0
 * @description 同步consul服務中心的所有可用服務以及對應check并組裝成對象以方便取值
 */
function syncCheckList () {
  return new Promise((resolve, reject) => {
    consul.agent.service.list().then(allServices => {
      if (Object.keys(allServices).length > 0) {
        services = allServices
        consul.agent.check.list().then(checks => {
          Object.keys(checks).forEach(key => {
            allServices[getServiceIdByServiceKey(key)]["check"] = checks[key]
          })
          resolve(services)
        }).catch(err => {
          throw new Error(err)
        })
      } else {
        const errmsg = "未發(fā)現(xiàn)可用服務"
        logger.warn(errmsg)
        reject(errmsg)
      }
    }).catch(err => {
      throw new Error(err)
    })
  })
}

function syncRoutingRule(senecaInstance = {}, services = {}) {
  Object.keys(services).forEach(key => {
    let service = services[key]
    let name = getServiceNameByServiceKey(key)
    let $$addr = service.Address
    let $$microservicePort = ""
    let $$version = ""
    try {
      let base = JSON.parse(service.check.Notes)
      $$microservicePort = base.$$microservicePort
      $$version = base.$$version
    } catch (e) {
      logger.warn(`服務名為${serviceName}。該服務check.Notes為非標準JSON格式,程序已忽略。請檢查服務注冊方式(請確保調(diào)用ServerRegister的register來注冊服務)`)
    }

    if (IPV4_REGEX.test($$addr) && isNumber($$microservicePort)) {
      if (service.check.Status === "passing") {
        senecaInstance.client({
          host: $$addr,
          port: $$microservicePort,
          pin: {
            $$version,
            $$target: name
          }
        })
      } else {
        logger.warn(`${$$target}@${$$version || "無"}服務處于critical,因此無法使用`)
      }
    } else {
      logger.warn(`主機(${$$addr})或微服務端口號(${$$microservicePort})有誤,請檢查`)
    }
  })
}


function startTimeInterval() {
  setInterval(syncCheckList, defaultConf.routing.servicesRefresh)
}

function microRouting(consulServer) {
  var self = this
  consul = Consul(ObjectDeepSet(defaultConf["serverR&D"].consulServer, consulServer))
  syncCheckList().then(services => {
    syncRoutingRule(self, services)
  })
}

module.exports = microRouting

在保證有consul與mongodb的runtime后,請結(jié)合這兩個config-server,account-server Demo進行測試。

[未完待續(xù)....]

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/19269.html

相關(guān)文章

  • SpringCloud(第 027 篇)集成異構(gòu)服務系統(tǒng)到 SpringCloud 生態(tài)圈中(比如

    摘要:注意注解能注冊到服務上,是因為該注解包含了客戶端的注解,該是一個復合注解。包含了客戶端注解,同時也包含了斷路器模塊注解,還包含了網(wǎng)關(guān)模塊。 SpringCloud(第 027 篇)集成異構(gòu)微服務系統(tǒng)到 SpringCloud 生態(tài)圈中(比如集成 nodejs 微服務) - 一、大致介紹 1、在一些稍微復雜點系統(tǒng)中,往往都不是單一代碼寫的服務,而恰恰相反集成了各種語言寫的系統(tǒng),并且我們還...

    caozhijian 評論0 收藏0
  • SpringCloud(第 026 篇)簡單異構(gòu)系統(tǒng)之 nodejs 服務

    摘要:第篇簡單異構(gòu)系統(tǒng)之微服務一大致介紹因為在后面要利用集成異構(gòu)系統(tǒng),所以才有了本章節(jié)的微服務本章節(jié)使用了最簡單的請求截取的方式,截取不同的后綴做不同的響應處理,簡直二實現(xiàn)步驟添加服務端文件引入模塊創(chuàng)建獲得請求的路徑訪問,將會返回歡迎 SpringCloud(第 026 篇)簡單異構(gòu)系統(tǒng)之 nodejs 微服務 - 一、大致介紹 1、因為在后面要利用 SpringCloud 集成異構(gòu)系統(tǒng),所...

    raledong 評論0 收藏0
  • NodeJs開發(fā)信公眾號(一)

    摘要:古話說萬事開頭難回頭細想想還真是這樣在沒有開始做微信公眾號開發(fā)之前我以為它很復雜但是學過之后只想說原來這里是我的項目的地址下面我就把我的學習過程做一下總結(jié)希望可以幫助到有需要的人兒粗鄙之見恐有不足歡迎指教在閱讀下文之前你應該對和框架有一定的 古話說: 萬事開頭難, 回頭細想想還真是這樣,在沒有開始做微信公眾號開發(fā)之前我以為它很復雜,但是學過之后只想說原來just so so~ 這里是我...

    tigerZH 評論0 收藏0
  • nodejs實現(xiàn)信授權(quán)

    摘要:背景項目基于開發(fā)服務器用,提供微信授權(quán)和跨域使用微信獲取經(jīng)緯度,再利用百度地圖獲取用戶所在城市待優(yōu)化問題項目的微信授權(quán)基于老的項目,寫的有點不夠優(yōu)雅自己開發(fā)時把授權(quán)信息存到了里面,導致每次進入項目都要走一遍授權(quán),浪費時間和影響用戶體驗解決方 背景項目基于angular1.3開發(fā)web服務器用node,提供微信授權(quán)和跨域使用微信jssdk獲取經(jīng)緯度,再利用百度地圖API獲取用戶所在城市 ...

    longmon 評論0 收藏0
  • 全棧最后一公里 - Node.js 項目的線上服務器部署與發(fā)布

    摘要:沒有耐心閱讀的同學,可以直接前往學習全棧最后一公里。我下面會羅列一些,我自己錄制過的一些項目,或者其他的我覺得可以按照這個路線繼續(xù)深入學習的項目資源。 showImg(https://segmentfault.com/img/bVMlke?w=833&h=410); 本文技術(shù)軟文,閱讀需謹慎,長約 7000 字,通讀需 5 分鐘 大家好,我是 Scott,本文通過提供給大家學習的方法,...

    Nosee 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<