Implementing Statistics with HDFS, RabbitMQ, and MongoDB

This is home-screen (desktop) data analytics for the IPTV industry, analyzing user behavior data. The daily volume can reach 30 million records. The statistics cover page views (PV), unique visitors (UV), video views (VV), daily active users (DAU), weekly active users (WAU), monthly active users (MAU), monthly power-on rate, click counts, ranking data, and so on.

Architecture Design

  • The client reports data, which is first stored in an in-memory cache (a minimal sketch of the cache classes follows this list)
  • A scheduled task flushes the cached character streams to files and uploads the files to HDFS
  • An MQ client then sends a notification message to the MQ server
  • The MQ server listens for the message and processes the file on HDFS (deserializing the message's byte array back into an entity bean)
  • The entity beans are written into MongoDB
  • Queries are served by MongoDB's aggregate() aggregation function (data is split across databases and collections)
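
The snippets later in this post share a DataPond that holds the raw-data queue and the per-policy file cache. The original class is not shown; the following is a minimal sketch, assuming the structure implied by the later code (a Queue<DataStr> plus a Map<String, FileStruct> keyed by policy id), with thread-safe collections because the web layer and the scheduled flusher touch it concurrently.

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/* Minimal sketch of the shared in-memory cache; the real DataPond,
 * DataStr and FileStruct classes are not part of this post. */
public final class DataPond {

    // Raw records reported by clients, drained by the scheduled flusher
    private static final Queue<DataStr> DATA_CACHE = new ConcurrentLinkedQueue<DataStr>();

    // One in-progress file per policy id, waiting to grow (or age) enough to upload
    private static final Map<String, FileStruct> FILE_CACHE = new ConcurrentHashMap<String, FileStruct>();

    private DataPond() {
    }

    public static Queue<DataStr> getDataCache() {
        return DATA_CACHE;
    }

    public static Map<String, FileStruct> getFileCache() {
        return FILE_CACHE;
    }
}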

Environment Setup

Hadoop setup: [Hadoop environment setup guide]
RabbitMQ setup: [RabbitMQ environment setup guide]

Selected Code

Client-side data collection

@RequestMapping(value = "/singleData/{policyId}")
public ResponseData postSingleData(@PathVariable("policyId") String policyId, HttpServletRequest request) {
    ResponseData response = getResponseData();
    try {
        // Read the raw request body line by line
        ServletInputStream inputStream = request.getInputStream();
        StringBuffer sb = new StringBuffer();
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, "utf-8"));
        String data = null;
        while ((data = bufferedReader.readLine()) != null) {
            sb.append(data);
            sb.append("\n");
        }
        if (sb.length() <= 0) {
            throw new Exception("Request body must not be empty");
        }
        // Queue the raw data together with its policy id for the scheduled flusher
        DataStr s = new DataStr(policyId, sb);
        service.receiveNewData(s);
    } catch (Exception e) {
        response.setCode(0);
        response.setMsg(e.getMessage());
    }

    return response;
}
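
For reference, a client can push a batch of records to this endpoint with a plain HTTP POST, one record per line. The snippet below is a minimal sketch using java.net.HttpURLConnection; the host, port, policy id, and record payload are placeholders, not values from the original project.

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class ReportClient {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint and policy id; one record per line, as the controller expects
        URL url = new URL("http://localhost:8080/singleData/demo-policy");
        String body = "{\"userId\":\"u1\",\"event\":\"play\"}\n"
                    + "{\"userId\":\"u2\",\"event\":\"click\"}\n";

        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "text/plain; charset=utf-8");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("HTTP status: " + conn.getResponseCode());
        conn.disconnect();
    }
}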

Scheduled flush of the cached character streams to files

private void flushData() throws Exception {
    Queue<DataStr> dataCache = DataPond.getDataCache();
    DataStr dataIte = null;
    Integer size = dataCache.size();
    logger.info("There are [" + size + "] records in the queue");
    // Drain the queue and append each record to the file of its policy
    while (size > 0 && (dataIte = dataCache.poll()) != null) {
        String policyId = dataIte.getPolicyId();
        Map<String, FileStruct> fileCache = DataPond.getFileCache();
        FileStruct fileStruct = fileCache.get(policyId);
        if (fileStruct == null) {
            fileStruct = new FileStruct(policyId);
            fileCache.put(policyId, fileStruct);
        }
        fileStruct.write(dataIte.getData().toString());
        fileStruct.write("\n");
        size--;
    }
}
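
The post does not show how these jobs are scheduled. A minimal sketch with a ScheduledExecutorService might look like the following; the five-second interval is an assumption, not a value from the original project.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FlushScheduler {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public void start() {
        // Every 5 seconds (placeholder interval): flush cached records to files,
        // then check whether any file is ready to go to HDFS and be announced over MQ
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                flushData();
                uploadFilesAndSendToMQ();
            } catch (Exception e) {
                // Log and keep the scheduler alive on failure
                e.printStackTrace();
            }
        }, 5, 5, TimeUnit.SECONDS);
    }

    private void flushData() throws Exception { /* as shown above */ }

    private void uploadFilesAndSendToMQ() throws Exception { /* as shown below */ }
}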

Uploading files to HDFS and notifying via MQ

private void uploadFilesAndSendToMQ() throws Exception {
    /* Walk the per-policy files; use an iterator so entries can be removed safely while iterating */
    Map<String, FileStruct> fileCache = DataPond.getFileCache();

    Iterator<Map.Entry<String, FileStruct>> it = fileCache.entrySet().iterator();
    while (it.hasNext()) {
        Map.Entry<String, FileStruct> entry = it.next();
        String key = entry.getKey();
        FileStruct fs = entry.getValue();
        /* Flag whether the file should be flushed and uploaded to HDFS */
        boolean shallBeFlush = false;
        // Either the file has grown large enough ...
        if (fs.getFielSize() >= SystemChangeableConstant.MAX_FILESIZE) {
            shallBeFlush = true;
        }
        // ... or it has been idle for too long
        if (System.currentTimeMillis() - fs.getLastUpdateTime() >= SystemChangeableConstant.MAX_FILE_NOACTION_TIMESPAN) {
            shallBeFlush = true;
        }
        if (shallBeFlush) {
            if (!hdfsUtil.isOpened()) {
                // TODO temporary hack: point to a local Hadoop installation
                System.setProperty("hadoop.home.dir", "D:\\hadoop-2.6.5");
                hdfsUtil.open();
            }
            logger.info("File of policy [" + key + "] is full and will be sent out!");
            fs.flush();
            try {
                transferFileToDfs(fs);
                logger.info("File of policy [" + key + "] sent to HDFS successfully");
            } catch (Exception e) {
                logger.error("File of policy [" + key + "] failed to send to HDFS: " + e.getMessage());
            }
            try {
                sendToMq(fs);
                logger.info("File of policy [" + key + "] sent to MQ successfully");
            } catch (Exception e) {
                logger.error("File of policy [" + key + "] failed to send to MQ: " + e.getMessage());
            }
            it.remove();
            fs.destroy();
        }
    }
    hdfsUtil.close();
}
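
sendToMq is not shown in the post. A minimal sketch with the RabbitMQ Java client, assuming the FileStruct is serialized with the same SerializingUtil the consumer uses and published to the SystemConstant.NEW_FILE_QUEUE queue, could look like this; the broker address is a placeholder.

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

private void sendToMq(FileStruct fs) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("localhost"); // placeholder broker address
    try (Connection connection = connectionFactory.newConnection();
         Channel channel = connection.createChannel()) {
        // Declare the queue (must match the consumer's declaration), then publish the serialized FileStruct
        channel.queueDeclare(SystemConstant.NEW_FILE_QUEUE, true, false, false, null);
        byte[] body = SerializingUtil.serialize(fs);
        channel.basicPublish("", SystemConstant.NEW_FILE_QUEUE, null, body);
    }
}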

MQ server side: listening for new files and processing them

/* Hand each received message to a worker task */
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        logger.info("Router [" + SystemConstant.NEW_FILE_QUEUE + "] received new file");
        // Deserialize the message body back into a FileStruct and schedule processing
        FileStruct fileStruct = (FileStruct) SerializingUtil.deserialize(body);
        factory.createNewTask(fileStruct);
    }
};
channel.basicConsume(SystemConstant.NEW_FILE_QUEUE, true, consumer);
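
SerializingUtil is not included in the post either. A minimal sketch based on plain Java serialization, assuming FileStruct implements Serializable, could be:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public final class SerializingUtil {

    private SerializingUtil() {
    }

    // Serialize any Serializable object into a byte array for the MQ message body
    public static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.toByteArray();
    }

    // Restore the object from the received message body
    public static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }
}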

Writing data into MongoDB

for (HashMap aMap : parsedDatas) {
    // Preset default time column
    aMap.put(DataTable.DEFAUL_TTIMECOLUMN, now);

    dataTable.genId(aMap);
    // Route each record to its split collection
    String tableName = dataTable.getSplitTableName(aMap);
    Document doc = new Document(aMap);

    if (dataTable.hasIdentify()) {
        // Records with their own identity are written one by one
        mongoConnector.insertSingle(tableName, doc);
    } else {
        // Otherwise batch the documents per collection and insert them together
        List<Document> list = tableToDatasMap.get(tableName);
        if (list == null) {
            list = new ArrayList<Document>(parsedDatas.size() / 2);
            tableToDatasMap.put(tableName, list);
        }
        list.add(doc);
    }
}
for (String key : tableToDatasMap.keySet()) {
    mongoConnector.insert(key, tableToDatasMap.get(key));
}
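
mongoConnector is a thin wrapper in the original project and is not shown here. A minimal sketch with the MongoDB Java driver, assuming insertSingle maps to insertOne and insert maps to insertMany, might be:

import java.util.List;

import org.bson.Document;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;

public class MongoConnector {

    private final MongoDatabase database;

    public MongoConnector(String uri, String dbName) {
        // Placeholder connection settings, e.g. "mongodb://localhost:27017"
        MongoClient client = MongoClients.create(uri);
        this.database = client.getDatabase(dbName);
    }

    // Insert a single document into the given (split) collection
    public void insertSingle(String collectionName, Document doc) {
        database.getCollection(collectionName).insertOne(doc);
    }

    // Batch insert a list of documents into the given (split) collection
    public void insert(String collectionName, List<Document> docs) {
        database.getCollection(collectionName).insertMany(docs);
    }
}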

Aggregation analysis

public List<Map<String, Object>> getExhibitionCount(Date start, Date end) throws ParseException {
    List<Map<String, Object>> map = new ArrayList<Map<String, Object>>();
    // Number of days in the requested interval
    int days = differentDaysByMillisecond(start, end);
    for (int i = 0; i < days + 1; i++) {
        // Sum up playback duration per template/column/position for one day's table
        Aggregation aggregation = Aggregation.newAggregation(
                Aggregation.group("templateCode", "columnId", "positionId").sum("duration").as("duration"),
                Aggregation.project("templateCode", "columnId", "positionId", "duration")
        );
        String tableName = this.getTableName(start);
        AggregationResults<HashMap> aggregate = mongoTemplate.aggregate(aggregation, tableName, HashMap.class);
        // key: date, value: that day's data
        List<HashMap> mappedResults = aggregate.getMappedResults();
        System.out.print("Aggregated results: " + mappedResults);
        for (Map adCountDtoMap : mappedResults) {
            String json = JsonUtils.obj2Str(adCountDtoMap);
            AdCountDto adCountDto = JsonUtils.str2Obj(json, AdCountDto.class);
            // Tag each aggregated row with the day being processed
            adCountDto.setDate(dateFormatCheck.format(start));
            createIndex(COL_NAME);
            mongoTemplate.insert(adCountDto, COL_NAME);
        }
        // Move on to the next day
        start = dataPlus(start, 1);
    }
    return map;
}
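
The same pipeline style covers the metrics listed at the top of the post. As an illustration, a DAU count over one day's table could be written as follows; the collection name, the "userId" and "createTime" field names, and the DauCounter class are assumptions for the sketch, not taken from the original project.

import java.util.Date;

import org.bson.Document;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.query.Criteria;

public class DauCounter {

    private final MongoTemplate mongoTemplate;

    public DauCounter(MongoTemplate mongoTemplate) {
        this.mongoTemplate = mongoTemplate;
    }

    // Count distinct users that produced at least one record between dayStart and dayEnd
    public long countDau(String tableName, Date dayStart, Date dayEnd) {
        Aggregation aggregation = Aggregation.newAggregation(
                Aggregation.match(Criteria.where("createTime").gte(dayStart).lt(dayEnd)),
                Aggregation.group("userId"),          // one bucket per distinct user
                Aggregation.group().count().as("dau") // count the buckets
        );
        AggregationResults<Document> results = mongoTemplate.aggregate(aggregation, tableName, Document.class);
        Document first = results.getUniqueMappedResult();
        return first == null ? 0L : ((Number) first.get("dau")).longValue();
    }
}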

Results display

Author: 陈 武
Article link: http://www.updatecg.xin/2018/06/20/利用HDFS、RabbitMQ、MongoDB实现统计/
Copyright: Unless otherwise stated, all articles on this blog are licensed under CC BY-NC-SA 4.0. Please credit 我的学习记录 when reposting.