先在 Hive 中创建数据库 law,并创建 law 表、导入数据。然后使用 spark-shell(效率高)进入 Spark SQL 的交互式 shell 模式,创建 HiveContext,并使用 HiveContext 读取 law 表中的数据进行分析(数据可私信我获取)。

1.创建 law 表并导入数据(数据需要先上传到 HDFS 上,并且必须先配置好虚拟机环境)

-- Create a dedicated database for the analysis and switch to it.
create database law;
use law;
-- Raw page-view log table: one row per page visit, 21 columns.
-- Rows are stored as plain text with comma-separated fields, matching the CSV layout.
CREATE TABLE  law ( 
ip bigint, 
area int,
ie_proxy string, 
ie_type string ,
userid string,
clientid string,
time_stamp bigint,
time_format string,
pagepath string,
ymd int,
visiturl string,
page_type string,
host string,
page_title string,
page_title_type int,
page_title_name string,
title_keyword string,
in_port string,
in_url string,
search_keyword string,
source string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
STORED AS TEXTFILE;
-- Load the CSV from the given HDFS path (no LOCAL keyword) into the table;
-- OVERWRITE replaces whatever the table already contains.
load data inpath '/user/root/law_utf8.csv' overwrite into table law;

2.针对原始数据中的用户点击量按网页类别进行统计,网页类别取“网页类型”(page_type)字段的前三位数字。统计内容为网页类别、记录数及其占总记录数的百分比,统计结果以 JSON 文件格式保存到 HDFS 上。

import org.apache.spark.sql.SaveMode
// Count records per page category (the first three characters of page_type) and compute each
// category's percentage of the total record count.
// Fix: the original SQL read "from lawgroup by", which made "lawgroup" the table name and broke
// the statement; the table is "law" and the clause is "group by".
// NOTE(review): 837450.0 is a hard-coded total, presumably the row count of the law table — confirm.
val pageType=hiveContext.sql("select substring(page_type,1,3) as page_type,count(*) as count_num,round((count(*)/837450.0)*100,4) as weights from law group by substring(page_type,1,3)")
// Show the categories, largest record count first.
pageType.orderBy(-pageType("count_num")).show()
// Collapse to a single partition before saving as JSON, overwriting any previous output.
pageType.repartition(1).save("/user/root/sparkSql/pageType.json","json",SaveMode.Overwrite)

3.(如需虚拟机环境下的详细代码,请私信我。)以上就讲到这里。以下是在 IDEA 中本地运行的全部代码。



import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructType}

import java.util.Properties

//import org.apache.calcite.avatica.ColumnMetaData.StructType
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StructField}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}



/**
 * Local Spark SQL job that prints page-type statistics for the law-site page-view log.
 *
 * The job reads a comma-separated text file, keeps the first 17 fields of every line as
 * string columns, registers them as the temporary view `law`, and runs a series of
 * aggregate queries whose results are printed with `show()`.
 */
object sale {

  /** Input file used when no path is supplied on the command line. */
  private val DefaultInputPath = "D:\\word\\law2_utf8.csv"

  /** Names of the leading CSV fields, in file order, that are loaded into the `law` view. */
  private val ColumnNames: Seq[String] = Seq(
    "ip", "area", "ie_proxy", "ie_type", "userid", "clientid",
    "time_stamp", "time_format", "pagepath",
    "ymd", "visiturl", "page_type",
    "host", "page_title", "page_title_type", "page_title_name", "title_keyword"
    // Not loaded: "in_port", "in_url", "search_keyword", "source"
  )

