Desktop data analytics for the IPTV industry, focused on user behavior data. Daily volume can reach 30 million records. The system computes page views (PV), unique visitors (UV), video views (VV),
daily/weekly/monthly active users (DAU, WAU, MAU), monthly boot rate, click counts, ranking lists, and similar metrics.
Architecture Design
- The client reports data to a collection endpoint, and the payloads are stored in an in-memory cache
- A scheduled task flushes the cached character streams to local files and uploads those files to HDFS
- The MQ client then sends the file information to the MQ server
- The MQ server consumer receives the message, deserializes the byte array back into an entity bean, and processes the corresponding file on HDFS
- The parsed entity beans are written into MongoDB
- Statistics are queried with MongoDB's aggregate() aggregation framework against collections split by database and table (a minimal query sketch follows this list)
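As a concrete illustration of the last step, here is a minimal sketch of a daily PV/UV query with the MongoDB Java sync driver against one date-split collection. The connection string, database name iptv, collection name user_behavior_20240101 and field userId are assumptions for the sketch, not taken from the project.

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.Document;
import java.util.Arrays;

public class DailyPvUvQuery {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            // One collection per day (assumed naming convention for the split tables)
            MongoCollection<Document> col = client.getDatabase("iptv")
                    .getCollection("user_behavior_20240101");
            // First group by user to count each user's records, then roll up:
            // PV = total number of records, UV = number of distinct users
            Document result = col.aggregate(Arrays.asList(
                    new Document("$group", new Document("_id", "$userId")
                            .append("pv", new Document("$sum", 1))),
                    new Document("$group", new Document("_id", null)
                            .append("uv", new Document("$sum", 1))
                            .append("pv", new Document("$sum", "$pv")))
            )).first();
            System.out.println(result == null ? "no data" : result.toJson());
        }
    }
}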
Environment Setup
Hadoop environment setup: [Hadoop setup guide]
RabbitMQ environment setup: [RabbitMQ setup guide]
Code Excerpts
Client-side data collection
@RequestMapping(value = "/singleData/{policyId}")
public ResponseData postSingleData(@PathVariable("policyId") String policyId, HttpServletRequest request) {
    ResponseData response = getResponseData();
    try {
        // Read the raw reported data from the request body line by line
        ServletInputStream inputStream = request.getInputStream();
        StringBuffer sb = new StringBuffer();
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, "utf-8"));
        String data = null;
        while ((data = bufferedReader.readLine()) != null) {
            sb.append(data);
            sb.append("\n");
        }
        if (sb.length() <= 0) {
            throw new Exception("Request body is empty, please provide data");
        }
        // Wrap the payload with its policy id and hand it to the in-memory cache
        DataStr s = new DataStr(policyId, sb);
        service.receiveNewData(s);
    } catch (Exception e) {
        response.setCode(0);
        response.setMsg(e.getMessage());
    }
    return response;
}
Scheduled flush of cached character streams to files
private void flushData() throws Exception {
    Queue<DataStr> dataCache = DataPond.getDataCache();
    DataStr dataIte = null;
    int size = dataCache.size();
    logger.info("There are [" + size + "] records in the queue");
    // Drain the in-memory queue and append each payload to the file buffer of its policy
    while (size > 0 && (dataIte = dataCache.poll()) != null) {
        String policyId = dataIte.getPolicyId();
        Map<String, FileStruct> fileCache = DataPond.getFileCache();
        FileStruct fileStruct = fileCache.get(policyId);
        if (fileStruct == null) {
            // First record for this policy: create a new file buffer for it
            fileStruct = new FileStruct(policyId);
            fileCache.put(policyId, fileStruct);
        }
        fileStruct.write(dataIte.getData().toString());
        fileStruct.write("\n");
        size--;
    }
}
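The collection endpoint and the flush task lean on a few support classes that are not listed in the post. The sketch below shows one plausible shape for DataStr, DataPond and FileStruct, inferred purely from how they are used here; the field names, buffering strategy and temp-file location are assumptions.

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// One reported payload tagged with the policy it belongs to (sketch)
class DataStr {
    private final String policyId;
    private final StringBuffer data;

    DataStr(String policyId, StringBuffer data) {
        this.policyId = policyId;
        this.data = data;
    }

    String getPolicyId() { return policyId; }
    StringBuffer getData() { return data; }
}

// Shared in-memory caches: the raw payload queue and the per-policy file buffers (sketch).
// Concurrent collections are used because the upload task removes entries while iterating.
class DataPond {
    private static final Queue<DataStr> DATA_CACHE = new ConcurrentLinkedQueue<>();
    private static final Map<String, FileStruct> FILE_CACHE = new ConcurrentHashMap<>();

    static Queue<DataStr> getDataCache() { return DATA_CACHE; }
    static Map<String, FileStruct> getFileCache() { return FILE_CACHE; }
}

// A per-policy file buffer; Serializable because it is later sent through RabbitMQ (sketch).
// The writer is transient: only the file metadata matters on the consumer side.
class FileStruct implements Serializable {
    private final String policyId;
    private final File file;
    private transient BufferedWriter writer;
    private long fileSize;
    private long lastUpdateTime = System.currentTimeMillis();

    FileStruct(String policyId) throws IOException {
        this.policyId = policyId;
        this.file = File.createTempFile("policy_" + policyId + "_", ".dat");
        this.writer = new BufferedWriter(new FileWriter(file));
    }

    void write(String content) throws IOException {
        writer.write(content);
        fileSize += content.getBytes(StandardCharsets.UTF_8).length;
        lastUpdateTime = System.currentTimeMillis();
    }

    void flush() throws IOException { writer.flush(); }
    void destroy() throws IOException { writer.close(); file.delete(); }

    long getFielSize() { return fileSize; }   // spelling kept as used in the post's code
    long getLastUpdateTime() { return lastUpdateTime; }
    String getPolicyId() { return policyId; }
    File getFile() { return file; }
}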
Uploading files to HDFS and notifying via MQ
private void uploadFilesAndSendToMQ() throws Exception {
    Map<String, FileStruct> fileCache = DataPond.getFileCache();
    Set<String> keySet = fileCache.keySet();
    for (String key : keySet) {
        FileStruct fs = fileCache.get(key);
        boolean shallBeFlush = false;
        // Flush when the buffered file is large enough...
        if (fs.getFielSize() >= SystemChangeableConstant.MAX_FILESIZE) {
            shallBeFlush = true;
        }
        // ...or when it has been idle for too long
        if (System.currentTimeMillis() - fs.getLastUpdateTime() >= SystemChangeableConstant.MAX_FILE_NOACTION_TIMESPAN) {
            shallBeFlush = true;
        }
        if (shallBeFlush) {
            if (!hdfsUtil.isOpened()) {
                // Local Windows development setting; points at the local Hadoop installation
                System.setProperty("hadoop.home.dir", "D:\\hadoop-2.6.5");
                hdfsUtil.open();
            }
            logger.info("File of policy [" + key + "] is full and will send out!");
            fs.flush();
            try {
                transferFileToDfs(fs);
                logger.info("File of policy [" + key + "] send to hdfs success");
            } catch (Exception e) {
                logger.error("File of policy [" + key + "] send to hdfs fail as " + e.getMessage());
            }
            try {
                sendToMq(fs);
                logger.info("File of policy [" + key + "] send to MQ success");
            } catch (Exception e) {
                logger.error("File of policy [" + key + "] send to MQ fail as " + e.getMessage());
            }
            // Removing entries while iterating over keySet() requires a concurrent map
            fileCache.remove(key);
            fs.destroy();
        }
    }
    hdfsUtil.close();
}
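The post does not show transferFileToDfs, sendToMq, or the hdfsUtil wrapper. The sketch below is one way those two methods could look, using the Hadoop FileSystem API directly and the RabbitMQ Java client; the namenode address, target directory, MQ host, and queue declaration are assumptions (the queue name constant is the one the consumer below listens on).

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;

// Copies the flushed local file to HDFS (sketch; the target directory is an assumption)
private void transferFileToDfs(FileStruct fs) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://namenode:9000");
    try (FileSystem dfs = FileSystem.get(conf)) {
        dfs.copyFromLocalFile(new Path(fs.getFile().getAbsolutePath()),
                new Path("/iptv/raw/" + fs.getPolicyId() + "/" + fs.getFile().getName()));
    }
}

// Serializes the FileStruct and publishes it to the queue the consumer below listens on (sketch)
private void sendToMq(FileStruct fs) throws Exception {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(fs);
    }
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("mq-host");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.queueDeclare(SystemConstant.NEW_FILE_QUEUE, true, false, false, null);
        channel.basicPublish("", SystemConstant.NEW_FILE_QUEUE, null, bytes.toByteArray());
    }
}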
MQ consumer processing files on HDFS
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        logger.info("Router [" + SystemConstant.NEW_FILE_QUEUE + "] received new file");
        // Deserialize the message body back into a FileStruct entity bean
        FileStruct fileStruct = (FileStruct) SerializingUtil.deserialize(body);
        // Hand the file description to a worker task that reads and parses it from HDFS
        factory.createNewTask(fileStruct);
    }
};
// The second argument enables auto-ack: the message is acknowledged as soon as it is delivered
channel.basicConsume(SystemConstant.NEW_FILE_QUEUE, true, consumer);
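The body of the task created by factory.createNewTask is not shown. A plausible worker, sketched below, opens the uploaded file on HDFS, parses each line into a HashMap, and collects the maps for the Mongo writer in the next step; the namenode address, HDFS path layout, and the tab-separated key=value line format are assumptions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

// Reads one uploaded file from HDFS and parses each line into a HashMap (sketch)
List<HashMap<String, Object>> parseFileFromDfs(FileStruct fileStruct) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://namenode:9000");
    List<HashMap<String, Object>> parsedDatas = new ArrayList<>();
    try (FileSystem dfs = FileSystem.get(conf);
         BufferedReader reader = new BufferedReader(new InputStreamReader(
                 dfs.open(new Path("/iptv/raw/" + fileStruct.getPolicyId() + "/"
                         + fileStruct.getFile().getName())), "utf-8"))) {
        String line;
        while ((line = reader.readLine()) != null) {
            // Assumed line format: tab-separated key=value pairs
            HashMap<String, Object> record = new HashMap<>();
            for (String pair : line.split("\t")) {
                String[] kv = pair.split("=", 2);
                if (kv.length == 2) {
                    record.put(kv[0], kv[1]);
                }
            }
            if (!record.isEmpty()) {
                parsedDatas.add(record);
            }
        }
    }
    return parsedDatas;
}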
Writing data into MongoDB
// Declared here so the excerpt is complete: insertion timestamp and per-collection batches
Date now = new Date();
Map<String, List<Document>> tableToDatasMap = new HashMap<>();
for (HashMap aMap : parsedDatas) {
    aMap.put(DataTable.DEFAUL_TTIMECOLUMN, now);
    dataTable.genId(aMap);
    // Route each record to its date-split collection
    String tableName = dataTable.getSplitTableName(aMap);
    Document doc = new Document(aMap);
    if (dataTable.hasIdentify()) {
        // Records with their own identity are inserted one by one
        mongoConnector.insertSingle(tableName, doc);
    } else {
        // Otherwise buffer them and insert per collection in bulk
        List<Document> list = tableToDatasMap.get(tableName);
        if (list == null) {
            list = new ArrayList<Document>(parsedDatas.size() / 2);
            tableToDatasMap.put(tableName, list);
        }
        list.add(doc);
    }
}
for (String key : tableToDatasMap.keySet()) {
    mongoConnector.insert(key, tableToDatasMap.get(key));
}
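Table splitting is driven by getSplitTableName, whose body is not shown. A minimal sketch of a date-based split might look like the following; the base collection name and the yyyyMMdd suffix format are assumptions.

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

// One collection per day: baseName_yyyyMMdd (sketch; base name and suffix format are assumptions)
class SplitTableNamer {
    private static final String BASE_NAME = "user_behavior";

    String getSplitTableName(Map<String, Object> record) {
        Date time = (Date) record.get(DataTable.DEFAUL_TTIMECOLUMN);
        return BASE_NAME + "_" + new SimpleDateFormat("yyyyMMdd").format(time);
    }
}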
Aggregation analysis
public List<Map<String, Object>> getExhibitionCount(Date start, Date end) throws ParseException {
    List<Map<String, Object>> map = new ArrayList<Map<String, Object>>();
    int days = differentDaysByMillisecond(start, end);
    // Make sure the result collection is indexed before writing into it
    createIndex(COL_NAME);
    // Aggregate one date-split collection per day in the range
    for (int i = 0; i < days + 1; i++) {
        Aggregation aggregation = Aggregation.newAggregation(
                Aggregation.group("templateCode", "columnId", "positionId").sum("duration").as("duration"),
                Aggregation.project("templateCode", "columnId", "positionId", "duration"));
        String tableName = this.getTableName(start);
        AggregationResults<HashMap> aggregate = mongoTemplate.aggregate(aggregation, tableName, HashMap.class);
        List<HashMap> mappedResults = aggregate.getMappedResults();
        System.out.print("Aggregated result: " + mappedResults);
        for (Map adCountDtoMap : mappedResults) {
            String json = JsonUtils.obj2Str(adCountDtoMap);
            AdCountDto adCountDto = JsonUtils.str2Obj(json, AdCountDto.class);
            // Tag the result with the day being aggregated, not the current time
            adCountDto.setDate(dateFormatCheck.format(start));
            mongoTemplate.insert(adCountDto, COL_NAME);
        }
        // Move on to the next day in the range
        start = dataPlus(start, 1);
    }
    // Results are persisted into COL_NAME; the returned list is left empty here
    return map;
}
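A brief usage example, assuming the method lives on a statService bean (the name and the date range are illustrative, not from the project):

// Aggregate exhibition data day by day for the first week of January
SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd");
statService.getExhibitionCount(fmt.parse("2024-01-01"), fmt.parse("2024-01-07"));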
Results display