大佬教程收集整理的这篇文章主要介绍了尝试写入 S3 时,StreamingFileSink 有时不起作用,大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。
我正在尝试写入 S3 接收器。
private static StreamingfileSink<String> createS3SinkFromStaticConfig(
final Map<String,PropertIEs> applicationPropertIEs
) {
PropertIEs sinkPropertIEs = applicationPropertIEs.get(SINK_PROPERTIES);
String s3SinkPath = sinkPropertIEs.getProperty(SINK_S3_PATH_KEY);
return StreamingfileSink
.forRowFormat(
new Path(s3SinkPath),new SimpleStringEncoder<String>(StandardCharsetS.UTF_8.toString())
)
.build();
}
以下代码有效,我可以在 S3 中看到结果
input.map(value -> { // Parse the JsON
JsonNode JsonNode = JsonParser.readValue(value,JsonNode.class);
return new Tuple2<>(JsonNode.get("ticker").asText(),JsonNode.get("price").asDouble());
}).returns(Types.TUPLE(Types.StriNG,Types.DOUBLE))
.keyBy(0) // Logically partition the stream per stock symbol
.timeWindow(Time.seconds(10),Time.seconds(5)) // SlIDing window deFinition
.min(1) // Calculate minimum price per stock over the window
.setParallelism(3) // Set parallelism for the min operator
.map(value -> value.f0 + ": ----- " + value.f1.toString() + "\n")
.addSink(createS3SinkFromStaticConfig(applicationPropertIEs));
但以下内容不会向 S3 写入任何内容。
KeyedStream<EnrichedMetric,EnrichedMetricKey> input = env.addsource(new EnrichedMetricsource())
.assigntimestampsAnDWatermarks(
WatermarkStrategy.<EnrichedMetric>forMonotonoustimestamps()
.withtimestampAssigner(((event,l) -> event.getEventTime()))
).keyBy(new EnrichedMetricKeySELEctor());
DataStream<String> statsstream = input
.window(TumblingEventTimewindows.of(Time.seconds(5)))
.process(new PValueStatisticsWindowFunction());
statsstream.addSink(createS3SinkFromStaticConfig(applicationPropertIEs));
PValueStatisticsWindowFunction
是一个 ProcessWindowFunction
,如下所示。
@OverrIDe
public voID process(EnrichedMetricKey enrichedMetricKey,Context context,Iterable<EnrichedMetric> in,Collector<String> out) throws Exception {
int count = 0;
for (EnrichedMetric m : in) {
count++;
}
out.collect("Count: " + count);
}
当我在本地运行 Flink 应用程序时,statsstream.print()
将结果打印到 log/flink-*-taskexecutor-*.out
。
在集群中,我可以看到检查点已启用以及 Flink 仪表板中的各种检查点历史记录。我还确保 S3 路径的格式为 s3a://<bucket>
不确定我在这里遗漏了什么。
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)
以上是大佬教程为你收集整理的尝试写入 S3 时,StreamingFileSink 有时不起作用全部内容,希望文章能够帮你解决尝试写入 S3 时,StreamingFileSink 有时不起作用所遇到的程序开发问题。
如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。