区块链技术博客
www.b2bchain.cn

实时数仓——数据采集

这篇文章主要介绍了实时数仓——数据采集的讲解,通过具体代码实例进行18036 讲解,并且分析了实时数仓——数据采集的详细步骤与相关技巧,需要的朋友可以参考下https://www.b2bchain.cn/?p=18036

本文实例讲述了2、树莓派设置连接WiFi,开启VNC等等的讲解。分享给大家供大家参考文章查询地址https://www.b2bchain.cn/7039.html。具体如下:

文章目录

  • 一、实时需求概述
    • 1.1 实时需求与离线需求的比较
      • 离线需求
      • 实时需求
    • 1.2 实时需求和离线需求的对比
      • 1.1.1 离线架构
      • 1.1.2 实时架构
  • 二、模拟日志数据
    • 2.1 模拟数据需要用到的文件列表
      • 2.1.1 修改application.properties
      • 2.1.2 修改logback.xml
        • 落盘工具使用logback, 类似于log4j
    • 2.2 生成模拟数据
    • 2.3 创建日志服务器
      • 2.3.1 在project中创建springboot子模块
      • 2.3.2 创建controller
      • 2.3.3 把日志落盘与写入kafka中
    • 2.4 将日志服务器打包上传到linux的hadoop162上
    • 2.5 部署Nginx
      • 步骤1: 使用 yum 安装依赖包
      • 步骤2:下载 Nginx
      • 步骤3: 解压到当前目录
      • 步骤4:编译和安装
      • 步骤5: 启动 Nginx
    • 2.6 配置负载均衡
      • 2.6.1 打开nginx配置文件
      • 2.6.2 修改如下配置
    • 2.7 分发日志服务器
  • 三、启动将日志生成到kafka
    • 3.1 启动zookeeper,kafka
    • 3.2 启动hadoop162上的kafka开始消费数据
    • 3.3 启动三台机器上的日志服务器
    • 3.4 启动Nginx
    • 3.5 启动模拟日志数据
  • 四、总结
  • 五、脚本

一、实时需求概述

1.1 实时需求与离线需求的比较

  • 离线需求

    一般是根据前一日的数据生成报表等数据,虽然统计指标、报表繁多,但是对时效性不敏感。 
  • 实时需求

    主要侧重于对当日数据的实时监控,通常业务逻辑相对离线需求简单一下,统计指标也少一些,但是更注重数据的时效性,以及用户的交互性。 

1.2 实时需求和离线需求的对比

1.1.1 离线架构

http://lizhenchao.oss-cn-shenzhen.aliyuncs.com/1604799733.png

实时数仓——数据采集

1.1.2 实时架构

