资源简介

(1)创建RDD (2)将RDD转为DataFrame (3)调用registerTempTable,注册为表,表名为:tb_book (4)使用使用sql语句查询前15条 (5)模糊查询书名包含“微积分”的书 (6)输出图书的前10行的name和price字段信息 (7)统计书名包含“微积分”的书的数量 (8)查询评分大于9的图书,,且只展示前10条 (9)计算所有书名包含“微积分”的评分平均值 (10)把书目按照评分从高到低进行排列,且只展示前15条 (11)把图书按照出版社进行分组,统计出不同出版社图书的总数 (12)将书名包含“微积分”的书记录保存到本地或HDFS上,且保存的格式为csv,文件名为:学号.csv (13)然后再从该csv文件加载,创建DataFrame,并查询和显示

资源截图

代码片段和文件信息

from pyspark.shell import sc
from pyspark.sql.types import *
from pyspark.sql import SparkSession
spark =SparkSession.builder.master(“local“).appName(“Word Count“).config(“spark.some.config.option“ “some-value“).getOrCreate()

rdd = sc.textFile(“xxxxx.txt“)
header = rdd.first().split(““)
header1 = rdd.first()
print(header)
schema = StructType([StructField(header[0]StringType()True)StructField(header[1]StringType()True)StructField(header[2]StringType()True)StructField(header[3]StringType()True)StructField(header[4]StringType()True)StructField(header[5]StringType()True)])

def filter_line(line):
    if line == header1:
        return False
    else:
        return True

rdd1 = rdd.filter(lambda line:line!=header1).map(lambda line:line.split(““)).map(lambda x:tuple(x))

df = rdd1.toDF(schema)

# # df.show()
# “““
# 建表
# “““
df.registerTempTable(“tb_book“)


# spark.sql(“select * from tb_book“).show(15)
# spark.sql(“select * from tb_book where ‘书名‘ like ‘%%微积分%%‘“).show()
# spark.sql(“select ‘书名‘‘价格‘ from tb_book“).show(10)
# number =spark.sql(“select * from tb_book where ‘书名‘ like ‘%%微积分%%‘“).count()
# print(number)
# spark.sql(“select * from tb_book where ‘评分‘>‘9‘“).show(10)
# spark.sql(“select avg (‘评分‘) from tb_book where ‘书名‘ like ‘%%微积分%%‘ “).show()
# spark.sql(“select * from tb_book order by ‘评分‘ desc “).show(15)
# group =spark.sql(“select ‘出版社‘count(‘出版社‘) from tb_book GROUP by ‘出版社‘“).collect()
# print(group)

# ddf =spark.sql(“select * from tb_book where ‘书名‘ like ‘%%微积分%%‘“)
# ddf.write.parquet(“/home/zhuang/138/tb_book“)
# ddf.write.format(‘csv‘).save(“/home/zhuang/138/16034460138.csv“)
# userDF=spark.read.format(“csv“).load(‘/home/zhuang/138/16034460138.csv/part-00000-e2c9db96-961e-45d7-8221-c2ff5e90d174-c000.csv‘)
# userDF.printSchema()
# userDF.show()

# rdd_1 =sc.textFile(“/home/zhuang/138/16034460138.csv/part-00000-e2c9db96-961e-45d7-8221-c2ff5e90d174-c000.csv“)
# rdd_2 = rdd_1.map(lambda line:line.split(““)).map(lambda x:tuple(x))
# dfff = rdd_2.toDF(schema)
# dfff.select(“书名“).show()

 属性            大小     日期    时间   名称
----------- ---------  ---------- -----  ----
     文件     5526130  2019-05-15 02:46  book.txt
     文件        2191  2019-05-16 00:54  test2.py

评论

共有 条评论