成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

KAFKA集群部署實踐

IT那活兒 / 1701人閱讀
KAFKA集群部署實踐

點擊上方“IT那活兒”,關(guān)注后了解更多內(nèi)容,不管IT什么活兒,干就完了?。。?/span>



基本概念



消費者(Consumer):向topic注冊,并且接收發(fā)布到這些topic的消息

生產(chǎn)者(Producer):向kafka的topic發(fā)布消息的程序

服務(wù)端(broker):kafka以一個擁有一臺或多臺服務(wù)器的集群運行著,每一臺服務(wù)器稱為broker

1. Kafka架構(gòu)

生產(chǎn)者、kafka集群、消費者架構(gòu)圖

kafka集群中的消息,以topic 的形式組織排序,如下圖所示:

2. 基本概念

2.1 主題(Topic)

對消息進行分類單元,通常在一個應(yīng)用中對應(yīng)一個Topic,以此進行區(qū)分。一個主題就是一個類別或者一個可訂閱的條目名稱。對每個topic來說,kafka維護的是一個分區(qū)日志(partitioned log)

2.2 分區(qū)(Partition)

每個分區(qū)是一個有序的、不可變的消息序列,這個序列可以被連續(xù)地追加提交日志。在分區(qū)內(nèi)的每條消息都有一個有序的id號,這個id號被稱為偏移offset,這個偏移量可以唯一確定每條消息在分區(qū)內(nèi)的位置。

 工作圖



安裝步驟


1. zookeeper安裝

服務(wù)器三臺:安裝均在三臺服務(wù)器上統(tǒng)一操作。

192.168.48.130

192.168.48.131

192.168.48.132

注意:

zk3.5.5之后的版本,選擇帶bin 的為二進制安裝包;

注意jdk版本,3.5.5之后的一直用jdk1.8版本。

1.1 安裝java環(huán)境

yum -y install java-1.8.0-openjdk*

1.2 下載zookeeper并創(chuàng)建對應(yīng)目錄

cd /opt

mkdir zookeeper

mkdir -p zookeeper/zkdata #快照日志

mkdir -p zookeeper/ zkdatalog#事物日志


cd /opt/zookeeper/

wget https://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.5.9/apache-zookeeper-3.5.9-bin.tar.gz

tar -zxvf apache-zookeeper-3.5.9-bin.tar.gz


1.3 修改配置文件

#進入conf目錄

cd /opt/zookeeper/ apache-zookeeper-3.5.9-bin/conf

cp zoo_sample.cfg zoo.cfg

#zoo_sample.cfg 官方的zookeeper樣例文件。


配置文件:

# The number of milliseconds of each tick

tickTime=2000

#
 The number of ticks that the initial

#
 synchronization phase can take

initLimit=10

#
 The number of ticks that can pass between

#
 sending a request and getting an acknowledgement

syncLimit=5

#
 the directory where the snapshot is stored.

#
 do not use /tmp for storage, /tmp here is just

#
 example sakes.

dataDir=/opt/zookeeper/zkdata

dataLogDir=/opt/zookeeper/zkdatalog


#
 the port at which the clients will connect

clientPort=12181

#
 the maximum number of client connections.

#
 increase this if you need to handle more clients

#
maxClientCnxns=60

#


#
 Be sure to read the maintenance section of the

#
 administrator guide before turning on autopurge.

#


#
 http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance

#


#
 The number of snapshots to retain in dataDir

#
autopurge.snapRetainCount=3

#
 Purge task interval in hours

#
 Set to "0" to disable auto purge feature

#
autopurge.purgeInterval=1


server.1=192.168.48.130:12888:13888

server.2=192.168.48.131:12888:13888

server.3=192.168.48.132:12888:13888


只根據(jù)情況修改了datadir 、clientPort和添加server配置。

#server.1 1為服務(wù)器標(biāo)識,主要用于區(qū)分標(biāo)識服務(wù)器,同樣下面myid文件里也是根據(jù)這個數(shù)字命名。

#clientPort:客戶端連接Zookeeper的端口。

1.4 創(chuàng)建myid文件

給zk集群服務(wù)器標(biāo)識,整個zk集群用來發(fā)現(xiàn)彼此的一個重要標(biāo)識。

#server1

echo "1" > /opt/zookeeper/zkdata/myid

#server2

echo "2" > /opt/zookeeper/zkdata/myid

#server3

echo "3" > /opt/zookeeper/zkdata/myid


1.5 啟動服務(wù)

cd /opt/zookeeper/apache-zookeeper-3.5.9-bin/bin

./zkServer.sh start

./zkServer.sh status  #檢查狀態(tài)

