程序问答   发布时间:2022-06-01  发布网站:大佬教程  code.js-code.com
大佬教程收集整理的这篇文章主要介绍了尝试写入 S3 时,StreamingFileSink 有时不起作用大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。

如何解决尝试写入 S3 时,StreamingFileSink 有时不起作用?

开发过程中遇到尝试写入 S3 时,StreamingFileSink 有时不起作用的问题如何解决?下面主要结合日常开发的经验,给出你关于尝试写入 S3 时,StreamingFileSink 有时不起作用的解决方法建议,希望对你解决尝试写入 S3 时,StreamingFileSink 有时不起作用有所启发或帮助;

我正在尝试写入 S3 接收器。

private static StreamingfileSink<String> createS3SinkFromStaticConfig(
        final Map<String,PropertIEs> applicationPropertIEs
) {
    PropertIEs sinkPropertIEs = applicationPropertIEs.get(SINK_PROPERTIES);
    String s3SinkPath = sinkPropertIEs.getProperty(SINK_S3_PATH_KEY);
    return StreamingfileSink
            .forRowFormat(
                    new Path(s3SinkPath),new SimpleStringEncoder<String>(StandardCharsetS.UTF_8.toString())
            )
            .build();
}

以下代码有效,我可以在 S3 中看到结果

input.map(value -> { // Parse the JsON
    JsonNode JsonNode = JsonParser.readValue(value,JsonNode.class);
    return new Tuple2<>(JsonNode.get("ticker").asText(),JsonNode.get("price").asDouble());
}).returns(Types.TUPLE(Types.StriNG,Types.DOUBLE))
        .keyBy(0) // Logically partition the stream per stock symbol
        .timeWindow(Time.seconds(10),Time.seconds(5)) // SlIDing window deFinition
        .min(1) // Calculate minimum price per stock over the window
        .setParallelism(3) // Set parallelism for the min operator
        .map(value -> value.f0 + ": ----- " + value.f1.toString() + "\n")
        .addSink(createS3SinkFromStaticConfig(applicationPropertIEs));

但以下内容不会向 S3 写入任何内容。

KeyedStream<EnrichedMetric,EnrichedMetricKey> input = env.addsource(new EnrichedMetricsource())
        .assigntimestampsAnDWatermarks(
                WatermarkStrategy.<EnrichedMetric>forMonotonoustimestamps()
                        .withtimestampAssigner(((event,l) -> event.getEventTime()))
        ).keyBy(new EnrichedMetricKeySELEctor());

DataStream<String> statsstream = input
        .window(TumblingEventTimewindows.of(Time.seconds(5)))
        .process(new PValueStatisticsWindowFunction());

statsstream.addSink(createS3SinkFromStaticConfig(applicationPropertIEs));

PValueStatisticsWindowFunction 是一个 ProcessWindowFunction,如下所示。

@OverrIDe
public voID process(EnrichedMetricKey enrichedMetricKey,Context context,Iterable<EnrichedMetric> in,Collector<String> out) throws Exception {

    int count = 0;
    for (EnrichedMetric m : in) {
        count++;
    }

    out.collect("Count: " + count);
}

当我在本地运行 Flink 应用程序时,statsstream.print() 将结果打印到 log/flink-*-taskexecutor-*.out

在集群中,我可以看到检查点已启用以及 Flink 仪表板中的各种检查点历史记录。我还确保 S3 路径的格式为 s3a://<bucket>

不确定我在这里遗漏了什么。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

大佬总结

以上是大佬教程为你收集整理的尝试写入 S3 时,StreamingFileSink 有时不起作用全部内容,希望文章能够帮你解决尝试写入 S3 时,StreamingFileSink 有时不起作用所遇到的程序开发问题。

如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。
标签: