设想背景
公司项目数据庞大,每天所产生的日志非常大,想要更好的通过日志查找直接原因,并且实现高可用。本文利用ELK(elasticsearch、logstash、kibana)+KAFKA搭建日志系统。利用SpringBoot+SpringAop进行日志采集。通过kafka发送日志给logstash,logstash将日志写入elasticsearch,然后通过kibana将放在elasticsearch显示出来,并且可以进行对接实现实现实时的数据图形分析。
Log4j日志
利用log4j实现日志记录,是不错的一种方案。但当查找问题时,就需要运维的大力协作。如果日志量很大,查找问题就比较困难。如果要实现高可用的话,就必须实现分布式部署。如果部署N台机子就会产生N个日志目录。如果一个用户出现问题,不方便找到是哪一个出了问题。
日志入数据库
将日志存入数据库虽然避免了找问题的难度,但是也避免不了缺陷。
- log记录日志过量,日积月累,一张表明显不够用,就的采用分表分库
- 连接数据库异常是避免不了的,这就会造成log丢失
ELK(elasticsearch、logstash、kibana)
ELK处理日志方式足以解决现在问题。流程图:
使用elasticsearch来存储日志信息,对一般系统来说可以理解为可以存储无限条数据,因为elasticsearch有良好的扩展性,然后是有一个logstash,可以把理解为数据接口,为elasticsearch对接外面过来的log数据,它对接的渠道,有kafka,有log文件,有redis等等,足够兼容N多log形式,最后还有一个部分就是kibana,它主要用来做数据展现,log那么多数据都存放在elasticsearch中,我们得看看log是什么样子的吧,这个kibana就是为了让我们看log数据的,但还有一个更重要的功能是,可以编辑N种图表形式,什么柱状图,折线图等等,来对log数据进行直观的展现。
EKL分工
- logstash做日志对接,接受应用系统的log,然后将其写入到elasticsearch中,logstash可以支持N种log渠道,kafka渠道写进来的、和log目录对接的方式、也可以对reids中的log数据进行监控读取,等等。
- elasticsearch存储日志数据,方便的扩展特效,可以存储足够多的日志数据。
- kibana则是对存放在elasticsearch中的log数据进行:数据展现、报表展现,并且是实时的。
离线文件下载
运行环境:zookeeper3.3.6+kafka2.1+logstash2.3.4+elasticsearch2.3.3+kibana-4.5.4-windows+jdk8
技术:spring 4.3.3 + kafka2.1 + jdk8
链接:https://pan.baidu.com/s/1boOm0wB 密码:mugq
配置文件配置
Logstash配置
可以配置接入N多种log渠道,现状我配置的只是接入kafka渠道。
配置文件在\logstash-2.3.4\config目录下//数据来源。 topic_id kafka的topic
input {
kafka {
zk_connect => "127.0.0.1:2181"
topic_id => "cw_topic"
}
}
filter {
Only matched data are send to output.
}
//数据存储到哪里。elasticsearch的日志收集项cw_logs
output {
stdout{}
For detail config for elasticsearch as output,
See: https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html
elasticsearch {
action => "index" #The operation on ES
hosts => "127.0.0.1:9200" #ElasticSearch host, can be array.
index => "cw_logs" #The index to write data to.
}
}
Elasticsearch配置
配置文件在\elasticsearch-2.3.3\config目录下的elasticsearch.yml,可以配置允许访问的IP地址,端口等,但我这里是采取默认配置。
Kibana配置
配置文件在\kibana-4.5.4-windows\config目录下的kibana.yml,可以配置允许访问的IP地址,端口等,但我这里是采取默认配置。
这里有一个需要注意的配置,就是指定访问elasticsearch的地址。我这里是同一台机子做测试,所以也是采取默认值了。 The Elasticsearch instance to use for all your queries.
elasticsearch.url: "http://localhost:9200"
SpringBoot利用Aop拦截日志发送至Kafka
代码结构图:
注意:配置pom.xml,Spinrg-kafka版本需和kafka服务一致 <!--注入kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.0.5.RELEASE</version>
</dependency>
<!--注入aop-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
部分SpringAop代码/**
* Created by cw on 2017/12/22. good day.
* @Author: Chen Wu
* Blog: http://www.updatecg.xin
* <p>Discription:[后置通知,扫描com.androidmov.adManagement.maker.controllers包及此包下的所有带有controllerLogAnnotation注解的方法]</p>
* @param joinPoint 前置参数
* @param controllerLogAnnotation 自定义注解
*/
"execution(* com.androidmov.adManagement.maker.controllers..*.*(..)) && @annotation(controllerLogAnnotation)")) ((
public void doAfterAdviceController(JoinPoint joinPoint, ControllerLogAnnotation controllerLogAnnotation){
HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
// HttpSession session = request.getSession();
//请求IP
String ip = request.getRemoteAddr();
try{
log.info("===========Controller 前置通知开始执行==============");
log.info("请求方法:" + (joinPoint.getTarget().getClass().getName() + "." +joinPoint.getSignature().getClass().getName() + "()"));
log.info("方法描述:" + getMethodDescription(joinPoint,true));
log.info("IP:" + ip);
String value = controllerLogAnnotation.description();
kafkaTemplate.send("mylog_topic", "key_controller", value);
}catch (Exception e){
//记录本地异常日志
log.error("==前置通知异常==");
log.error("异常信息:{}", e.getMessage());
}
}
application.properties中相关kafak配置#============== kafka ===================
kafka.consumer.zookeeper.connect=localhost:2181
kafka.consumer.servers=localhost:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=mylog_topic
kafka.consumer.group.id=mylog_topic
kafka.consumer.concurrency=10
通过kafka发送日志给logstash/**
* Created by cw on 2017/9/27. good day.
*/
public class KafkaProducerConfig {
"${kafka.producer.servers}") (
private String servers;
"${kafka.producer.retries}") (
private int retries;
"${kafka.producer.batch.size}") (
private int batchSize;
"${kafka.producer.linger}") (
private int linger;
"${kafka.producer.buffer.memory}") (
private int bufferMemory;
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
/**
* 记录日志
* 使用elasticsearch来存储日志信息,
* 为elasticsearch对接外面过来的log数据,
* 它对接kafka,
* 最后还有一个部分就是kibana,它主要用来做数据展现。
* @param key
* @param msg
*/
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<String, String>(producerFactory());
}
}
ELK,kafka、aop之间的关系
aop对日志进行收集,然后通过kafka发送出去,发送的时候,指定了topic(在spring配置文件中配置为 topic=”cw_topic”)
logstash指定接手topic为 cw_topic的kafka消息(在config目录下的配置文件中,有一个input的配置)
然后logstash还定义了将接收到的kafka消息,写入到索引为my_logs的库中(output中有定义)
再在kibana配置中,指定要连接那个elasticsearch(kibana.yml中有配置,默认为本机)
最后是访问kibana,在kibana的控制台中,设置要访问elasticsearch中的哪个index。
后期补上测试数据
对应代码分享在 [https://github.com/UpdateCw/SpringBoot/tree/20171222]
本文参考文献 [http://www.demodashi.com/demo/10181.html]