Split a list of overlapping intervals into non-overlapping sub-intervals in a PySpark dataframe, and check whether a value is valid across the overlapping intervals
I have a PySpark dataframe with start_time and end_time columns that define an interval for each row. It also has an is_duplicated column, set to True if an interval overlaps at least one other interval, and to False otherwise.
There is also a rate column, and I would like to know whether the overlapping sub-intervals carry different rate values; if that is the case, I want to keep the most recently updated record, according to the updated_at column, as the ground truth.
As an intermediate step, I would like to create an is_validated column set to:
- None when the sub-interval is not overlapped;
- True when the sub-interval is overlapped and carries the most recently updated rate;
- False when the sub-interval is overlapped and does not carry the most recently updated rate.
Note: the intermediate step is not mandatory; I only provide it to make the explanation clearer.
Input:
# So this:
from pyspark.sql import Row

input_rows = [
    Row(start_time='2018-01-01 00:00:00', end_time='2018-01-04 00:00:00', rate=10, updated_at='2021-02-25 00:00:00'),  # OVERLAP: (1,4) and (2,3) and (3,5), and rate=10/20
    Row(start_time='2018-01-02 00:00:00', end_time='2018-01-03 00:00:00', rate=10, updated_at='2021-02-25 00:00:00'),  # OVERLAP: full overlap of (2,3) with (1,4)
    Row(start_time='2018-01-03 00:00:00', end_time='2018-01-05 00:00:00', rate=20, updated_at='2021-02-20 00:00:00'),  # OVERLAP: (3,5) and (1,4), and rate=10/20
    Row(start_time='2018-01-06 00:00:00', end_time='2018-01-07 00:00:00', rate=30, updated_at='2021-02-25 00:00:00'),  # NO OVERLAP: hole between (5,6)
    Row(start_time='2018-01-07 00:00:00', end_time='2018-01-08 00:00:00', rate=30, updated_at='2021-02-25 00:00:00'),  # NO OVERLAP
]
df = spark.createDataFrame(input_rows)
df.show()
df.show()
>>> +-------------------+-------------------+----+-------------------+
| start_time| end_time|rate| updated_at|
+-------------------+-------------------+----+-------------------+
|2018-01-01 00:00:00|2018-01-04 00:00:00| 10|2021-02-25 00:00:00|
|2018-01-02 00:00:00|2018-01-03 00:00:00| 10|2021-02-25 00:00:00|
|2018-01-03 00:00:00|2018-01-05 00:00:00| 20|2021-02-20 00:00:00|
|2018-01-06 00:00:00|2018-01-07 00:00:00| 30|2021-02-25 00:00:00|
|2018-01-07 00:00:00|2018-01-08 00:00:00| 30|2021-02-25 00:00:00|
+-------------------+-------------------+----+-------------------+
# Will become:
tmp_rows = [
    Row(start_time='2018-01-01 00:00:00', end_time='2018-01-02 00:00:00', rate=10, updated_at='2021-02-25 00:00:00', is_duplicated=False, is_validated=None),
    Row(start_time='2018-01-02 00:00:00', end_time='2018-01-03 00:00:00', rate=10, updated_at='2021-02-25 00:00:00', is_duplicated=True, is_validated=True),
    Row(start_time='2018-01-02 00:00:00', end_time='2018-01-03 00:00:00', rate=10, updated_at='2021-02-25 00:00:00', is_duplicated=True, is_validated=True),
    Row(start_time='2018-01-03 00:00:00', end_time='2018-01-04 00:00:00', rate=10, updated_at='2021-02-25 00:00:00', is_duplicated=True, is_validated=True),
    Row(start_time='2018-01-03 00:00:00', end_time='2018-01-04 00:00:00', rate=20, updated_at='2021-02-20 00:00:00', is_duplicated=True, is_validated=False),
    Row(start_time='2018-01-04 00:00:00', end_time='2018-01-05 00:00:00', rate=20, updated_at='2021-02-20 00:00:00', is_duplicated=False, is_validated=None),
    Row(start_time='2018-01-06 00:00:00', end_time='2018-01-07 00:00:00', rate=30, updated_at='2021-02-25 00:00:00', is_duplicated=False, is_validated=None),
    Row(start_time='2018-01-07 00:00:00', end_time='2018-01-08 00:00:00', rate=30, updated_at='2021-02-25 00:00:00', is_duplicated=False, is_validated=None),
]
tmp_df = spark.createDataFrame(tmp_rows)
tmp_df.show()
>>>
+-------------------+-------------------+----+-------------------+-------------+------------+
| start_time| end_time|rate| updated_at|is_duplicated|is_validated|
+-------------------+-------------------+----+-------------------+-------------+------------+
|2018-01-01 00:00:00|2018-01-02 00:00:00| 10|2021-02-25 00:00:00| false| null|
|2018-01-02 00:00:00|2018-01-03 00:00:00| 10|2021-02-25 00:00:00| true| true|
|2018-01-02 00:00:00|2018-01-03 00:00:00| 10|2021-02-25 00:00:00| true| true|
|2018-01-03 00:00:00|2018-01-04 00:00:00| 10|2021-02-25 00:00:00| true| true|
|2018-01-03 00:00:00|2018-01-04 00:00:00| 20|2021-02-20 00:00:00| true| false|
|2018-01-04 00:00:00|2018-01-05 00:00:00| 20|2021-02-25 00:00:00| false| null|
|2018-01-06 00:00:00|2018-01-07 00:00:00| 30|2021-02-25 00:00:00| false| null|
|2018-01-07 00:00:00|2018-01-08 00:00:00| 30|2021-02-25 00:00:00| false| null|
+-------------------+-------------------+----+-------------------+-------------+------------+
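The logic behind this intermediate table, split each interval into daily sub-intervals and let the most recently updated rate win, can be checked without Spark. The pure-Python sketch below is only an illustration of that rule (the names are mine, daily granularity is assumed, and updated_at strings are compared lexicographically, which is safe for this uniform format):

```python
# Pure-Python sketch: explode each interval into 1-day sub-intervals, then
# keep, per sub-interval, the rate of the most recently updated source row.
from collections import defaultdict
from datetime import datetime, timedelta

rows = [  # (start_time, end_time, rate, updated_at), daily granularity
    ("2018-01-01", "2018-01-04", 10, "2021-02-25"),
    ("2018-01-02", "2018-01-03", 10, "2021-02-25"),
    ("2018-01-03", "2018-01-05", 20, "2021-02-20"),
    ("2018-01-06", "2018-01-07", 30, "2021-02-25"),
    ("2018-01-07", "2018-01-08", 30, "2021-02-25"),
]

def day(s):
    return datetime.strptime(s, "%Y-%m-%d")

# Explode every interval into daily sub-intervals; a sub-interval produced
# by more than one row is "duplicated" in the question's terms.
candidates = defaultdict(list)
for start, end, rate, updated in rows:
    t = day(start)
    while t < day(end):
        candidates[(t, t + timedelta(days=1))].append((updated, rate))
        t += timedelta(days=1)

# The validated rate per sub-interval is the one with the latest updated_at:
# max() on (updated_at, rate) tuples compares the updated_at string first.
result = {k: max(v)[1] for k, v in sorted(candidates.items())}
for (s, e), rate in result.items():
    print(s.date(), e.date(), rate)
```

Running this reproduces the six sub-intervals of the expected final output, with rate 10 kept over rate 20 on (2018-01-03, 2018-01-04) because its source row was updated more recently.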
# To give you:
output_rows = [
    Row(start_time='2018-01-01 00:00:00', end_time='2018-01-02 00:00:00', rate=10),
    Row(start_time='2018-01-02 00:00:00', end_time='2018-01-03 00:00:00', rate=10),
    Row(start_time='2018-01-03 00:00:00', end_time='2018-01-04 00:00:00', rate=10),
    Row(start_time='2018-01-04 00:00:00', end_time='2018-01-05 00:00:00', rate=20),
    Row(start_time='2018-01-06 00:00:00', end_time='2018-01-07 00:00:00', rate=30),
    Row(start_time='2018-01-07 00:00:00', end_time='2018-01-08 00:00:00', rate=30),
]
final_df = spark.createDataFrame(output_rows)
final_df.show()
>>>
+-------------------+-------------------+----+
| start_time| end_time|rate|
+-------------------+-------------------+----+
|2018-01-01 00:00:00|2018-01-02 00:00:00| 10|
|2018-01-02 00:00:00|2018-01-03 00:00:00| 10|
|2018-01-03 00:00:00|2018-01-04 00:00:00| 10|
|2018-01-04 00:00:00|2018-01-05 00:00:00| 20|
|2018-01-06 00:00:00|2018-01-07 00:00:00| 30|
|2018-01-07 00:00:00|2018-01-08 00:00:00| 30|
+-------------------+-------------------+----+
This works:
You can explode a sequence of timestamps, just like in your intermediate dataframe, then group by start time and end time to get the latest rate according to the update time.
import pyspark.sql.functions as F

output = df.selectExpr(
    """
    inline(arrays_zip(
        sequence(timestamp(start_time), timestamp(end_time) - interval 1 day, interval 1 day),
        sequence(timestamp(start_time) + interval 1 day, timestamp(end_time), interval 1 day)
    )) as (start_time, end_time)
    """,
    "rate",
    "updated_at"
).groupBy(
    'start_time', 'end_time'
).agg(
    F.max(F.struct('updated_at', 'rate'))['rate'].alias('rate')
).orderBy("start_time")
output.show()
+-------------------+-------------------+----+
| start_time| end_time|rate|
+-------------------+-------------------+----+
|2018-01-01 00:00:00|2018-01-02 00:00:00| 10|
|2018-01-02 00:00:00|2018-01-03 00:00:00| 10|
|2018-01-03 00:00:00|2018-01-04 00:00:00| 10|
|2018-01-04 00:00:00|2018-01-05 00:00:00| 20|
|2018-01-06 00:00:00|2018-01-07 00:00:00| 30|
|2018-01-07 00:00:00|2018-01-08 00:00:00| 30|
+-------------------+-------------------+----+