[root@localhost apache-zookeeper-3.5.9-bin]# ./bin/zkServer.sh status

/usr/bin/java

ZooKeeper JMX enabled by default

Using config: /opt/zookeeper/apache-zookeeper-3.5.9-bin/bin/../conf/zoo.cfg

Client port found: 12181. Client address: localhost. Client SSL: false.

Mode: leader


#一般一個leader多個follower


2. kafka集群安裝測試

2.1軟件下載

#創(chuàng)建目錄:

cd /opt/

mkdir kafka #創(chuàng)建項目目錄

cd kafka

mkdir kafkalogs #kafka消息目錄,主要存放kafka消息


#下載軟件:

wget --no-check-certificate https://dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz


#解壓軟件:

tar -zxvf kafka_2.13-3.0.0.tgz


2.2 修改配置文件

cd /opt/kafka/kafka_2.13-3.0.0/config/


主要關(guān)注:server.properties 這個文件即可。

# Licensed to the Apache Software Foundation (ASF) under one or more

# contributor license agreements. See the NOTICE file distributed with

# this work for additional information regarding copyright ownership.

# The ASF licenses this file to You under the Apache License, Version 2.0

# (the "License"); you may not use this file except in compliance with

# the License. You may obtain a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS,

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

# See the License for the specific language governing permissions and

# limitations under the License.


# see kafka.server.KafkaConfig for additional details and defaults


############################# Server Basics #############################


# The id of the broker. This must be set to a unique integer for each broker.

broker.id=1


############################# Socket Server Settings #############################


# The address the socket server listens on. It will get the value returned from

# java.net.InetAddress.getCanonicalHostName() if not configured.

# FORMAT:

# listeners = listener_name://host_name:port

# EXAMPLE:

# listeners = PLAINTEXT://your.host.name:9092

listeners=PLAINTEXT://192.168.48.131:19092


# Hostname and port the broker will advertise to producers and consumers. If not set,

# it uses the value for "listeners" if configured. Otherwise, it will use the value

# returned from java.net.InetAddress.getCanonicalHostName().

#advertised.listeners=PLAINTEXT://your.host.name:9092


# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details

#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL


# The number of threads that the server uses for receiving requests from the network and sending responses to the network

num.network.threads=3


# The number of threads that the server uses for processing requests, which may include disk I/O

num.io.threads=8


# The send buffer (SO_SNDBUF) used by the socket server

socket.send.buffer.bytes=102400


# The receive buffer (SO_RCVBUF) used by the socket server

socket.receive.buffer.bytes=102400


# The maximum size of a request that the socket server will accept (protection against OOM)

socket.request.max.bytes=104857600


############################# Log Basics #############################


# A comma separated list of directories under which to store log files

log.dirs=/opt/kafka/kafkalogs


# The default number of log partitions per topic. More partitions allow greater

# parallelism for consumption, but this will also result in more files across

# the brokers.

num.partitions=1


# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.

# This value is recommended to be increased for installations with data dirs located in RAID array.

num.recovery.threads.per.data.dir=1


############################# Internal Topic Settings #############################

# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"

# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.

offsets.topic.replication.factor=1

transaction.state.log.replication.factor=1

transaction.state.log.min.isr=1


############################# Log Flush Policy #############################


# Messages are immediately written to the filesystem but by default we only fsync() to sync

# the OS cache lazily. The following configurations control the flush of data to disk.

# There are a few important trade-offs here:

# 1. Durability: Unflushed data may be lost if you are not using replication.

# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.

# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.

# The settings below allow one to configure the flush policy to flush data after a period of time or

# every N messages (or both). This can be done globally and overridden on a per-topic basis.


# The number of messages to accept before forcing a flush of data to disk

#log.flush.interval.messages=10000


# The maximum amount of time a message can sit in a log before we force a flush

#log.flush.interval.ms=1000


############################# Log Retention Policy #############################


# The following configurations control the disposal of log segments. The policy can

# be set to delete segments after a period of time, or after a given size has accumulated.

# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens

# from the end of the log.


# The minimum age of a log file to be eligible for deletion due to age

log.retention.hours=48


# A size-based retention policy for logs. Segments are pruned from the log unless the remaining

# segments drop below log.retention.bytes. Functions independently of log.retention.hours.

#log.retention.bytes=1073741824


# The maximum size of a log segment file. When this size is reached a new log segment will be created.

log.segment.bytes=1073741824


# The interval at which log segments are checked to see if they can be deleted according

# to the retention policies

log.retention.check.interval.ms=300000


############################# Zookeeper #############################


# Zookeeper connection string (see zookeeper docs for details).

