This continues from the previous post; see that one for the background.
As is the custom, the pom first. fastjson is needed this time, so add it:
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.58</version>
</dependency>
Structure of the MySQL output table
Structure of lastDF
Structure of the data-processing part
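For reference, the lastDF output table can be created ahead of time roughly like this. This is a hypothetical sketch: the column names and the all-varchar types are inferred from the six-string batch insert in the driver below, not the author's actual DDL.

import java.sql.DriverManager

object CreateLastDFTable {
  def main(args: Array[String]): Unit = {
    // connection details copied from the driver below
    val con = DriverManager.getConnection("jdbc:mysql://hadoop:3306/day03", "root", "1234")
    val stmt = con.createStatement()
    // hypothetical schema: six varchar columns matching insert into lastDF values(?,?,?,?,?,?)
    stmt.execute(
      """create table if not exists lastDF(
        |  area varchar(50),
        |  city varchar(255),
        |  product_id varchar(50),
        |  product_name varchar(100),
        |  product_count varchar(50),
        |  status varchar(20)
        |)""".stripMargin)
    stmt.close()
    con.close()
  }
}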
Last time we converted the DataFrames to RDDs and then did an inner join. Today we use the DataFrame join operation available since Spark 2.0 instead.
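Stripped of the business logic, the difference between the two styles boils down to the sketch below. The DataFrames and data are made up purely to show the API; they are not the project's tables.

import org.apache.spark.sql.SparkSession

object JoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    import spark.implicits._

    // two toy DataFrames sharing a city_id column
    val cities = Seq(("1", "Beijing", "North"), ("2", "Shanghai", "East"))
      .toDF("city_id", "city_name", "area")
    val clicks = Seq(("1", "p100"), ("1", "p101"), ("2", "p100"))
      .toDF("city_id", "click_product_id")

    // pre-2.0 style from the previous post: key both RDDs by city_id, then join the pair RDDs
    val rddJoin = cities.rdd.map(r => (r.getString(0), r))
      .join(clicks.rdd.map(r => (r.getString(0), r)))
    println(rddJoin.count())

    // 2.0+ style used below: join the DataFrames directly on the shared column
    val dfJoin = cities.join(clicks, "city_id")
    dfJoin.show()

    spark.stop()
  }
}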
On to the code.
The HiveDriver part:
import java.sql.DriverManager
import java.util.Properties
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

object HiveDriver {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val session = SparkSession.builder().master("local[2]").enableHiveSupport().getOrCreate()
    //session.udf.register("aggr",new AggrNameUDF())
    session.sql("use day03")
    val beginTime = "2019-12-20 00.00.00"
    val endTime = "2019-12-20 12.30.01"
    val df = session.sql("select city_id,action_time,click_product_id from user_click_action" +
      " where click_product_id != 'null' and action_time >= '"+beginTime+"' and action_time <= '"+endTime+"'")
    /* session.sql("select city_id,click_product_id,count(*) click_count from user_click_action" +
      " where click_product_id != 'null' and action_time >= '"+beginTime+"' and action_time <= '"+endTime+"' group by city_id,click_product_id limit 10").show()*/
    val url = "jdbc:mysql://hadoop:3306/day03"
    val properties = new Properties()
    properties.setProperty("user","root")
    properties.setProperty("password","1234")
    val mysql = session.read.jdbc(url,"city_info",properties)

    //---------------------------------- divider -----------------------------------------
    // Method 1: convert to RDDs and join
    /*val user_click_action = df.rdd.map(x=>{
      (x.getString(0),x)
    })
    val city_info = mysql.rdd.map(x=>{
      (x.getString(0),x)
    })
    val TopN = city_info.join(user_click_action)
    // each joined record corresponds to one row of the combined table
    val topN = TopN.map(x=>{
      val cityID = x._1
      val cityName = x._2._1.getString(1)
      val area = x._2._1.getString(2)
      val productId = x._2._2.getString(2)
      Row(cityID,cityName,area,productId)
    })
    val schema = StructType(List(
      StructField("city_id",StringType),
      StructField("city_name",StringType),
      StructField("area",StringType),
      StructField("product_id",StringType)
    ))
    val joinedDF = session.createDataFrame(topN,schema)
    joinedDF.createTempView("TopN")
    val end = session.sql("select * from TopN limit 10")
    end.show()*/

    //---------------------------------- divider -----------------------------------------
    // Method 2: since Spark 2.0 DataFrames can be joined directly, no need to convert to RDDs
    val joinedDF = mysql.join(df,"city_id")
    joinedDF.createOrReplaceTempView("joined_table")
    /* val TopN = session.sql("select * from joined_table limit 10")
    TopN.show()*/

    // register the custom aggregate function
    session.udf.register("aggr",new AggrCityUDF)
    // goal: top-3 click counts per product per area, with the clicking cities listed
    // per the createOrReplaceTempView source: creates a local temporary view with the given name;
    // the lifetime of this view is tied to the SparkSession that created the Dataset
    // group by area and product id; the custom aggregate collects the ids of the cities that clicked
    // the product, and count(*) gives the click count
    val sql = "select area,count(*) product_count,click_product_id product_id,aggr(city_name,city_id) city from joined_table" +
      " group by area,click_product_id"
    var groupDF = session.sql(sql)
    groupDF.createOrReplaceTempView("aggred_table")

    // sort the previous result in descending order and take the top N per area
    val sql2 = "select * from (select *,row_number() over(partition by area order by product_count desc) rowNumber" +
      " from aggred_table) s where rowNumber<=3 "
    val sortedDF = session.sql(sql2)
    //sortedDF.show(10)

    // register a UDF that pulls a value out of a JSON column; the last argument is the return type
    session.udf.register("get_json",new JsonProductValue,StringType)

    // the product_info table created in Hive in the previous post
    val sql3 = "select * from product_info"
    val productDF = session.sql(sql3)
    //productDF.show(10)

    // join the two DataFrames on product_id to get the product name for each product_id
    val productTopThreeDF = productDF.join(sortedDF,"product_id")
    productTopThreeDF.createOrReplaceTempView("productTopThreeDF")
    //productTopThreeDF.show(20)

    // read the value of the 'product_status' key from the extend_info JSON column:
    // 0 means self-operated (自营), 1 means third-party (第三方);
    // with more possible values, use case when ... then ... else ... end instead
    val sql4 = "select area,city,product_id,product_name,product_count," +
      " if(get_json(extend_info,'product_status')=0,'自营','第三方' )status from productTopThreeDF"
    //case ss when 1 then '' .... else end

    // finally the last step, phew~: create a local temporary view and load the data into MySQL
    val lastDF = session.sql(sql4)
    lastDF.createOrReplaceTempView("lastDF")
    //lastDF.show(100)

    // as before, two ways to load into MySQL are shown; pick whichever fits the business logic
    //---------------------------------- divider -----------------------------------------
    // Method 1: convert to an RDD, walk it partition by partition with foreachPartition,
    // walk the rows inside each partition with foreach, and write with a MySQL batch insert
    lastDF.rdd.foreachPartition(itor=>{
      val url = "jdbc:mysql://hadoop:3306/day03"
      val con = DriverManager.getConnection(url,"root","1234")
      val sql = "insert into lastDF values(?,?,?,?,?,?)"
      val driver = con.prepareStatement(sql)
      itor.foreach(x=>{
        driver.setString(1,x.getString(0))
        driver.setString(2,x.getString(1))
        driver.setString(3,x.getString(2))
        driver.setString(4,x.getString(3))
        // product_count is defined as varchar in the output table, so convert the long here
        driver.setString(5,String.valueOf(x.getLong(4)))
        driver.setString(6,x.getString(5))
        driver.addBatch()
        //driver.executeUpdate()
      })
      driver.executeBatch()
      driver.close()
      con.close()
    })

    //---------------------------------- divider -----------------------------------------
    // Method 2: the officially recommended way, nothing much to add; click into the jdbc method to
    // read its source: the implementation is similar to the above and also uses the efficient
    // foreachPartition, so its performance is no worse than method 1
    //lastDF.write.mode(SaveMode.Append).jdbc(url,"lastDF",properties)
    //df.show()
    session.stop()
  }
}
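The comment on sql4 above notes that with more than two status values, if(...) gives way to case when ... then ... else ... end. A hedged sketch of how sql4 might be rewritten in that style; the fallback label '其他' for unknown codes is made up for illustration:

    val sql4CaseWhen = "select area,city,product_id,product_name,product_count," +
      " case get_json(extend_info,'product_status')" +
      "   when '0' then '自营'" +
      "   when '1' then '第三方'" +
      "   else '其他'" +
      " end status from productTopThreeDF"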
That covers HiveDriver; now for the aggregate function AggrCityUDF.
Most of it is the same as in the previous post. The one thing to note is that when a new value arrives within a partition it is built as city_id + ':' + city_name; everything else is unchanged.
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}

class AggrCityUDF extends UserDefinedAggregateFunction{
  // schema of the input data
  override def inputSchema: StructType = {StructType(List(StructField("city_name",StringType),StructField("city_id",StringType)))}
  // schema of the aggregation buffer
  override def bufferSchema: StructType = {StructType(List(StructField("city_name",StringType),StructField("city_id",StringType)))}
  // return type
  override def dataType: DataType = StringType
  override def deterministic: Boolean = true
  // initialization: start the buffer with an empty string
  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0,"")
  // called for every row within a partition on the workers
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // the value accumulated so far
    var bfValue = buffer.getString(0)
    // the newly arriving value, formatted as city_id:city_name
    var nowValue = input.getString(1)+":"+input.getString(0)
    // only append if this city is not already in the buffer for this partition
    if(!bfValue.contains(nowValue)){
      if(bfValue==""){
        bfValue = nowValue
      }else{
        bfValue += ","+nowValue
      }
    }
    // write the merged value back to the buffer
    buffer.update(0,bfValue)
  }
  // merge the partial results of all partitions into one value
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    var bfValue = buffer1.getString(0)
    var nowValue = buffer2.getString(0)
    if(!bfValue.contains(nowValue)){
      if(bfValue==""){
        bfValue = nowValue
      }else{
        bfValue += ","+nowValue
      }
    }
    buffer1.update(0,bfValue)
  }
  // return the value held in the buffer
  override def evaluate(buffer: Row): Any = buffer.getString(0)
}
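To see what the function actually produces, a small standalone check; the rows are toy data, the real input comes from joined_table in the driver above:

import org.apache.spark.sql.SparkSession

object AggrCityUDFDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    import spark.implicits._

    spark.udf.register("aggr", new AggrCityUDF)

    // toy clicks: (area, city_name, city_id)
    val clicks = Seq(
      ("East",  "Shanghai", "2"),
      ("East",  "Hangzhou", "3"),
      ("North", "Beijing",  "1")
    ).toDF("area", "city_name", "city_id")
    clicks.createOrReplaceTempView("toy_clicks")

    // East aggregates to something like "2:Shanghai,3:Hangzhou" (order depends on partitioning),
    // North to "1:Beijing"
    spark.sql("select area, aggr(city_name, city_id) city from toy_clicks group by area").show(false)
    spark.stop()
  }
}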
Finally, JsonProductValue.
It extends UDF2 and implements the call method, which simply looks up the value for a given key in the JSON.
import com.alibaba.fastjson.JSON
import org.apache.spark.sql.api.java.UDF2

// the three type parameters:
// 1. the JSON string itself  2. the JSON key  3. the return type
class JsonProductValue extends UDF2[String,String,String]{
  override def call(productInfo: String, key: String): String = {
    val json = JSON.parseObject(productInfo)
    json.getString(key)
  }
}
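A quick sanity check of the UDF outside of Spark; the JSON string is made up to mirror what the extend_info column is assumed to hold:

object JsonProductValueDemo {
  def main(args: Array[String]): Unit = {
    val udf = new JsonProductValue
    // extend_info is assumed to look like {"product_status":"0"}; "0" = self-operated
    println(udf.call("""{"product_status":"0"}""", "product_status"))  // prints 0
  }
}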
To close out, a song for today's guest, Brother Lei~
(What song?)
"My Little Cutie"~
(Farewell)