企業(yè)日志大數(shù)據(jù)分析系統(tǒng)ELK+KAFKA實現(xiàn)

| 2022-09-19 admin

背景:

最近線上上了ELK,但是只用了一臺Redis在中間作為消息隊列,以減輕前端es集群的壓力,Redis的集群解決方案暫時沒有接觸過,并且Redis作為消息隊列并不是它的強項;所以最近將Redis換成了專業(yè)的消息信息發(fā)布訂閱系統(tǒng) ,關(guān)于ELK的知識網(wǎng)上有很多的哦, 此篇博客主要是總結(jié)一下目前線上這個平臺的實施步驟,ELK是怎么跟Kafka結(jié)合起來的。好吧,動手!

ELK架構(gòu)拓?fù)洌?/h3>

然而我這里的整個日志收集平臺就是這樣的拓?fù)洌?/p>

1,使用一臺Nginx代理訪問kibana的請求;

2,兩臺es組成es集群,并且在兩臺es上面都安裝kibana;(以下對elasticsearch簡稱es)

3,中間三臺服務(wù)器就是我的kafka(zookeeper)集群啦; 上面寫的消費者/生產(chǎn)者這是kafka(zookeeper)中的概念;

4,最后面的就是一大堆的生產(chǎn)服務(wù)器啦,上面使用的是logstash,當(dāng)然除了logstash也可以使用其他的工具來收集你的應(yīng)用程序的日志,例如:Flume,Scribe,Rsyslog,Scripts……

軟件選用:

elasticsearch-1.7.3.tar.gz #這里需要說明一下,前幾天使用了最新的elasticsearch2.0,java-1.8.0報錯,目前未找到原因,故這里使用1.7.3版本

Logstash-2.0.0.tar.gz

kibana-4.1.2-linux-x64.tar.gz

以上軟件都可以從官網(wǎng)下載:https://www.elastic.co/downloads

java-1.8.0,nginx采用yum安裝

部署步驟:

1.ES集群安裝配置;

2.Logstash客戶端配置(直接寫入數(shù)據(jù)到ES集群,寫入系統(tǒng)messages日志);

3.Kafka(zookeeper)集群配置;(Logstash寫入數(shù)據(jù)到Kafka消息系統(tǒng));

4.Kibana部署;

5.Nginx負(fù)載均衡Kibana請求;

6.案例:nginx日志收集以及MySQL慢日志收集;

7.Kibana報表基本使用;

ES集群安裝配置;

es1.example.com:

1.安裝java-1.8.0以及依賴包

yum install -y epel-release

yum install -y java-1.8.0 git wget lrzsz

2.獲取es軟件包

wget https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.3.tar.gz

tar -xf elasticsearch-1.7.3.tar.gz -C /usr/local

ln -sv /usr/local/elasticsearch-1.7.3 /usr/local/elasticsearch

3.修改配置文件

[root@es1 ~]# vim /usr/local/elasticsearch/config/elasticsearch.yml

32 cluster.name: es-cluster #組播的名稱地址

40 node.name: "es-node1 " #節(jié)點名稱,不能和其他節(jié)點重復(fù)

47 node.master: true #節(jié)點能否被選舉為master

51 node.data: true #節(jié)點是否存儲數(shù)據(jù)

107 index.number_of_shards: 5 #索引分片的個數(shù)

111 index.number_of_replicas: 1 #分片的副本個數(shù)

145 path.conf: /usr/local/elasticsearch/config/ #配置文件的路徑

149 path.data: /data/es/data #數(shù)據(jù)目錄路徑

159 path.work: /data/es/worker #工作目錄路徑

163 path.logs: /usr/local/elasticsearch/logs/ #日志文件路徑

167 path.plugins: /data/es/plugins #插件路徑

184 bootstrap.mlockall: true #內(nèi)存不向swap交換

232 http.enabled: true #啟用http

4.創(chuàng)建相關(guān)目錄

mkdir /data/es/{data,worker,plugins} -p

5.獲取es服務(wù)管理腳本

[root@es1 ~]# git clone https://github.com/elastic/elasticsearch-servicewrapper.git

[root@es1 ~]# mv elasticsearch-servicewrapper/service /usr/local/elasticsearch/bin/