  /**
   * Entry point.
   *
   * @param args optional; `args(0)` overrides the input CSV path (defaults to
   *             [[DefaultInputPath]])
   */
  def main(args: Array[String]): Unit = {
    val inputPath = args.headOption.getOrElse(DefaultInputPath)

    // 1. Create the SparkSession.
    val spark: SparkSession = SparkSession.builder()
      .appName("SparkSqlToMysql_2")
      .master("local[2]")
      .getOrCreate()

    // Always release the session, even when one of the queries fails.
    try {
      // 2. Read the raw lines.
      val data: RDD[String] = spark.sparkContext.textFile(inputPath)

      // 3. Split every line. The limit -1 keeps trailing empty fields, which the default
      //    split silently drops; lines that still have too few fields (e.g. blank lines)
      //    are skipped instead of failing with ArrayIndexOutOfBoundsException.
      val arrRDD: RDD[Array[String]] = data
        .map(_.split(",", -1))
        .filter(_.length >= ColumnNames.size)

      // 4. Wrap the leading fields of each line in a Row.
      val personRDD: RDD[Row] = arrRDD.map(fields => Row.fromSeq(fields.take(ColumnNames.size).toSeq))

      // 5. Build the schema: every loaded column is a non-nullable string.
      val schema: StructType =
        StructType(ColumnNames.map(name => StructField(name, StringType, nullable = false)))

      // 6. Create the DataFrame from the rows and the schema.
      val personDF: DataFrame = spark.createDataFrame(personRDD, schema)

      // 7. Register the DataFrame as the temporary view "law".
      personDF.createOrReplaceTempView("law")

      // 8. Page-type statistics on the "law" view.
      // spark.sql("select substring(page_type,1,3) as page_type,count(*) as count_num,round((count(*)/514.0)*100,4) as weights from law group by substring(page_type,1,3)").show()

      // (2) Page-category statistics. Grouping uses the same expression as the select list;
      //     a bare "group by page_type" would group by the full source column rather than
      //     the 7-character prefix that is displayed.
      val pageLevel = spark.sql("select substring(page_type,1,7) as page_type,count(*) as count_num from law where visiturl like '%faguizt%' and substring(page_type,1,7) like '%199%' group by substring(page_type,1,7)")
      pageLevel.show()

      // (3) Breakdown inside the consulting category (page_type starting with 101).
      //     The percentage base is counted from the loaded data instead of being hard-coded,
      //     so the weights stay correct for any input file.
      val consultFilter = "substring(page_type,1,3)=101"
      val consultTotal = countWhere(spark, consultFilter)
      val consultCount = spark.sql(s"select substring(page_type,1,6) as page_type,count(*) as count_num,round((count(*)/${consultTotal}.0)*100,4) as weights from law where $consultFilter group by substring(page_type,1,6)")
      consultCount.orderBy(consultCount("count_num").desc).show()

      // (4) Records whose URL contains "?".
      val questionFilter = "visiturl like '%?%'"
      spark.sql(s"select count(*) as num from law where $questionFilter").show()
      val questionTotal = countWhere(spark, questionFilter)
      val pageWith = spark.sql(s"select substring(page_type,1,7) as page_type,count(*) as count_num,round((count(*)*100)/${questionTotal}.0,4) as weights from law where $questionFilter group by substring(page_type,1,7)")
      pageWith.orderBy(pageWith("weights").desc).show()

      // (5) Inner structure of the "other" page type (1999001) among the "?" records.
      val otherFilter = "visiturl like '%?%' and substring(page_type,1,7)=1999001"
      val otherTotal = countWhere(spark, otherFilter)
      val otherPage = spark.sql(s"select count(*) as count_num,round((count(*)/${otherTotal}.0)*100,4) as weights,page_title from law where $otherFilter group by page_title")
      otherPage.orderBy(otherPage("count_num").desc).limit(5).show()

      // The remaining analyses are kept disabled, as in the original listing.
      // NOTE(review): their percentage bases (350090.0, 837450.0, 229365.0) are hard-coded
      // totals that would need to be recomputed for the input file before enabling them.
      //        //(6) Page types clicked by "wandering" users (URLs not ending in .html)
      //        val streel=spark.sql("select count(*) as count_num,substring(page_type,1,3) as page_type from law where visiturl not like '%.html' group by substring(page_type,1,3)")
      //        streel.orderBy(-streel("count_num")).limit(6).show()
      //        //3. Click-count analysis
      //        //(1) Distribution of clicks per user
      //        spark.sql("select count(distinct userid) from law").show()
      //        val clickCount=spark.sql("select click_num,count(click_num) as count,round(count(click_num)*100/350090.0,2),round((count(click_num)*click_num)*100/837450.0,2) from (select count(userid) as click_num from law group by userid)tmp_table group by click_num order by count desc")
      //        clickCount.limit(7).show()
      //        //(2) Users who viewed exactly one page, by page type
      //        val onceScan=spark.sql("select page_type,count(page_type) as count,round((count(page_type)*100)/229365.0,4) from (select substring(a.page_type,1,7) as page_type from law a,(select userid from law group by userid having(count(userid)=1))b where a.userid=b.userid)c group by page_type order by count desc")
      //        onceScan.limit(5).show()
      //        //(3) URL ranking for users who clicked exactly once
      //        val urlRank=spark.sql("select a.visiturl,count(*) as count from law a,(select userid from law group by userid having(count(userid)=1))b where a.userid=b.userid group by a.visiturl")
      //        urlRank.orderBy(-urlRank("count")).limit(7).show(false)
      //        //(4) Click counts for pages whose URL contains the .html extension
      //        val clickHtml=spark.sql("select a.visiturl,count(*) as count from law a where a.visiturl like '%.html%' group by a.visiturl")
      //        clickHtml.orderBy(-clickHtml("count")).limit(10).show(false)
      //        //(5) Paginated-page statistics
      //        spark.sql("select count(*)  from law where visiturl like 'http://www.%.cn/info/gongsi/slbgzcdj/201312312876742.html'").show()
      //        spark.sql("select count(*)  from law where visiturl like 'http://www.%.cn/info/gongsi/slbgzcdj/201312312876742_2.html'").show()
      //        spark.sql("select count(*)  from law where visiturl like 'http://www.%.cn/info/hetong/ldht/201311152872128.html'").show()
      //        spark.sql("select count(*)  from law where visiturl like 'http://www.%.cn/info/hetong/ldht/201311152872128_2.html'").show()
      //        spark.sql("select count(*)  from law where visiturl like 'http://www.%.cn/info/hetong/ldht/201311152872128_3.html'").show()
      //        spark.sql("select count(*)  from law where visiturl like 'http://www.%.cn/info/hetong/ldht/201311152872128_4.html'").show()

      // When writing to MySQL the save mode can be configured: overwrite, append, ignore,
      // or error (the default, which fails when the table already exists).
      //resultDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.154.141:3306/test09?useUnicode=true&characterEncoding=utf8","html_page",prop)
    } finally {
      spark.stop()
    }
  }

  /**
   * Counts the rows of the `law` view that satisfy a SQL predicate.
   *
   * @param spark     session in which the `law` view is registered
   * @param predicate SQL boolean expression placed after `where`; only fixed, program-defined
   *                  text is passed here, never external input
   * @return number of matching rows
   */
  private def countWhere(spark: SparkSession, predicate: String): Long =
    spark.sql(s"select count(*) from law where $predicate").first().getLong(0)
}

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