基于hadoop的web日志分析

前言

企业中,web服务占有很大比重,同时web服务产生大量的日志,而这些日志中包含了很多有用信息。随着业务量的增长,日志的数量也会急剧增加,当日志只有GB单位时,在处理日志时还可以使用grep,sed,awk处理;当数据量上升到百GB以上时,shell脚本,以及python多线程的处理方式就显得有些力不从心。所以使用hadoop处理海量日志分析,是必然趋势。
本次是构建一个web日志的kpi分析系统,通过获取相应key值的分析,获取潜在消息

目录:

  • Web日志分析概述
  • 需求分析:KPI指标设计
  • 算法模型:Hadoop并行算法
  • 架构设计:日志KPI系统架构
  • 程序开发

Web日志分析概述

Web日志由Web服务器产生,可能是Nginx, Apache, Tomcat等。从Web日志中,我们可以获取网站每类页面的PV值(PageView,页面访问量)、独立IP数;稍微复杂一些的,可以计算得出用户所检索的关键词排行榜、用户停留时间最高的页面等;更复杂的,构建广告点击模型、分析用户行为特征等等。
在Web日志中,每条日志通常代表着用户的一次访问行为

1
127.0.0.1 - - [19/Jul/2016:04:02:04 +0800] "GET /ss/bower/components/showdown/compressed/showdown.js HTTP/1.1" 304 0 "http://localhost/ss/" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.94 Safari/537.36"

web日志,拆解为以下10个变量:

  • remote_addr: 记录客户端的ip地址, 127.0.0.1
  • remote_user: 记录客户端用户名称, –
  • time_local: 记录访问时间与时区, [19/Jul/2016:04:02:04 +0800]
  • request_method: 记录请求的url与http协议, GET
  • source: /ss/bower/components/showdown/compressed/showdown.js
  • vesion: HTTP/1.1
  • status: 记录请求状态,304
  • body_bytes_sent: 记录发送给客户端文件主体内容大小, 19939
  • http_referer: 用来记录从那个页面链接访问过来的, http://localhost/ss/
  • http_user_agent: 记录客户浏览器的相关信息, Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.94 Safari/537.36

需求分析:KPI指标设计

KPI指标设计:

* PV(PageView): 网站文件访问统计
* IP: 独立IP的访问量统计
* Time: 网站分钟级请求数统计
* SIP:用户来源域名的统计   
* Browser: 用户的访问设备统计   
* Status:状态码分析

算法模型:Hadoop并行算法

并行算法的设计:

  • PV(PageView): 网站文件访问统计
    • Map过程{key:$source,value:1}
    • Reduce过程{key:$source,value:求和(sum)}
  • IP: IP的访问量统计
    • Map: {key:$remote_addr,value:$1}
    • Reduce: {key:$remote_addr,value:求和(sum}
  • Time: 网站分钟级请求数统计
    • Map: {key:$time_local,value:1}
    • Reduce: {key:$time_local,value:求和(sum)}
  • SIP: 用户来源域名的统计
    • Map: {key:$http_referer,value:1}
    • Reduce: {key:$http_referer,value:求和(sum)}
  • Browser: 用户的访问设备统计
    • Map: {key:$http_user_agent,value:1}
    • Reduce: {key:$http_user_agent,value:求和(sum)}
  • Status:状态码分析
    • Map: {key:$status,value:1}
    • Reduce: {key:$status,value:求和(sum)}

架构设计:日志KPI系统架构

  • 日志集中保存
  • 设置系统定时器CRON,夜间在0点后,向HDFS导入昨天的日志文件。
  • 完成导入后,设置系统定时器,启动MapReduce程序,提取并计算统计指标。
  • 完成计算后,设置系统定时器,从HDFS导出统计指标数据到数据库,方便以后的即使查询。

程序开发

  • 集中日志 crontab
  • MapReduce程序实现开发流程:
    1. 对日志行的解析
    2. Map函数实现
    3. Reduce函数实现

日志集中

crontab,内容如下:
59 23 * * * /usr/bin/python /hadoop/logserver/hdfsput.py >> /dev/null 2>&1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import subprocess
import sys
import datatime
webid="****"
currdate=datetime.datetime.now().strftime('%Y%m%d')
logspath="/data/logs/"+currdate+"/access.log"
logname="access.log."+webid
try:
subprocess.Popen(["/usr/local/hadoop-1.2.1/bin/hadoop", "dfs", "-mkdir", "hdfs://192.168.1.20:9000/user/root/website.com/"+currdate], stdout=subprocess.PIPE)
except Exception,e:
pass
putinfo=subprocess.Popen(["/usr/local/hadoop-1.2.1/bin/hadoop", "dfs", "-put", logspath, "hdfs://192.168.1.20:9000/user/root/website.com/"+currdate+"/"+logname], stdout=subprocess.PIPE)
for line in putinfo.stdout:
print line

统计分析

统计分析语言采用python,使用Mrjob框架编写Mapreduce。避免臃肿的代码量。

PV(PageView): 网站文件访问统计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from mrjob.job import MRJob
import re
class MRCounter(MRJob):
def mapper(self, key, line):
i=0
for url in line.split():
if i==6:
yield url, 1
i+=1
def reducer(self, url, occurrences):
yield url, sum(occurrences)
if __name__ == '__main__':
MRCounter.run()

IP: 独立IP的访问量统计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from mrjob.job import MRJob
import re
IP_RE = re.compile(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}")
class MRCounter(MRJob):
def mapper(self, key, line):
for ip in IP_RE.findall(line):
yield ip, 1
def reducer(self, ip, occurrences):
yield ip, sum(occurrences)
if __name__ == '__main__':
MRCounter.run()

Time: 网站分钟级请求数统计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from mrjob.job import MRJob
import re
class MRCounter(MRJob):
def mapper(self, key, line):
i=0
for dt in line.split():
if i==3:
timerow=dt.split(":")
hm=timerow[1]+":"+timerow[2]
yield hm, 1
i+=1
def reducer(self, key, occurrences):
yield key, sum(occurrences)
if __name__ == '__main__':
MRCounter.run()

SIP:用户来源域名的统计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from mrjob.job import MRJob
import re
class MRCounter(MRJob):
def mapper(self, key, line):
i=0
for dt in line.split():
if i==11:
timerow=dt.split(":")
yield sip, 1
i+=1
def reducer(self, key, occurrences):
yield key, sum(occurrences)
if __name__ == '__main__':
MRCounter.run()

Browser: 用户的访问设备统计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from mrjob.job import MRJob
import re
class MRCounter(MRJob):
def mapper(self, key, line):
i=0
for dt in line.split():
if i==12:
timerow=dt.split(":")
yield browser , 1
i+=1
def reducer(self, key, occurrences):
yield key, sum(occurrences)
if __name__ == '__main__':
MRCounter.run()

Status:状态码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from mrjob.job import MRJob
import re
class MRCounter(MRJob):
def mapper(self, key, line):
i=0
for httpcode in line.split():
if i==8 and re.match(r"\d{1,3}",httpcode):
yield httpcode, 1
i+=1
def reducer(self, httpcode, occurrences):
yield httpcode, sum(occurrences)
def reducer_sorted(self, httpcode, occurrences):
yield httpcode, sorted(occurrences)
def steps(self):
return [self.mr(mapper=self.mapper),
self.mr(reducer=self.reducer)]
if __name__ == '__main__':
MRCounter.run()

未完待续