资源简介
记得自己要引入环境
(1)利用SparkStreaming从文件目录读入日志信息,日志内容包含:
”日志级别、函数名、日志内容“ 三个字段,字段之间以空格拆分。请看数据源的文件。
(2)对读入都日志信息流进行指定筛选出日志级别为error或warn的,并输出到外部MySQL中。
需要用到的函数
(1)输入采用textFileStream()算子
(2)输出采用foreachRDD()算子
(3)将RDD转为DataFrame
(4)DataFrame注册为临时表,使用SQL过滤
(5)将过滤后的数据保存到MySQL
代码片段和文件信息
from pyspark.shell import sc
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.streaming import StreamingContext
spark = SparkSession.builder.appName(“Streaming“).getOrCreate()
sc=spark.sparkContext
#两个参数:1、sc参数 2、采样时间间隔(秒)
ssc =StreamingContext(sc1)
#在ubuntu环境下数据源路劲
ds1 =ssc.textFileStream(“/home/zhuang/138/input/test“)
#把所有数据划分为[[][]]格式
ds3 = ds1.map(lambda line:line.split(“\t“))
def func(rdd):
if not rdd.isEmpty():
#记得转码很重要
url = “jdbc:mysql://ip地址:3306/pyspark?user=root&password=zhuang&characterEncoding=UTF-8“
#构建表结构
schema = StructType([StructField(“日志级别“ StringType() True) StructField(“函数名“ StringType() True)
StructField(“日志内容“ StringType() True)])
#对[[][][]]数据转换成[[[][][]][][][]]因为todf数据是数据格式传值
rdd.map(lambda x:tuple(x)).toDF(schema).registerTempTable(“test_person1“)
df1 = spark.sql(“select * from test_person1 where ‘日志级别‘!=‘[info]‘“)
# df2 = spark
df1.show()
#写入mysql
df1.write.jdbc(mode=“overwrite“url=urltable=“test_person1“ properties={“driver“:‘com.mysql.jdbc.Driver‘})
df1.show()
ds3.pprint()
ds3.foreachRDD(func)
# print(ds4.foreachRDD(func))
ssc.start();ssc.awaitTermination()
属性 大小 日期 时间 名称
----------- --------- ---------- ----- ----
文件 501 2019-05-04 11:03 20180103.log
文件 501 2019-05-04 11:03 20180104.log
文件 1007502 2019-03-12 14:38 mysql-connector-java-5.1.47.jar
文件 1514 2019-05-30 08:58 test02.py
相关资源
- mysql数据库驱动8.0.12版本
- mha4mysql-0.56-0.el6
- mysql_5.6.24_winx64
- MYSQL作业提交作业批改系统.zip
- Maven搭建Spring+Mybatis+MySql
- 成语首尾字用于成语接龙.sql
- MySQL中文手册api帮助文档
- linux_mysql5.1.66x86_64.zip
- 6.SparkSQL下--Spark实战应用.pdf
- oracle 到mysql转换工具
- 数据库原理实验指导书Mysql
- mysql 5.6 绿色精简版 5Mb
- mysql Premium 破解
- 深入浅出MySQL第二版本pdf
- 深入浅出MySQL.pdf
- 深入浅出mysql全文
- 全球国家及地区库,采集自腾讯QQ国内
- mysql操作练习的表数据
- 免费的Navicat11全系列注册机Navicat fo
- Navicat For MySql 8.0.20 简体中文版(含破
- navicat for mysql v 11.1.13破解工具
- mysql数据库5.6连接包
- Navicat for MySQL注册机 绿色版
- 旅游管理系统数据库
- 易语言MySQL注册登录源码
- 传智播客mysql的sql优化
- msvcr120.dll 32位和64位
- mysql5.7驱动.rar
- mysql innodb恢复数据工具.rar
- 易语言Mysql线程池2.0模块源码
评论
共有 条评论