[root@es1 ~]# /usr/local/elasticsearch/bin/service/elasticsearch install

Detected RHEL or Fedora:

Installing the Elasticsearch daemon..

[root@es1 ~]#

#這時就會在/etc/init.d/目錄下安裝上es的管理腳本啦

#修改其配置:

[root@es1 ~]#

set.default.ES_HOME=/usr/local/elasticsearch #安裝路徑

set.default.ES_HEAP_SIZE=1024 #jvm內(nèi)存大小,根據(jù)實際環(huán)境調(diào)整即可

6.啟動es ,并檢查其服務(wù)是否正常

[root@es1 ~]# netstat -nlpt | grep -E "9200|"9300

tcp 0 0 0.0.0.0:9200 0.0.0.0:* LISTEN 1684/java

tcp 0 0 0.0.0.0:9300 0.0.0.0:* LISTEN 1684/java

訪問http://192.168.2.18:9200/ 如果出現(xiàn)以下提示信息說明安裝配置完成啦,

7.es1節(jié)點好啦,我們直接把目錄復(fù)制到es2

[root@es1 local]# scp -r elasticsearch-1.7.3 192.168.12.19:/usr/local/

[root@es2 local]# ln -sv elasticsearch-1.7.3 elasticsearch

[root@es2 local]# elasticsearch/bin/service/elasticsearch install

#es2只需要修改node.name即可,其他都與es1相同配置

8.安裝es的管理插件

es官方提供一個用于管理es的插件,可清晰直觀看到es集群的狀態(tài),以及對集群的操作管理,安裝方法如下:

[root@es1 local]# /usr/local/elasticsearch/bin/plugin -i mobz/elasticsearch-head

安裝好之后,訪問方式為: http://192.168.2.18:9200/_plugin/head,由于集群中現(xiàn)在暫時沒有數(shù)據(jù),所以顯示為空,此時,es集群的部署完成。

Logstash客戶端安裝配置;

在webserve1上面安裝Logstassh

1.downloads 軟件包 ,這里注意,Logstash是需要依賴java環(huán)境的,所以這里還是需要yum install -y java-1.8.0.

[root@webserver1 ~]# wget https://download.elastic.co/logstash/logstash/logstash-2.0.0.tar.gz

[root@webserver1 ~]# tar -xf logstash-2.0.0.tar.gz -C /usr/local

[root@webserver1 ~]# cd /usr/local/

[root@webserver1 local]# ln -sv logstash-2.0.0 logstash

[root@webserver1 local]# mkdir logs etc

2.提供logstash管理腳本,其中里面的配置路徑可根據(jù)實際情況修改

#!/bin/bash

#chkconfig: 2345 55 24

#description: logstash service manager

#auto: Maoqiu Guo

FILE='/usr/local/logstash/etc/*.conf' #logstash配置文件

LOGBIN='/usr/local/logstash/bin/logstash agent --verbose --config' #指定logstash配置文件的命令

LOCK='/usr/local/logstash/locks' #用鎖文件配合服務(wù)啟動與關(guān)閉

LOGLOG='--log /usr/local/logstash/logs/stdou.log' #日志

START() {

if [ -f $LOCK ];then

echo -e "Logstash is already ?33[32mrunning?33[0m, do nothing."

else

echo -e "Start logstash service.?33[32mdone?33[m"

nohup ${LOGBIN} ${FILE} ${LOGLOG} &

touch $LOCK

fi

}

STOP() {

if [ ! -f $LOCK ];then

echo -e "Logstash is already stop, do nothing."

else

echo -e "Stop logstash serivce ?33[32mdone?33[m"

rm -rf $LOCK

ps -ef | grep logstash | grep -v "grep" | awk '{print $2}' | xargs kill -s 9 >/dev/null

fi

}

STATUS() {

ps aux | grep logstash | grep -v "grep" >/dev/null

if [ -f $LOCK ] && [ $? -eq 0 ]; then

echo -e "Logstash is: ?33[32mrunning?33[0m..."

else

echo -e "Logstash is: ?33[31mstopped?33[0m..."

fi

}

TEST(){

${LOGBIN} ${FILE} --configtest

}

case "$1" in

start)

START

;;

stop)

STOP

;;

status)

STATUS

;;

restart)

STOP

sleep 2

START

;;

test)

