ELK

TIP

ELK 是 elastic 公司旗下三款产品ElasticSearch、Logstash、Kibana的首字母组合,也即Elastic Stack包含ElasticSearch、Logstash、Kibana、Beats。ELK提供了一整套解决方案,并且都是开源软件,之间互相配合使用,完美衔接,高效的满足了很多场合的应用,是目前主流的一种日志系统。
本主演示如何用elk收集nginx日志,spirngboot日志以及mysql数据。

安装相关软件

本文安装所需要软件如下 ,elk版本为6.4.3

image

收集springboot日志

在 pom.xml中加入以下几个依赖包

<!-- 日志同步至kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>net.logstash.logback</groupId>
            <artifactId>logstash-logback-encoder</artifactId>
            <version>5.3</version>
        </dependency>
        <dependency>
            <groupId>com.github.danielwegener</groupId>
            <artifactId>logback-kafka-appender</artifactId>
            <version>0.1.0</version>
        </dependency>

编写logback-spring.xml日志配置文件如下

<?xml version="1.0" encoding="UTF-8"?>
<!-- scan:当此属性设置为true时,配置文件如果发生改变,将会被重新加载,默认值为true。 scanPeriod:设置监测配置文件是否有修改的时间间隔,如果没有给出时间单位,
    默认单位是毫秒当scan为true时,此属性生效。默认的时间间隔为1分钟。 debug:当此属性设置为true时,将打印出logback内部日志信息,实时查看logback运行状态。
    默认值为false。 -->
<!-- <configuration scan="false" scanPeriod="60 seconds" debug="false"> -->
<configuration>
  <!--设置上下文名称,用于区分不同应用程序的记录。一旦设置不能修改, 可以通过%contextName来打印日志上下文名称 -->
  <contextName>pit_biz</contextName>
  <!-- 定义日志的根目录 -->
  <property name="logDir" value="logs"/>
  <!-- 定义日志文件名称 -->
  <property name="logName" value="personalBizLog"></property>
  <!-- ConsoleAppender 表示控制台输出 -->
  <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
    <!-- 日志输出格式: %d表示日期时间, %thread表示线程名, %-5level:级别从左显示5个字符宽度, %logger{50}
        表示logger名字最长50个字符,否则按照句点分割。 %msg:日志消息, %n是换行符 -->
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>
  <!-- 异常错误日志记录到文件  -->
  <appender name="logfile" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <!-- <Encoding>UTF-8</Encoding> -->
    <File>${logDir}/${logName}.log</File>
    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
      <FileNamePattern>${logDir}/error/${error_log}.%d{yyyy-MM-dd}.rar
      </FileNamePattern>
      <maxHistory>30</maxHistory>
    </rollingPolicy>
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>
  <appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
    <encoder class="com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder">
      <layout class="net.logstash.logback.layout.LogstashLayout">
        <includeContext>false</includeContext>
        <includeCallerData>true</includeCallerData>
        <customFields>{"system":"pit_biz"}</customFields>
        <fieldNames class="net.logstash.logback.fieldnames.ShortenedFieldNames"/>
      </layout>
      <charset>UTF-8</charset>
    </encoder>
    <!--kafka topic 需要与配置文件里面的topic一致 否则kafka会沉默并鄙视你-->
    <topic>pit-log</topic>
    <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.HostNameKeyingStrategy"/>
    <deliveryStrategy
      class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
          <!--kafka 服务器地址-->
    <producerConfig>bootstrap.servers=192.168.17.150:9092</producerConfig>
  </appender>
  <root level="INFO">
    <appender-ref ref="CONSOLE"/>
    <appender-ref ref="logfile"/>
    <appender-ref ref="kafkaAppender"/>
  </root>
</configuration>

新增logstash配置文件pitlog.conf

input {
	kafka{
        bootstrap_servers => ["192.168.17.150:9092"]
		
        topics => ["pit-log"] #数组类型,可配置多个topic
        type => "kafka" #所有插件通用属性,尤其在input里面配置多个数据源时很有用
		codec => "json"
      }
}
output {
    elasticsearch {
		#ES地址
        hosts => ["192.168.17.150:9200"]		
        #指定索引名字,不适用默认的,用来区分各个项目
        index => "pit-log-%{+YYYY.MM.dd}"
      	
    }
	stdout {}
}

启动zookeeper,kafka,es,kibana,logstash,最后启动自己的SpringBoot项目

zkServer.sh start
nohup /home/local/kafka_2.12-2.3.0/bin/kafka-server-start.sh /home/local/kafka_2.12-2.3.0/config/server.properties > /dev/null 2>&1
sh /home/local/elasticsearch-6.4.3/bin/elasticsearch -d -p pid
sh /home/local/kibana-6.4.3-linux-x86_64/bin/kibana
sh /home/local/logstash-6.4.3/bin/logstash -f /home/local/logstash-6.4.3/myconf/pitlog.conf

查看成功运行结果

image

收集nginx日志

TIP

还没写。。。。

收集mysql日志

新增logstash配置文件maxwell.conf

input {
	kafka{
        bootstrap_servers => ["192.168.17.150:9092"]
		
        topics => ["maxwell"] #数组类型,可配置多个topic
        type => "kafka" #所有插件通用属性,尤其在input里面配置多个数据源时很有用
		codec => "json"
      }
}
output {
    elasticsearch {
		#ES地址
        hosts => ["192.168.17.150:9200"]		
        #指定索引名字,不适用默认的,用来区分各个项目
        #index => "invo-log-%{+YYYY.MM.dd}"
      	index => "maxwell"
    }
	stdout {}
}

启动mysql,zookeeper,kafka,es,kibana,logstash,maxwell

zkServer.sh start
nohup /home/local/kafka_2.12-2.3.0/bin/kafka-server-start.sh /home/local/kafka_2.12-2.3.0/config/server.properties > /dev/null 2>&1
sh /home/local/elasticsearch-6.4.3/bin/elasticsearch -d -p pid
sh /home/local/kibana-6.4.3-linux-x86_64/bin/kibana
sh /home/local/logstash-6.4.3/bin/logstash -f /home/local/logstash-6.4.3/myconf/maxwell.conf
bin/maxwell --user='maxwell' --password='maxwell' --host='192.168.17.150'  --producer=kafka --kafka.bootstrap.servers=192.168.17.150:9092 --kafka_topic=maxwell

查看成功运行结果

image