SpringBoot下ELK+KAFKA+SpringAop日志分析

设想背景

公司项目数据庞大,每天所产生的日志非常大,想要更好的通过日志查找直接原因,并且实现高可用。本文利用ELK(elasticsearch、logstash、kibana)+KAFKA搭建日志系统。利用SpringBoot+SpringAop进行日志采集。通过kafka发送日志给logstash,logstash将日志写入elasticsearch,然后通过kibana将放在elasticsearch显示出来,并且可以进行对接实现实现实时的数据图形分析。

Log4j日志

利用log4j实现日志记录,是不错的一种方案。但当查找问题时,就需要运维的大力协作。如果日志量很大,查找问题就比较困难。如果要实现高可用的话,就必须实现分布式部署。如果部署N台机子就会产生N个日志目录。如果一个用户出现问题,不方便找到是哪一个出了问题。

日志入数据库

将日志存入数据库虽然避免了找问题的难度,但是也避免不了缺陷。

  • log记录日志过量,日积月累,一张表明显不够用,就的采用分表分库
  • 连接数据库异常是避免不了的,这就会造成log丢失

ELK(elasticsearch、logstash、kibana)

ELK处理日志方式足以解决现在问题。流程图:
jdk

使用elasticsearch来存储日志信息,对一般系统来说可以理解为可以存储无限条数据,因为elasticsearch有良好的扩展性,然后是有一个logstash,可以把理解为数据接口,为elasticsearch对接外面过来的log数据,它对接的渠道,有kafka,有log文件,有redis等等,足够兼容N多log形式,最后还有一个部分就是kibana,它主要用来做数据展现,log那么多数据都存放在elasticsearch中,我们得看看log是什么样子的吧,这个kibana就是为了让我们看log数据的,但还有一个更重要的功能是,可以编辑N种图表形式,什么柱状图,折线图等等,来对log数据进行直观的展现。

EKL分工

  • logstash做日志对接,接受应用系统的log,然后将其写入到elasticsearch中,logstash可以支持N种log渠道,kafka渠道写进来的、和log目录对接的方式、也可以对reids中的log数据进行监控读取,等等。
  • elasticsearch存储日志数据,方便的扩展特效,可以存储足够多的日志数据。
  • kibana则是对存放在elasticsearch中的log数据进行:数据展现、报表展现,并且是实时的。

离线文件下载

运行环境:zookeeper3.3.6+kafka2.1+logstash2.3.4+elasticsearch2.3.3+kibana-4.5.4-windows+jdk8
技术:spring 4.3.3 + kafka2.1 + jdk8
链接:https://pan.baidu.com/s/1boOm0wB 密码:mugq

配置文件配置

Logstash配置

可以配置接入N多种log渠道,现状我配置的只是接入kafka渠道。
配置文件在\logstash-2.3.4\config目录下

//数据来源。 topic_id kafka的topic
input {
kafka {
zk_connect => "127.0.0.1:2181"
topic_id => "cw_topic"
}
}
filter {
#Only matched data are send to output.
}
//数据存储到哪里。elasticsearch的日志收集项cw_logs
output {
#stdout{}
# For detail config for elasticsearch as output,
# See: https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html
elasticsearch {
action => "index" #The operation on ES
hosts => "127.0.0.1:9200" #ElasticSearch host, can be array.
index => "cw_logs" #The index to write data to.
}
}

Elasticsearch配置

配置文件在\elasticsearch-2.3.3\config目录下的elasticsearch.yml,可以配置允许访问的IP地址,端口等,但我这里是采取默认配置。

Kibana配置

配置文件在\kibana-4.5.4-windows\config目录下的kibana.yml,可以配置允许访问的IP地址,端口等,但我这里是采取默认配置。

这里有一个需要注意的配置,就是指定访问elasticsearch的地址。我这里是同一台机子做测试,所以也是采取默认值了。

# The Elasticsearch instance to use for all your queries.
# elasticsearch.url: "http://localhost:9200"

SpringBoot利用Aop拦截日志发送至Kafka

代码结构图:
jdk

注意:配置pom.xml,Spinrg-kafka版本需和kafka服务一致

 <!--注入kafka-->  
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.0.5.RELEASE</version>
</dependency>
<!--注入aop-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>

部分SpringAop代码

/**
* Created by cw on 2017/12/22. good day.
* @Author: Chen Wu
* Blog: http://www.updatecg.xin
* <p>Discription:[后置通知,扫描com.androidmov.adManagement.maker.controllers包及此包下的所有带有controllerLogAnnotation注解的方法]</p>
* @param joinPoint 前置参数
* @param controllerLogAnnotation 自定义注解
*/
@After(("execution(* com.androidmov.adManagement.maker.controllers..*.*(..)) && @annotation(controllerLogAnnotation)"))
public void doAfterAdviceController(JoinPoint joinPoint, ControllerLogAnnotation controllerLogAnnotation){
HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
// HttpSession session = request.getSession();
//请求IP
String ip = request.getRemoteAddr();
try{
log.info("===========Controller 前置通知开始执行==============");
log.info("请求方法:" + (joinPoint.getTarget().getClass().getName() + "." +joinPoint.getSignature().getClass().getName() + "()"));
log.info("方法描述:" + getMethodDescription(joinPoint,true));
log.info("IP:" + ip);
String value = controllerLogAnnotation.description();
kafkaTemplate.send("mylog_topic", "key_controller", value);
}catch (Exception e){
//记录本地异常日志
log.error("==前置通知异常==");
log.error("异常信息:{}", e.getMessage());
}
}

application.properties中相关kafak配置

#============== kafka ===================
kafka.consumer.zookeeper.connect=localhost:2181
kafka.consumer.servers=localhost:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=mylog_topic
kafka.consumer.group.id=mylog_topic
kafka.consumer.concurrency=10

通过kafka发送日志给logstash

/**
* Created by cw on 2017/9/27. good day.
*/
@Configuration
@EnableKafka
public class KafkaProducerConfig {

@Value("${kafka.producer.servers}")
private String servers;
@Value("${kafka.producer.retries}")
private int retries;
@Value("${kafka.producer.batch.size}")
private int batchSize;
@Value("${kafka.producer.linger}")
private int linger;
@Value("${kafka.producer.buffer.memory}")
private int bufferMemory;


public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}

public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}

/**
* 记录日志
* 使用elasticsearch来存储日志信息,
* 为elasticsearch对接外面过来的log数据,
* 它对接kafka,
* 最后还有一个部分就是kibana,它主要用来做数据展现。
* @param key
* @param msg
*/
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<String, String>(producerFactory());
}
}

ELK,kafka、aop之间的关系

  • aop对日志进行收集,然后通过kafka发送出去,发送的时候,指定了topic(在spring配置文件中配置为 topic=”cw_topic”)

  • logstash指定接手topic为 cw_topic的kafka消息(在config目录下的配置文件中,有一个input的配置)

  • 然后logstash还定义了将接收到的kafka消息,写入到索引为my_logs的库中(output中有定义)

  • 再在kibana配置中,指定要连接那个elasticsearch(kibana.yml中有配置,默认为本机)

  • 最后是访问kibana,在kibana的控制台中,设置要访问elasticsearch中的哪个index。

后期补上测试数据

对应代码分享在 [https://github.com/UpdateCw/SpringBoot/tree/20171222]
本文参考文献 [http://www.demodashi.com/demo/10181.html]

文章作者: 陈 武
文章链接: http://www.updatecg.xin/2017/09/25/SpringBoot下ELK+KAFKA+SpringAop日志分析/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 我的学习记录
打赏
  • 微信
  • 支付寶

评论