TEST

;;

*)

echo "Usage: /etc/init.d/logstash (test|start|stop|status|restart)"

;;

esac

3.Logstash 向es集群寫數(shù)據(jù)

(1)編寫一個logstash配置文件

[root@webserver1 etc]# cat logstash.conf

input { #數(shù)據(jù)的輸入從標(biāo)準(zhǔn)輸入

stdin {}

}

output { #數(shù)據(jù)的輸出我們指向了es集群

elasticsearch {

hosts => ["192.168.2.18:9200","192.168.2.19:9200"] #es主機的ip及端口

}

}

[root@webserver1 etc]#

(2)檢查配置文件是否有語法錯

[root@webserver1 etc]# /usr/local/logstash/bin/logstash -f logstash.conf --configtest --verbose

Configuration OK

[root@webserver1 etc]#

(3)既然配置ok我們手動啟動它,然后寫點東西看能否寫到es

ok.上圖已經(jīng)看到logstash已經(jīng)可以正常的工作啦.

4.下面演示一下如何收集系統(tǒng)日志

將之前的配置文件修改如下所示內(nèi)容,然后啟動logstash服務(wù)就可以在web頁面中看到messages的日志寫入es,并且創(chuàng)建了一條索引

[root@webserver1 etc]# cat logstash.conf

input { #這里的輸入使用的文件,即日志文件messsages

file {

path => "/var/log/messages" #這是日志文件的絕對路徑

start_position => "beginning" #這個表示從messages的第一行讀取,即文件開始處

}

}

output { #輸出到es

elasticsearch {

hosts => ["192.168.2.18:9200","192.168.2.19:9200"]

index => "system-messages-%{+YYYY-MM}" #這里將按照這個索引格式來創(chuàng)建索引

}

}

[root@webserver1 etc]#

啟動logstash后,我們來看head這個插件的web頁面

ok,系統(tǒng)日志我們已經(jīng)成功的收集,并且已經(jīng)寫入到es集群中,那上面的演示是logstash直接將日志寫入到es集群中的,這種場合我覺得如果量不是很大的話直接像上面已將將輸出output定義到es集群即可,如果量大的話需要加上消息隊列來緩解es集群的壓力。前面已經(jīng)提到了我這邊之前使用的是單臺redis作為消息隊列,但是redis不能作為list類型的集群,也就是redis單點的問題沒法解決,所以這里我選用了kafka ;下面就在三臺server上面安裝kafka集群

Kafka集群安裝配置;

在搭建kafka集群時,需要提前安裝zookeeper集群,當(dāng)然kafka已經(jīng)自帶zookeeper程序只需要解壓并且安裝配置就行了

kafka1上面的配置:

1.獲取軟件包.官網(wǎng):http://kafka.apache.org

[root@kafka1 ~]# wget http://mirror.rise.ph/apache/kafka/0.8.2.1/kafka_2.11-0.8.2.1.tgz

[root@kafka1 ~]# tar -xf kafka_2.11-0.8.2.1.tgz -C /usr/local/

[root@kafka1 ~]# cd /usr/local/

[root@kafka1 local]# ln -sv kafka_2.11-0.8.2.1 kafka

2.配置zookeeper集群,修改配置文件

[root@kafka1 ~]# vim /usr/local/kafka/config/zookeeper.propertie

dataDir=/data/zookeeper

clienrtPort=2181

tickTime=2000

initLimit=20

syncLimit=10

server.2=192.168.2.22:2888:3888

server.3=192.168.2.23:2888:3888

server.4=192.168.2.24:2888:3888

#說明:

tickTime: 這個時間是作為 Zookeeper 服務(wù)器之間或客戶端與服務(wù)器之間維持心跳的時間間隔,也就是每個 tickTime 時間就會發(fā)送一個心跳。

2888端口:表示的是這個服務(wù)器與集群中的 Leader 服務(wù)器交換信息的端口;

3888端口:表示的是萬一集群中的 Leader 服務(wù)器掛了,需要一個端口來重新進(jìn)行選舉,選出一個新的 Leader,而這個端口就是用來執(zhí)行選舉時服務(wù)器相互通信的端口。

3.創(chuàng)建zookeeper所需要的目錄

[root@kafka1 ~]# mkdir /data/zookeeper

