前言
企业中,web服务占有很大比重,同时web服务产生大量的日志,而这些日志中包含了很多有用信息。随着业务量的增长,日志的数量也会急剧增加,当日志只有GB单位时,在处理日志时还可以使用grep,sed,awk处理;当数据量上升到百GB以上时,shell脚本,以及python多线程的处理方式就显得有些力不从心。所以使用hadoop处理海量日志分析,是必然趋势。
本次是构建一个web日志的kpi分析系统,通过获取相应key值的分析,获取潜在消息
目录:
- Web日志分析概述
- 需求分析:KPI指标设计
- 算法模型:Hadoop并行算法
- 架构设计:日志KPI系统架构
- 程序开发
Web日志分析概述
Web日志由Web服务器产生,可能是Nginx, Apache, Tomcat等。从Web日志中,我们可以获取网站每类页面的PV值(PageView,页面访问量)、独立IP数;稍微复杂一些的,可以计算得出用户所检索的关键词排行榜、用户停留时间最高的页面等;更复杂的,构建广告点击模型、分析用户行为特征等等。
在Web日志中,每条日志通常代表着用户的一次访问行为
1
| 127.0.0.1 - - [19/Jul/2016:04:02:04 +0800] "GET /ss/bower/components/showdown/compressed/showdown.js HTTP/1.1" 304 0 "http://localhost/ss/" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.94 Safari/537.36"
|
web日志,拆解为以下10个变量:
- remote_addr: 记录客户端的ip地址, 127.0.0.1
- remote_user: 记录客户端用户名称, –
- time_local: 记录访问时间与时区, [19/Jul/2016:04:02:04 +0800]
- request_method: 记录请求的url与http协议, GET
- source: /ss/bower/components/showdown/compressed/showdown.js
- vesion: HTTP/1.1
- status: 记录请求状态,304
- body_bytes_sent: 记录发送给客户端文件主体内容大小, 19939
- http_referer: 用来记录从那个页面链接访问过来的, http://localhost/ss/
- http_user_agent: 记录客户浏览器的相关信息, Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.94 Safari/537.36
需求分析:KPI指标设计
KPI指标设计:
* PV(PageView): 网站文件访问统计
* IP: 独立IP的访问量统计
* Time: 网站分钟级请求数统计
* SIP:用户来源域名的统计
* Browser: 用户的访问设备统计
* Status:状态码分析
算法模型:Hadoop并行算法
并行算法的设计:
- PV(PageView): 网站文件访问统计
- Map过程{key:$source,value:1}
- Reduce过程{key:$source,value:求和(sum)}
- IP: IP的访问量统计
- Map: {key:$remote_addr,value:$1}
- Reduce: {key:$remote_addr,value:求和(sum}
- Time: 网站分钟级请求数统计
- Map: {key:$time_local,value:1}
- Reduce: {key:$time_local,value:求和(sum)}
- SIP: 用户来源域名的统计
- Map: {key:$http_referer,value:1}
- Reduce: {key:$http_referer,value:求和(sum)}
- Browser: 用户的访问设备统计
- Map: {key:$http_user_agent,value:1}
- Reduce: {key:$http_user_agent,value:求和(sum)}
- Status:状态码分析
- Map: {key:$status,value:1}
- Reduce: {key:$status,value:求和(sum)}
架构设计:日志KPI系统架构
- 日志集中保存
- 设置系统定时器CRON,夜间在0点后,向HDFS导入昨天的日志文件。
- 完成导入后,设置系统定时器,启动MapReduce程序,提取并计算统计指标。
- 完成计算后,设置系统定时器,从HDFS导出统计指标数据到数据库,方便以后的即使查询。
程序开发
- 集中日志 crontab
- MapReduce程序实现开发流程:
- 对日志行的解析
- Map函数实现
- Reduce函数实现
日志集中
crontab,内容如下:
59 23 * * * /usr/bin/python /hadoop/logserver/hdfsput.py >> /dev/null 2>&1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| import subprocess import sys import datatime webid="****" currdate=datetime.datetime.now().strftime('%Y%m%d') logspath="/data/logs/"+currdate+"/access.log" logname="access.log."+webid try: subprocess.Popen(["/usr/local/hadoop-1.2.1/bin/hadoop", "dfs", "-mkdir", "hdfs://192.168.1.20:9000/user/root/website.com/"+currdate], stdout=subprocess.PIPE) except Exception,e: pass putinfo=subprocess.Popen(["/usr/local/hadoop-1.2.1/bin/hadoop", "dfs", "-put", logspath, "hdfs://192.168.1.20:9000/user/root/website.com/"+currdate+"/"+logname], stdout=subprocess.PIPE) for line in putinfo.stdout: print line
|
统计分析
统计分析语言采用python,使用Mrjob框架编写Mapreduce。避免臃肿的代码量。
PV(PageView): 网站文件访问统计
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| from mrjob.job import MRJob import re class MRCounter(MRJob): def mapper(self, key, line): i=0 for url in line.split(): if i==6: yield url, 1 i+=1 def reducer(self, url, occurrences): yield url, sum(occurrences) if __name__ == '__main__': MRCounter.run()
|
IP: 独立IP的访问量统计
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| from mrjob.job import MRJob import re IP_RE = re.compile(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}") class MRCounter(MRJob): def mapper(self, key, line): for ip in IP_RE.findall(line): yield ip, 1 def reducer(self, ip, occurrences): yield ip, sum(occurrences) if __name__ == '__main__': MRCounter.run()
|
Time: 网站分钟级请求数统计
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| from mrjob.job import MRJob import re class MRCounter(MRJob): def mapper(self, key, line): i=0 for dt in line.split(): if i==3: timerow=dt.split(":") hm=timerow[1]+":"+timerow[2] yield hm, 1 i+=1 def reducer(self, key, occurrences): yield key, sum(occurrences) if __name__ == '__main__': MRCounter.run()
|
SIP:用户来源域名的统计
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| from mrjob.job import MRJob import re class MRCounter(MRJob): def mapper(self, key, line): i=0 for dt in line.split(): if i==11: timerow=dt.split(":") yield sip, 1 i+=1 def reducer(self, key, occurrences): yield key, sum(occurrences) if __name__ == '__main__': MRCounter.run()
|
Browser: 用户的访问设备统计
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| from mrjob.job import MRJob import re class MRCounter(MRJob): def mapper(self, key, line): i=0 for dt in line.split(): if i==12: timerow=dt.split(":") yield browser , 1 i+=1 def reducer(self, key, occurrences): yield key, sum(occurrences) if __name__ == '__main__': MRCounter.run()
|
Status:状态码分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| from mrjob.job import MRJob import re class MRCounter(MRJob): def mapper(self, key, line): i=0 for httpcode in line.split(): if i==8 and re.match(r"\d{1,3}",httpcode): yield httpcode, 1 i+=1 def reducer(self, httpcode, occurrences): yield httpcode, sum(occurrences) def reducer_sorted(self, httpcode, occurrences): yield httpcode, sorted(occurrences) def steps(self): return [self.mr(mapper=self.mapper), self.mr(reducer=self.reducer)] if __name__ == '__main__': MRCounter.run()
|
未完待续