# This is a comma separated host:port pairs, each corresponding to a zk

# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".

# You can also append an optional chroot string to the urls to specify the

# root directory for all kafka znodes.

zookeeper.connect=192.168.48.130:12181,192.168.48.131:12181,192.168.48.132:12181


# Timeout in ms for connecting to zookeeper

zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################


# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.

# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.

# The default value for this is 3 seconds.

# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.

# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.

group.initial.rebalance.delay.ms=0


#broker.id=1  每臺服務(wù)器的broker.id都不能相同。

#zookeeper.connect=192.168.48.130:12181,192.168.48.131:12181,192.168.48.132:12181

#
listeners=PLAINTEXT://192.168.48.131:19092

#
log.dirs=/opt/kafka/kafkalogs


其他按需配置。

2.3 啟動集群

cd /opt/kafka/kafka_2.13-3.0.0

./bin/kafka-server-start.sh -daemon /opt/kafka/kafka_2.13-3.0.0/config/server.properties


檢查是否啟動成功。

#jps

3284 Kafka

97419 Jps

5775 QuorumPeerMain

#
ps -ef|grep kafka


2.4 測試發(fā)布、消費消息

##創(chuàng)建topic:

./bin/kafka-topics.sh --create --bootstrap-server 
192.168.48.131:19092,192.168.48.130:19092,192.168.48.132:19092 
--replication-factor 3 --partitions 1 --topic test


## 刪除topic:

./bin/kafka-topics.sh --bootstrap-server 
192.168.48.131:19092,192.168.48.130:19092,192.168.48.132:19092 
--delete --topic test


## topic列表查詢:

./bin/kafka-topics.sh --list --bootstrap-server 
192.168.48.131:19092,192.168.48.130:19092,192.168.48.132:19092


## 創(chuàng)建生產(chǎn)者:

./bin/kafka-console-producer.sh --broker-list 
192.168.48.131:19092,192.168.48.130:19092,192.168.48.132:19092 
--topic test


## 創(chuàng)建消費者:

./bin/kafka-console-consumer.sh --bootstrap-server 
192.168.48.131:19092,192.168.48.130:19092,192.168.48.132:19092 
--topic test  --from-beginning --consumer.config config/consumer.properties


##查看集群topic信息:

./bin/kafka-topics.sh --describe --bootstrap-server 
192.168.48.131:19092,192.168.48.130:19092,192.168.48.132:19092 
--topic test


3. 測試消費情況

--生產(chǎn)者:

--消費者:




本文作者:吳昊

本文來源:IT那活兒(上海新炬王翦團隊)

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/129619.html

相關(guān)文章

  • TiDB 在摩拜單車的深度實踐及應(yīng)用

    摘要:本文會選擇三個場景,給大家簡單介紹一下在摩拜單車的使用姿勢遇到的問題以及解決方案。圖在線業(yè)務(wù)集群拓?fù)鋱D四數(shù)據(jù)沙盒集群離線業(yè)務(wù)數(shù)據(jù)沙盒,屬于離線業(yè)務(wù)集群,是摩拜單車的一個數(shù)據(jù)聚合集群。 作者介紹:呂磊,摩拜單車高級 DBA。 一、業(yè)務(wù)場景 摩拜單車 2017 年開始將 TiDB 嘗試應(yīng)用到實際業(yè)務(wù)當(dāng)中,根據(jù)業(yè)務(wù)的不斷發(fā)展,TiDB 版本快速迭代,我們將 TiDB 在摩拜單車的使用場景逐漸...

    Paul_King 評論0 收藏0
  • 使用canal+Kafka進行數(shù)據(jù)庫同步實踐

    摘要:比如,服務(wù)數(shù)據(jù)庫的數(shù)據(jù)來源于服務(wù)的數(shù)據(jù)庫服務(wù)的數(shù)據(jù)有變更操作時,需要同步到服務(wù)中。第二種解決方案通過數(shù)據(jù)庫的進行同步。并且,我們還用這套架構(gòu)進行緩存失效的同步。目前這套同步架構(gòu)正常運行中,后續(xù)有遇到問題再繼續(xù)更新。在微服務(wù)拆分的架構(gòu)中,各服務(wù)擁有自己的數(shù)據(jù)庫,所以常常會遇到服務(wù)之間數(shù)據(jù)通信的問題。比如,B服務(wù)數(shù)據(jù)庫的數(shù)據(jù)來源于A服務(wù)的數(shù)據(jù)庫;A服務(wù)的數(shù)據(jù)有變更操作時,需要同步到B服務(wù)中。第一...

    Tecode 評論0 收藏0

發(fā)表評論

0條評論

IT那活兒

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<