4.在/data/zookeeper目錄下創(chuàng)建myid文件,里面的內(nèi)容為數(shù)字,用于標(biāo)識主機,如果這個文件沒有的話,zookeeper是沒法啟動的哦

[root@kafka1 ~]# echo 2 > /data/zookeeper/myid

以上就是zookeeper集群的配置,下面等我配置好kafka之后直接復(fù)制到其他兩個節(jié)點即可

5.kafka配置

[root@kafka1 ~]# vim /usr/local/kafka/config/server.properties

broker.id=2 # 唯一,填數(shù)字,本文中分別為2/3/4

prot=9092 # 這個broker監(jiān)聽的端口

host.name=192.168.2.22 # 唯一,填服務(wù)器IP

log.dir=/data/kafka-logs # 該目錄可以不用提前創(chuàng)建,在啟動時自己會創(chuàng)建

zookeeper.connect=192.168.2.22:2181,192.168.2.23:2181,192.168.2.24:2181 #這個就是zookeeper的ip及端口

num.partitions=16 # 需要配置較大 分片影響讀寫速度

log.dirs=/data/kafka-logs # 數(shù)據(jù)目錄也要單獨配置磁盤較大的地方

log.retention.hours=168 # 時間按需求保留過期時間 避免磁盤滿

6.將kafka(zookeeper)的程序目錄全部拷貝至其他兩個節(jié)點

[root@kafka1 ~]# scp -r /usr/local/kafka 192.168.2.23:/usr/local/

[root@kafka1 ~]# scp -r /usr/local/kafka 192.168.2.24:/usr/local/

7.修改兩個借點的配置,注意這里除了以下兩點不同外,都是相同的配置

(1)zookeeper的配置

mkdir /data/zookeeper

echo "x" > /data/zookeeper/myid

(2)kafka的配置

broker.id=2

host.name=192.168.2.22

8.修改完畢配置之后我們就可以啟動了,這里先要啟動zookeeper集群,才能啟動kafka

我們按照順序來,kafka1 –> kafka2 –>kafka3

[root@kafka1 ~]# /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties & #zookeeper啟動命令

[root@kafka1 ~]# /usr/local/kafka/bin/zookeeper-server-stop.sh #zookeeper停止的命令

注意,如果zookeeper有問題 nohup的日志文件會非常大,把磁盤占滿,這個zookeeper服務(wù)可以通過自己些服務(wù)腳本來管理服務(wù)的啟動與關(guān)閉。

后面兩臺執(zhí)行相同操作,在啟動過程當(dāng)中會出現(xiàn)以下報錯信息

[2015-11-13 19:18:04,225] WARN Cannot open channel to 3 at election address /192.168.2.23:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager)

java.net.ConnectException: Connection refused

at java.net.PlainSocketImpl.socketConnect(Native Method)

at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)

at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)

at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)

at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)

at java.net.Socket.connect(Socket.java:589)

at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:368)

at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:402)

at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:840)

at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:762)

[2015-11-13 19:18:04,232] WARN Cannot open channel to 4 at election address /192.168.2.24:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager)

java.net.ConnectException: Connection refused

at java.net.PlainSocketImpl.socketConnect(Native Method)

at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)

at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)

at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)

at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)

at java.net.Socket.connect(Socket.java:589)

at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:368)

at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:402)

at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:840)

at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:762)

[2015-11-13 19:18:04,233] INFO Notification time out: 6400 (org.apache.zookeeper.server.quorum.FastLeaderElection)

由于zookeeper集群在啟動的時候,每個結(jié)點都試圖去連接集群中的其它結(jié)點,先啟動的肯定連不上后面還沒啟動的,所以上面日志前面部分的異常是可以忽略的。通過后面部分可以看到,集群在選出一個Leader后,最后穩(wěn)定了。

其他節(jié)點也可能會出現(xiàn)類似的情況,屬于正常。

9.zookeeper服務(wù)檢查

[root@kafka1~]# netstat -nlpt | grep -E "2181|2888|3888"

tcp 0 0 192.168.2.24:3888 0.0.0.0:* LISTEN 1959/java

tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN 1959/java

[root@kafka2 ~]# netstat -nlpt | grep -E "2181|2888|3888"

