程序问答   发布时间:2022-06-01  发布网站:大佬教程  code.js-code.com
大佬教程收集整理的这篇文章主要介绍了带条件的 PySpark 窗口大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。

如何解决带条件的 PySpark 窗口?

开发过程中遇到带条件的 PySpark 窗口的问题如何解决?下面主要结合日常开发的经验,给出你关于带条件的 PySpark 窗口的解决方法建议,希望对你解决带条件的 PySpark 窗口有所启发或帮助;

我有一个包含应用程序日志的数据集,显示某个应用程序的启动或关闭时间。有时,相关事件可能完全从日志中丢失。我想将每个应用开始与相关的结束事件(如果存在)相匹配。

这是一个说明性的数据集:

import pyspark.sql.functions as F
from pyspark.sql import Window

df = spark.createDataFrame([['Group1','logon','name1','2021-02-05T19:03:00.000+0000'],['Group1','Start','2021-02-05T19:04:00.000+0000'],'2021-02-05T19:05:00.000+0000'],'End','2021-02-05T19:06:00.000+0000'],'name3','2021-02-05T19:06:01.000+0000'],'2021-02-05T19:07:00.000+0000'],['Group2','name2','Close',],['group','type','name','time'])

df = df.with@R_607_8620@n('time',F.col('time').cast('timestamp'))

对于每个组,如果它们具有相同的“名称”,我想为每个“开始”和“结束”事件放置一个通用标识符。换句话说,对于每个“开始”事件,我想找到尚未与另一个“开始”事件匹配的第一个“结束”事件

预期结果可能类似于下图:

带条件的 PySpark 窗口

我不介意标识符(即“my_group”)是 ID、时间戳还是在组间单调递增。我只是希望能够匹配每个组内的相关事件。

我的尝试

我想过使用窗口函数来识别“开始”事件的结束时间和“结束”事件的开始时间。但是,我不能仅限于搜索“结束”事件(分别是“开始”事件)。此外,我无法应用上述查找尚未与另一个“开始”事件匹配的第一个“结束”事件的逻辑。

这是我的代码:

app_session_window_down = Window.partitionBy('group',"name").orderBy(F.col("time").cast('long')).rangebetween(1,Window.unboundedFollowing) #search in the future
app_session_window_up = Window.partitionBy('group',"name").orderBy(F.col("time").cast('long')).rangebetween(Window.unboundedPreceding,-1) #search in the past

df = df.with@R_607_8620@n("app_time_end",F.when((F.col("type") == 'Start'),F.first(F.col('time'),ignorenulls=TruE).over(app_session_window_down)).otherwise(F.lit('None')))\
    .with@R_607_8620@n("app_time_start",F.when((F.col("type") == 'End'),F.last(F.col('time'),ignorenulls=TruE).over(app_session_window_up)).otherwise(F.col('app_time_end')))

给出:

带条件的 PySpark 窗口

这与我想要实现的目标相去甚远。有什么提示吗?

解决方法

内嵌注释中有说明:

from pyspark.sql import functions as F,Window

df2 = df.with@R_607_8620@n(
    'my_group',# the @R_607_8620@n you wanted
    F.when(
        F.col('type').isin(['Start','End']),F.row_number().over(Window.partitionBy('group','name','type').orderBy('time'))
    )
).with@R_607_8620@n(
    'max_group',# Helper @R_607_8620@n: get maximum row_number for each group ; will be used later
    F.least(
        F.max(
            F.when(
                F.col('type') == 'Start',F.col('my_group')
            ).otherwise(0)
        ).over(Window.partitionBy('group','name')),F.max(
            F.when(
                F.col('type') == 'End','name'))
    )
).with@R_607_8620@n(
    'my_group',# mask the rows which don't have corresponding 'start'/'end'
    F.when(
        F.col('my_group') <= F.col('max_group'),F.col('my_group')
    )
).with@R_607_8620@n(
    'my_group',# add the group name
    F.when(F.col('my_group').isnotNull(),F.concat_ws('_','group','my_group'))
).drop('max_group').orderBy('group','time')
df2.show()
+------+-----+-----+-------------------+--------------+
| group| type| name|               time|      my_group|
+------+-----+-----+-------------------+--------------+
|Group1|Logon|Name1|2021-02-05 19:03:00|          null|
|Group1|Start|Name1|2021-02-05 19:04:00|Group1_Name1_1|
|Group1|Start|Name1|2021-02-05 19:05:00|Group1_Name1_2|
|Group1|  End|Name1|2021-02-05 19:06:00|Group1_Name1_1|
|Group1|  End|Name3|2021-02-05 19:06:01|          null|
|Group1|  End|Name1|2021-02-05 19:07:00|Group1_Name1_2|
|Group2|Start|Name1|2021-02-05 19:04:00|Group2_Name1_1|
|Group2|Start|Name1|2021-02-05 19:05:00|          null|
|Group2|Start|Name2|2021-02-05 19:06:00|          null|
|Group2|  End|Name1|2021-02-05 19:07:00|Group2_Name1_1|
|Group2|Close|Name1|2021-02-05 19:07:00|          null|
+------+-----+-----+-------------------+--------------+

大佬总结

以上是大佬教程为你收集整理的带条件的 PySpark 窗口全部内容,希望文章能够帮你解决带条件的 PySpark 窗口所遇到的程序开发问题。

如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。