在开发spark应用过程中需要往hive表中造测试数据,同时造多列数据,部分列之间存在逻辑计算关系,一般使用dataframe的函数.withColumn(“col_name”,conditions),此时conditions可以直接是类似于 col(“column_a”) * col(“column_b”) 这样的计算条件,也可以是udf函数。
例如: 如果我们需要使用table_1关联table_2,得到col_a,且需要新增三列col_b,col_c和col_d,计算条件如下:


col_b = (randomDouble() * 1.5).formatted("%.2f").toDouble,
col_c = col_a * col_b,
col_d = col_c / (col_a - col_b)

  从计算条件得知,先用spark连接hive库,读取table_1和table_2生成对应的dataframe,得到col_a;
  col_b可以由自定义函数实现,且不依赖其他列;
  而col_c和col_d均依赖其他列的数据,这里就要注意,需要在col_b计算完之后,加上cache缓存数据【cache的功能是缓存:针对频繁使用的数据/数据处理,cache将 RDD 元素从磁盘缓存到内存,便于下次读取】否则col_b的计算结果还没有被缓存到内存中,col_c也同时在计算且利用到了col_b的值,导致col_c数据混乱;
  同理,新增col_d时候,也需要将col_c的结果缓存起来。

(开发环境spark+hive+scala)

//自定义udf函数不带参数
val col_udf = udf(() => {
	(randomDouble() * 1.5).formatted("%.2f").toDouble
})

//自定义udf函数带参数
val col_udf_withParams= udf((col_a:Int,col_b:Double,col_c:Double) => {
	col_c / (col_a - col_b)
})

result_df = df_table_1.
join(df_table_2("t2_col_a"),col("t1_col_a")===col("t2_col_a"),"left_outer")
.withColumnRenamed("t1_col_a","col_a") // 将t1_col_a重命名成col_a
//不用cache,因为col_b是独立计算的随机数
.withColumnRenamed("col_b",col_udf()) // col_b是随机数
//注意:在计算完col_b之后这地方必须增加缓存,否则col_c的计算结果并不等于col_a * col_b
.cache() 
.withColumn("col_c",col("col_a") * col("col_b"))
.cache() 
.withColumn("col_d",col_udf_withParams(col("col_a"),col("col_b"),col("col_c")))
.select(
 "col_a",
 "col_b",
 "col_c",
 "col_d"
).show(10)
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