tcp 0 0 192.168.2.23:3888 0.0.0.0:* LISTEN 1723/java

tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN 1723/java

[root@kafka3 ~]# netstat -nlpt | grep -E "2181|2888|3888"

tcp 0 0 192.168.2.24:3888 0.0.0.0:* LISTEN 950/java

tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN 950/java

tcp 0 0 192.168.2.24:2888 0.0.0.0:* LISTEN 950/java

#可以看出,如果哪臺是Leader,那么它就擁有2888這個端口

ok. 這時候zookeeper集群已經(jīng)啟動起來了,下面啟動kafka,也是依次按照順序啟動

[root@kafka1 ~]# nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties & #kafka啟動的命令

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-server-stop.sh #kafka停止的命令

注意,跟zookeeper服務(wù)一樣,如果kafka有問題 nohup的日志文件會非常大,把磁盤占滿,這個kafka服務(wù)同樣可以通過自己些服務(wù)腳本來管理服務(wù)的啟動與關(guān)閉。

此時三臺上面的zookeeper及kafka都已經(jīng)啟動完畢,來檢測以下吧

(1)建立一個主題

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic summer

#注意:factor大小不能超過broker數(shù)

(2)查看有哪些主題已經(jīng)創(chuàng)建

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.22:2181 #列出集群中所有的topic

summer #已經(jīng)創(chuàng)建成功

(3)查看summer這個主題的詳情

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.2.22:2181 --topic summer

Topic:summer PartitionCount:1 ReplicationFactor:3 Configs:

Topic: summer Partition: 0 Leader: 2 Replicas: 2,4,3 Isr: 2,4,3

#主題名稱:summer

#Partition:只有一個,從0開始

#leader :id為2的broker

#Replicas 副本存在于broker id為2,3,4的上面

#Isr:活躍狀態(tài)的broker

(4)發(fā)送消息,這里使用的是生產(chǎn)者角色

[root@kafka1 ~]# /bin/bash /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.2.22:9092 --topic summer

This is a messages

welcome to kafka

(5)接收消息,這里使用的是消費者角色

[root@kafka2 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 192.168.2.24:2181 --topic summer --from-beginning

This is a messages

welcome to kafka

如果能夠像上面一樣能夠接收到生產(chǎn)者發(fā)過來的消息,那說明基于kafka的zookeeper集群就成功啦。

10,下面我們將webserver1上面的logstash的輸出改到kafka上面,將數(shù)據(jù)寫入到kafka中

(1)修改webserver1上面的logstash配置,如下所示:各個參數(shù)可以到官網(wǎng)查詢.

root@webserver1 etc]# cat logstash.conf

input { #這里的輸入還是定義的是從日志文件輸入

file {

type => "system-message"

path => "/var/log/messages"

start_position => "beginning"

}

}

output {

#stdout { codec => rubydebug } #這是標(biāo)準(zhǔn)輸出到終端,可以用于調(diào)試看有沒有輸出,注意輸出的方向可以有多個

kafka { #輸出到kafka

bootstrap_servers => "192.168.2.22:9092,192.168.2.23:9092,192.168.2.24:9092" #他們就是生產(chǎn)者

topic_id => "system-messages" #這個將作為主題的名稱,將會自動創(chuàng)建

compression_type => "snappy" #壓縮類型

}

}

[root@webserver1 etc]#

(2)配置檢測

[root@webserver1 etc]# /usr/local/logstash/bin/logstash -f logstash.conf --configtest --verbose

Configuration OK

[root@webserver1 etc]#

(2)啟動Logstash,這里我直接在命令行執(zhí)行即可

[root@webserver1 etc]# /usr/local/logstash/bin/logstash -f logstash.conf

(3)驗證數(shù)據(jù)是否寫入到kafka,這里我們檢查是否生成了一個叫system-messages的主題

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.22:2181

summer

system-messages #可以看到這個主題已經(jīng)生成了

#再看看這個主題的詳情:

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.2.22:2181 --topic system-messages

Topic:system-messages PartitionCount:16 ReplicationFactor:1 Configs:

Topic: system-messages Partition: 0 Leader: 2 Replicas: 2 Isr: 2

Topic: system-messages Partition: 1 Leader: 3 Replicas: 3 Isr: 3

