本文作者:云初冀北

spark dataframe全局排序id与分组后保留最大值行

spark dataframe全局排序id与分组后保留最大值行摘要: 正文作为一个算法工程师,日常学习和工作中,不光要 训练模型关注效果 ,更多的 时间 是在 准备样本数据与分析数据 等,而这些过程 都与 大数据 spark和hadoop生态 的若干...

?=正文

作为一个算法工程师,日常学习和工作中,不光要 训练模型关注效果 ,更多的 时间 是在 准备样本数据分析数据 等,而这些过程 都与 大数据 Spark和hadooP生态 的若干工具息息相关。

今天我们就不在更新 机器学习算法模型 相关的内容,分享两个 spark函数 吧,以前也在某种场景中使用过但没有保存收藏,哎!! 事前不搜藏,临时抱佛脚 的感觉 真是 痛苦,太耽误干活了

so,把这 两个函数 记在这里 以备不时 之需~

(1) 得到 spark DataFrame 全局排序ID

这个函数的 应用场景 就是:根据某一数值对 spark 的 dataframe排序, 得到全局多分区排序的全局有序ID,新增一列保存这个rank id ,并且保留别的列的数据无变化

有用户会说,这不是很容易吗 ,直接用 orderBy 不就可以了吗,但是难点是:orderBy完记录下全局ID 并且 保持原来全部列的DF数据

多说无益,遇到这个场景 直接Copy 用起来 就知道 有多爽 了,同问题 我们可以 用下面 这个函数 解决 ~

scala 写的 spark 版本代码

def dfZIPWithIndex(   df: DataFrame,   offset: int = 1,   colName: string ="rank_id",   inFront: Boolean = true ) : DataFrame = {   df.sqlContext.createDataFrame( df.rdd.zipWithIndex.Map(ln =>   Row.fromSeq( (if (inFront) Seq(ln._2 + offset) else Seq())   ++ ln._1.toseq ++ (if (inFront) Seq() else Seq(ln._2 + offset))   ) ), Structtype(   (if (inFront) Array(structField(colName,LongType,false)) else Array[StructField]()) ++ df.schema.fields ++   (if (inFront) Array[StructField]() else Array(StructField(colName,LongType,false))) )   ) } 

函数调用我们可以用这行代码调用: val ranked_df = dfZipWithIndex(raw_df.orderBy($"predict_score".desc)), 直接复制过去就可以~

Python写的 pyspark 版本代码:

from pyspark.sql.types import LongType, StructField, StructType def dfZipWithIndex (df, offset=1, colName="rank_id"): new_schema = StructType( [StructField(colName,LongType(),True)]# new added field in front + df.schema.fields# previous schema ) zipped_rdd = df.rdd.zipWithIndex() new_rdd = zipped_rdd.map(Lambda (row,rowId): ([rowId +offset] + list(row))) return spark.createDataFrame(new_rdd, new_schema) 

调用 同理 , 这里我就不在进行赘述了。

(2)分组后保留最大值

这个函数的 应用场景 就是: 当我们使用 spark 或则 sparkSQL 查找某个 dataframe 数据的时候,在某一天里,任意一个用户可能有多条记录,我们需要 对每一个用户,保留dataframe 中 某列值最大 的那行数据

其中的 关键点 在于:一次性求出对每个用户分组后,求得每个用户的多行记录中,某个值最大的行进行数据保留

当然,经过 简单修改代码,不一定是最大,最小也是可以的,平均都ok

scala 写的 spark 版本代码:

// 得到一天内一个用户多个记录里面时间最大的那行用户的记录 import org.apache.spark.sql.expressiONs.Window import org.apache.spark.sql.functions val w = Window.partitionBy("user_id") val result_df = raw_df .withColumn("max_time",functions.max("time").over(w)) .where($"time" === $"max_time") .drop($"max_time") 

python写的 pyspark 版本代码:

# pyspark dataframe 某列值最大的元素所在的那一行  # GroupBy 列并过滤 Pyspark 中某列值最大的行  # 创建一个Window 以按A列进行分区,并使用它来计算每个组的最大值。然后过滤出行,使 B 列中的值等于最大值  from pyspark.sql import Window w = Window.partitionBy('user_id') result_df = spark.sql(raw_df).withColumn('max_time', fun.max('time').over(w))\ .where(fun.col('time') == fun.col('time')) .drop('max_time') 

我们可以看到: 这个函数的关键就是运用了 spark 的 window 函数 ,灵活运用 威力无穷 哦 !

到这里,spark利器2函数之dataframe全局排序id与分组后保留最大值行 的全文 就写完了 ,更多关于spark dataframe全局排序的资料请关注云初冀北其它相关文章!

免责声明
本站提供的资源,都来自网络,版权争议与本站无关,所有内容及软件的文章仅限用于学习和研究目的。不得将上述内容用于商业或者非法用途,否则,一切后果请用户自负,我们不保证内容的长久可用性,通过使用本站内容随之而来的风险与本站无关,您必须在下载后的24个小时之内,从您的电脑/手机中彻底删除上述内容。如果您喜欢该程序,请支持正版软件,购买注册,得到更好的正版服务。侵删请致信E-mail:Goliszhou@gmail.com
$

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏

阅读
分享

发表评论

快捷回复:

评论列表 (暂无评论,101人围观)参与讨论

还没有评论,来说两句吧...