博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
根据用户指定的日期范围,统计各个区域下的最热门的top3商品,最后将结果写入MySQL表中(二)
阅读量:3960 次
发布时间:2019-05-24

本文共 7742 字,大约阅读时间需要 25 分钟。

接上篇  详见 

江湖规矩,先上pom,fastjson需用到,先加上

com.alibaba
fastjson
1.2.58

MySql结果导出表结构

lastDF表结构

数据处理部分结构

上回书说到,把DF转化为RDD,然后在进行内连接,今天适用spark的2.0版本DataFrame的新方法之join操作

上代码

HiveDriver部分

import java.sql.DriverManagerimport java.util.Propertiesimport org.apache.log4j.{Level, Logger}import org.apache.spark.sql.types.{StringType, StructField, StructType}import org.apache.spark.sql.{Row, SaveMode, SparkSession}object HiveDriver {  def main(args: Array[String]): Unit = {    Logger.getLogger("org").setLevel(Level.WARN)    val session = SparkSession.builder().master("local[2]").enableHiveSupport().getOrCreate()    //session.udf.register("aggr",new AggrNameUDF())    session.sql("use day03")    val beginTime = "2019-12-20 00.00.00"    val endTime = "2019-12-20 12.30.01"    val df = session.sql("select city_id,action_time,click_product_id from user_click_action" +      " where click_product_id != 'null' and action_time >= '"+beginTime+"' and action_time <= '"+endTime+"'")   /* session.sql("select city_id,click_product_id,count(*) click_count from user_click_action" +      " where click_product_id != 'null' and action_time >= '"+beginTime+"' and action_time <= '"+endTime+"' group by city_id,click_product_id limit 10").show()*/    val url = "jdbc:mysql://hadoop:3306/day03"    val properties = new Properties()    properties.setProperty("user","root")    properties.setProperty("password","1234")    val mysql = session.read.jdbc(url,"city_info",properties)//----------------------------------华丽的分割线-----------------------------------------  //方法一:转成RDD进行join    /*val user_click_action = df.rdd.map(x=>{      (x.getString(0),x)    })    val city_info = mysql.rdd.map(x=>{      (x.getString(0),x)    })    val TopN = city_info.join(user_click_action)    //相当于表中的多行    val topN = TopN.map(x=>{      val cityID = x._1      val cityName = x._2._1.getString(1)      val area = x._2._1.getString(2)      val productId = x._2._2.getString(2)      Row(cityID,cityName,area,productId)    })    val schema = StructType(List(      StructField("city_id",StringType),      StructField("city_name",StringType),      StructField("area",StringType),      StructField("product_id",StringType)    ))    val joinedDF = session.createDataFrame(topN,schema)    joinedDF.createTempView("TopN")    val end =  session.sql("select * from TopN limit 10")    end.show()*///----------------------------------华丽的分割线-----------------------------------------  //方法二,saprk2.0以后 df可使用join操作,可不转化为RDD,直接进行join操作    val joinedDF = mysql.join(df,"city_id")    joinedDF.createOrReplaceTempView("joined_table")   /* val TopN = session.sql("select * from joined_table limit 10")    TopN.show()*/    //自定义聚合函数    session.udf.register("aggr",new AggrCityUDF)       //统计每个地区每件商品的 top3  点击次数 并 列出城市    //createOrReplaceTempView源码释意,使用给定的名称创建本地临时视图    //此临时视图的生存期绑定到用于创建此数据集的[[SparkSession]]        //根据地区和商品id分组,自定义聚合函数用来求点击了商品的城市id,count函数用来求点击次数    val sql = "select area,count(*) product_count,click_product_id product_id,aggr(city_name,city_id) city from joined_table" +      " group by area,click_product_id"        var groupDF = session.sql(sql)    groupDF.createOrReplaceTempView("aggred_table")    //对上一步的结果进行倒序排序,以及求每个地区前N个值    val sql2 = "select * from (select *,row_number() over(partition by area order by product_count desc) rowNumber" +      " from aggred_table) s where rowNumber<=3 "    val sortedDF = session.sql(sql2)    //sortedDF.show(10)        //自定义聚合函数用来处理行中json类型的字段    //最后一个参数是返回值类型    session.udf.register("get_json",new JsonProductValue,StringType)    //链接上一篇Hive中创建的product_info表    val sql3 = "select * from product_info"    val productDF = session.sql(sql3)    //productDF.show(10)        //依据product_id,将两个dataframe进行合并,拿到product_id对应的商品名    val productTopThreeDF = productDF.join(sortedDF,"product_id")    productTopThreeDF.createOrReplaceTempView("productTopThreeDF")    //productTopThreeDF.show(20)     //获取extend_info,json字段中的key'product_status',获取value,并根据value的值    //0为自营,1为第三方  如果是多个值可使用case when then ...else end来处理      val sql4 = "select area,city,product_id,product_name,product_count," +      " if(get_json(extend_info,'product_status')=0,'自营','第三方' )status from productTopThreeDF"    //case ss when 1 then ''  。。。。 else end        //终于到了最后一步,呼~创建本地临时视图,并把数据导入mysql    val lastDF = session.sql(sql4)    lastDF.createOrReplaceTempView("lastDF")    //lastDF.show(100)//导入mysql依然提供两种方式,可根据业务逻辑的需求进行自行判断取用//----------------------------------华丽的分割线-----------------------------------------    //方法一,转换为rdd并并根据分区遍历foreachPartition,在内部利用迭代foreach二次遍历,最后    //通过mysql批插入的方式导入数据库    lastDF.rdd.foreachPartition(itor=>{      val url = "jdbc:mysql://hadoop:3306/day03"      val con = DriverManager.getConnection(url,"root","1234")      val sql = "insert into lastDF values(?,?,?,?,?,?)"      val driver = con.prepareStatement(sql)      itor.foreach(x=>{        driver.setString(1,x.getString(0))        driver.setString(2,x.getString(1))        driver.setString(3,x.getString(2))        driver.setString(4,x.getString(3))        //因为定义最后输出结果表product_count定义是varchar类型,在这里需要转一下类型        driver.setString(5,String.valueOf(x.getLong(4)))        driver.setString(6,x.getString(5))        driver.addBatch()        //driver.executeUpdate()      })      driver.executeBatch()      driver.close()      con.close()    })    //----------------------------------华丽的分割线-----------------------------------------    //方法二,没什么好说的官方首推的方式,可点击jdbc方法查看内部源代码实现,它的实现和上面类型,也//通过高效的foreachPartition方式,性能方面并不会第一种方式差//lastDF.write.mode(SaveMode.Append).jdbc(url,"lastDF",properties)    //df.show()    session.stop()  }}

上面说完了HiveDriver,说一下聚合函数AggrCityUDF

在上一篇已经说过,大部分类型,需要注意的是在分区新传递数据时,使用了城市id+‘:’+城市name的方式,其余一致

import org.apache.spark.sql.Rowimport org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}class AggrCityUDF extends UserDefinedAggregateFunction{  //输入数据的结构类型  override def inputSchema: StructType = {StructType(List(StructField("city_name",StringType),StructField("city_id",StringType)))}  //缓冲区数据的结构类型  override def bufferSchema: StructType = {StructType(List(StructField("city_name",StringType),StructField("city_id",StringType)))}  //返回值类型  override def dataType: DataType = StringType  override def deterministic: Boolean = true  //初始化操作,初始值赋值为空  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0,"")  //在work中每一个分区进行操作  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {    //获取原先的值    var bfValue = buffer.getString(0)    //新传递的数据    var nowValue = input.getString(1)+":"+input.getString(0)    //判断当前分区中是否包含了该城市名称    if(!bfValue.contains(nowValue)){      if(bfValue==""){        bfValue = nowValue      }else{        bfValue += ","+nowValue      }    }    //把合并的数据再放到缓冲区    buffer.update(0,bfValue)  }  //合并所有rdd为一个数据  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {    var bfValue = buffer1.getString(0)    var nowValue = buffer2.getString(0)    if(!bfValue.contains(nowValue)){      if(bfValue==""){        bfValue = nowValue      }else{        bfValue += ","+nowValue      }    }    buffer1.update(0,bfValue)  }  //得到缓冲区存放数据  override def evaluate(buffer: Row): Any = buffer.getString(0)}

 最后JsonProductValue

继承UDF2,实现call方法,就是利用json的key,拿到返回的value而已

import com.alibaba.fastjson.JSONimport org.apache.spark.sql.api.java.UDF2//三个参数//1、json本身 2、json的key 3、返回值类型class JsonProductValue extends UDF2[String,String,String]{  override def call(productInfo: String, key: String): String = {    val josn = JSON.parseObject(productInfo)    josn.getString(key)  }}

文章的末尾给今日嘉宾,蕾哥,唱一首     歌~

(一首什么      歌?)

一首    我滴小可爱~

(告辞)

转载地址:http://paazi.baihongyu.com/

你可能感兴趣的文章
杭电ACM——2069,Coin Change(DP)
查看>>
杭电ACM——2074,叠筐
查看>>
北大ACM——3616,Milking Time(DP)
查看>>
杭电ACM——2076,夹角有多大
查看>>
牛客练习赛43——B Tachibana Kanade Loves Probability(暴力,思维)
查看>>
牛客第十七届上海大学程序设计春季联赛——E CSL 的魔法(贪心)
查看>>
杭电ACM——1028,Ignatius and the Princess III(母函数)
查看>>
杭电ACM——1171,Big Event in HDU(母函数)
查看>>
杭电ACM——6491,时间间隔(思维)
查看>>
杭电AC——1085,Holding Bin-Laden Captive!(母函数)
查看>>
杭电ACM——2110,Crisis of HDU(母函数)
查看>>
杭电AM——2152,Fruit(母函数)
查看>>
杭电ACM——2566,统计硬币(DP)
查看>>
堆栈(数据结构)
查看>>
队列(数据结构)
查看>>
杭电ACM——1251,统计难题(Trie树)
查看>>
牛客网哈尔滨工程大学第十四届程序设计竞赛(同步赛)—— 小蚂蚁过马路(思维)
查看>>
牛客网哈尔滨工程大学第十四届程序设计竞赛(同步赛)—— 苹果手链(水题)
查看>>
杭电ACM——6518,Clumsy Keke(暴力+思维)
查看>>
杭电ACM——6512,Triangle(暴力 / 思维)
查看>>