Topic: system-messages Partition: 2 Leader: 4 Replicas: 4 Isr: 4

Topic: system-messages Partition: 3 Leader: 2 Replicas: 2 Isr: 2

Topic: system-messages Partition: 4 Leader: 3 Replicas: 3 Isr: 3

Topic: system-messages Partition: 5 Leader: 4 Replicas: 4 Isr: 4

Topic: system-messages Partition: 6 Leader: 2 Replicas: 2 Isr: 2

Topic: system-messages Partition: 7 Leader: 3 Replicas: 3 Isr: 3

Topic: system-messages Partition: 8 Leader: 4 Replicas: 4 Isr: 4

Topic: system-messages Partition: 9 Leader: 2 Replicas: 2 Isr: 2

Topic: system-messages Partition: 10 Leader: 3 Replicas: 3 Isr: 3

Topic: system-messages Partition: 11 Leader: 4 Replicas: 4 Isr: 4

Topic: system-messages Partition: 12 Leader: 2 Replicas: 2 Isr: 2

Topic: system-messages Partition: 13 Leader: 3 Replicas: 3 Isr: 3

Topic: system-messages Partition: 14 Leader: 4 Replicas: 4 Isr: 4

Topic: system-messages Partition: 15 Leader: 2 Replicas: 2 Isr: 2

[root@kafka1 ~]#

可以看出,這個主題生成了16個分區(qū),每個分區(qū)都有對應(yīng)自己的Leader,但是我想要有10個分區(qū),3個副本如何辦?還是跟我們上面一樣命令行來創(chuàng)建主題就行,當(dāng)然對于logstash輸出的我們也可以提前先定義主題,然后啟動logstash 直接往定義好的主題寫數(shù)據(jù)就行啦,命令如下:

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.2.22:2181 --replication-factor 3 --partitions 10 --topic TOPIC_NAME

好了,我們將logstash收集到的數(shù)據(jù)寫入到了kafka中了,在實驗過程中我使用while腳本測試了如果不斷的往kafka寫數(shù)據(jù)的同時停掉兩個節(jié)點,數(shù)據(jù)寫入沒有任何問題。

那如何將數(shù)據(jù)從kafka中讀取然后給我們的es集群呢?那下面我們在kafka集群上安裝Logstash,安裝步驟不再贅述;三臺上面的logstash 的配置如下,作用是將kafka集群的數(shù)據(jù)讀取然后轉(zhuǎn)交給es集群,這里為了測試我讓他新建一個索引文件,注意這里的輸入日志還是messages,主題名稱還是“system-messages”

[root@kafka1 etc]# more logstash.conf

input {

kafka {

zk_connect => "192.168.2.22:2181,192.168.2.23:2181,192.168.2.24:2181" #消費者們

topic_id => "system-messages"

codec => plain

reset_beginning => false

consumer_threads => 5

decorate_events => true

}

}

output {

elasticsearch {

hosts => ["192.168.2.18:9200","192.168.2.19:9200"]

index => "test-system-messages-%{+YYYY-MM}" #為了區(qū)分之前實驗,我這里新生成的所以名字為“test-system-messages-%{+YYYY-MM}”

}

}

在三臺kafka上面啟動Logstash,注意我這里是在命令行啟動的;

[root@kafka1 etc]# pwd

/usr/local/logstash/etc

[root@kafka1 etc]# /usr/local/logstash/bin/logstash -f logstash.conf

[root@kafka2 etc]# pwd

/usr/local/logstash/etc

[root@kafka2 etc]# /usr/local/logstash/bin/logstash -f logstash.conf

[root@kafka3 etc]# pwd

/usr/local/logstash/etc

[root@kafka3 etc]# /usr/local/logstash/bin/logstash -f logstash.conf

在webserver1上寫入測試內(nèi)容,即webserver1上面利用message這個文件來測試,我先將其清空,然后啟動

[root@webserver1 etc]# >/var/log/messages

[root@webserver1 etc]# echo "我將通過kafka集群達(dá)到es集群哦^0^" >> /var/log/messages

#啟動logstash,讓其讀取messages中的內(nèi)容

4.Kibana部署;

5.Nginx負(fù)載均衡Kibana請求;

6.案例:nginx日志收集以及MySQL慢日志收集;

7.Kibana報表基本使用;