通过利用pyspark.sql.Window实现collect_list的排序

from pyspark.sql.window import Window

window_ = Window.partitionBy("userid").orderBy("stay_start_time")
df42= df3.join(df41,'userid','inner').withColumn('lng_lat',concat_ws(',',col('stay_lng'),col('stay_lat')))\
.withColumn("lng_lats",F.collect_list("lng_lat").over(window_))\
.groupby('userid','lng','lat').agg(F.max("lng_lats").alias("lng_lats"))\
.withColumn("lng_lats",concat_ws(';',col('lng_lats')))\
.cache()
df42.show(truncate=False)

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