http://lizhenchao.oss-cn-shenzhen.aliyuncs.com/1604799760.png

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jkC7C3Gi-1605026483650)(https://i.loli.net/2020/11/11/13SDLdjMbJXiaeK.jpg)]

二、模拟日志数据

2.1 模拟数据需要用到的文件列表

2.1.1 修改application.properties

# 外部配置打开 logging.config=./logback.xml #实时数据的日期 mock.date=2020-09-10  #实时数据必须是 http, 将来会持续不同的生成数据 mock.type=http #http模式下,发送的地址.  日志服务器的地址 #日志服务器在哪就写哪的地址 mock.url=http://hadoop162/applog #http   协议 #Localhost  主机 #8081   端口号 #applog  路径  【controller负责管路劲】 #启动次数 mock.startup.count=10000 #设备最大值 mock.max.mid=50 #会员最大值 mock.max.uid=500 #商品最大值 mock.max.sku-id=100 #页面平均访问时间 mock.page.during-time-ms=20000 #错误概率 百分比 mock.error.rate=3 #每条日志发送延迟 ms mock.log.sleep=500 #商品详情来源  用户查询,商品推广,智能推荐, 促销活动 mock.detail.source-type-rate=40:25:15:20 

2.1.2 修改logback.xml

落盘工具使用logback, 类似于log4j

<?xml version="1.0" encoding="UTF-8"?> <configuration>     <!--日志的根目录, 根据需要更改成日志要保存的目录-->     <!--日志的根目录, 如果是本地就写本地地址,如果是linux就写linux的地址-->     <property name="LOG_HOME" value="/home/atguigu"/>     <appender name="console" class="ch.qos.logback.core.ConsoleAppender">         <encoder>             <pattern>%msg%n</pattern>         </encoder>     </appender>      <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">         <file>${LOG_HOME}/app0621.log</file>             <!--这个是日志服务器的主类名-->         <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">             <fileNamePattern>${LOG_HOME}/app0621.%d{yyyy-MM-dd}.log</fileNamePattern>         </rollingPolicy>         <encoder>             <pattern>%msg%n</pattern>         </encoder>     </appender>       <!-- 将某一个包下日志单独打印日志  需要更换我们的 Controller 类 -->     <logger name="com.atguigu.gmall.gmalllogger.controll.LoggerController"             level="info" additivity="true">         <appender-ref ref="rollingFile"/>         <appender-ref ref="console"/>     </logger>      <root level="error" additivity="true">         <appender-ref ref="console"/>     </root> </configuration>  

2.2 生成模拟数据

cd /opt/software/mock/mock_log java -jar gmall2020-mock-log-2020-05-10.jar 

2.3 创建日志服务器

2.3.1 在project中创建springboot子模块

2.3.2 创建controller

2.3.3 把日志落盘与写入kafka中

package com.atguigu.gmall.gmalllogger.controll;  import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.*;  //@Controller //@ResponseBody @RestController @Slf4j public class LoggerController {      @PostMapping("/applog")     public String applog(@RequestBody String logString){         //1.把数据落盘,给离线需求用         saveToDisk(logString);         //2.把数据直接发送到kafka         sendTokafka(logString);         return "ok";     }      /**      * 把日志数据发送到kafka      * @param logString      */     @Autowired     KafkaTemplate<String,String> kafka;     private void sendTokafka(String logString) {         JSONObject obj = JSON.parseObject(logString);         //把事件日志和启动日志写入到不同的topic         if (obj.getString("start") != null && obj.getString("start").length() >0){             kafka.send("gmall_startup_topic",logString);         }else {             kafka.send("gmall_event_topic",logString);         }      }      /**      * 把日志写入到磁盘      * @param logString      */     private void saveToDisk(String logString) {         log.info(logString);     } } 

2.4 将日志服务器打包上传到linux的hadoop162上

2.5 部署Nginx

  • 步骤1: 使用 yum 安装依赖包

sudo yum -y install   openssl openssl-devel pcre pcre-devel   zlib zlib-devel gcc gcc-c++ 
  • 步骤2:下载 Nginx

如果已经有下载好的安装包, 此步骤可以省略

/opt/software » wget http://nginx.org/download/nginx-1.12.2.tar.gz 
  • 步骤3: 解压到当前目录

tar -zxvf nginx-1.12.2.tar.gz 
  • 步骤4:编译和安装

进入解压后的根目录

./configure  --prefix=/opt/module/nginx  make ****&&**** make install 
  • 步骤5: 启动 Nginx

    进入安装目录:

    cd /opt/module/nginx 

    u 启动 nginx:

    sbin/nginx 

    u 关闭 nginx:

    sbin/nginx -s stop 

    u 重新加载:

    sbin/nginx -s reload 

注意:

  1. Nginx 默认使用的是 80 端口, 由于非root用户不能使用 1024 以内的端口, 但是在生产环境下不建议使用root用户启动nginx, 主要从安全方面考虑

  2. 如果使用普通用户启动 Nginx, 需要先执行下面的命令来突破上面的限制:

sudo setcap cap_net_bind_service=+eip /opt/module/nginx/sbin/nginx 
  • 步骤6: 查看是否启动成功

实时数仓——数据采集

通过网页访问: http://hadoop102实时数仓——数据采集

2.6 配置负载均衡

模拟数据以后应该发给nginx, 然后nginx再转发给我们的日志服务器.

日志服务器我们会分别配置在hadoop102,hadoop103,hadoop104三台设备上.

2.6.1 打开nginx配置文件

cd /opt/module/nginx/conf  vim nginx.conf 

2.6.2 修改如下配置

http {    	# 启动省略     upstream logcluster{         server hadoop162:8081 weight=1;         server hadoop163:8081 weight=1;         server hadoop164:8081 weight=1;     }     server {         listen       80;         server_name  localhost;          #charset koi8-r;          #access_log  logs/host.access.log  main;          location / {             #root   html;             #index  index.html index.htm;             # 代理的服务器集群  命名随意, 但是不能出现下划线             proxy_pass http://logcluster;             proxy_connect_timeout 10;         } 		 		# 其他省略	 } 

2.7 分发日志服务器

--日志服务器每个节点配置一个.  	(nginx只需要配置到hadoop102单台设备就行了) 

三、启动将日志生成到kafka

3.1 启动zookeeper,kafka

zk start kafka.sh start 

3.2 启动hadoop162上的kafka开始消费数据

[[email protected] ~]$ cd /opt/module/kafka_2.11 	bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic gmall_event_topic  [[email protected] ~]$ /opt/module/kafka_2.11 	bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic gmall_startup_topic 	 #我们这有脚本:  consume 主题名 [[email protected] ~]$ consume gmall_event_topic [[email protected] ~]$ consume gmall_startup_topic 

3.3 启动三台机器上的日志服务器

[[email protected] ~]$ cd /opt/module/gmall0621 java -jar gmall-logger-0.0.1-SNAPSHOT.jar --sever-port 8081 					 [[email protected] ~]$ cd /opt/module/gmall0621 java -jar gmall-logger-0.0.1-SNAPSHOT.jar --sever-port 8081  [[email protected] ~]$ cd /opt/module/gmall0621 java -jar gmall-logger-0.0.1-SNAPSHOT.jar --sever-port 8081 

3.4 启动Nginx

[[email protected] ~]$ cd /opt/module/nginx [[email protected] ~]$ sbin/nginx 

3.5 启动模拟日志数据

cd /opt/software/mock/mock_log java -jar gmall2020-mock-log-2020-05-10.jar 

四、总结

实时数仓——数据采集

1.启动模拟数据后,会通过application.properties文件找到要把日志发到哪个服务器(这个配置在application.properties文件里,Nginx默认80端口) 2.我们这先发到Nginx,通过Nginx决定将数据发到哪个服务器去消费数据 3.启动模拟日志前,需要先启动Nigix 

五、脚本

#!/bin/bash #在hadoop162一个nginx,在162-164分别起一个日志服务器 nginx_home=/opt/module/nginx log_home=/opt/module/gmall0621 case $1 in "start")     # 先启动nginx     if [ -z "$(ps -ef | awk '/nginx/ && !/awk/ {print $0}')" ]; then         echo "在 hadoop162 开始启动nginx"         $nginx_home/sbin/nginx     else         echo "在 hadoop162 nginx已经启动,不用重复启动"     fi     # 分别启动日志服务器     for host in hadoop162 hadoop163 hadoop164 ; do         echo "在${host}上启动日志服务器"         ssh $host "nohup java -jar $log_home/gmall-logger-0.0.1-SNAPSHOT.jar >/dev/null 2>&1 &"     done ;; "stop")     echo "在 hadoop162 停止nginx"     $nginx_home/sbin/nginx -s stop     # 分别停止日志服务器     for host in hadoop162 hadoop163 hadoop164 ; do         echo "在${host}上停止日志服务器"         ssh $host "jps | awk '/gmall-logger-0.0.1-SNAPSHOT.jar/ {print $1}' | xargs kill -9"     done  ;;  *)     echo "你启动的姿势不对, 换个姿势再来"     echo "  log.sh start 启动日志采集"     echo "  log.sh stop  启动日志采集" ;; esac 

本文转自互联网,侵权联系删除实时数仓——数据采集

赞(0) 打赏
部分文章转自网络,侵权联系删除b2bchain区块链学习技术社区 » 实时数仓——数据采集
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

b2b链

联系我们联系我们