大佬教程收集整理的这篇文章主要介绍了ApacheBeam-Java:MongoDB 过滤器,大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。
我是数据流/Apchebeam 的新手。我正在从 MongoDB 中提取数据。 MongoDB 连接工作正常,但我无法应用过滤器。抛出以下错误。我不确定这是过滤数据的正确方法。任何建议都会有所帮助。
错误
2021-06-07 10:37:34.615 来自 worker 的 CESTError 消息:java.lang.classCastException:com.test.dataflow.dofns.MongodDbqueryFn 无法转换为 org.apache.beam.sdk.io。 mongodb.Aggregationquery org.apache.beam.sdk.io.mongodb.MongoDbIO$BoundedMongoDbsource.split(MongoDbIO.java:522)
代码:R_59_11845@ongoDbIO 连接器:
return pipeline.apply(MongoDbIO.read()
.withUri("mongodb://".concat(databaseDetails.getDatabaseHostname()).concat(":").concat(databaseDetails.getPort()))
.withDatabase(databaseDetails.getDatabasename())
.withCollection(objectDetails.getobjectname())
.withqueryFn(new MongodDbqueryFn("name","Mahesh")));
queryFn Ptransform
package com.test.dataflow.dofns;
import org.apache.beam.sdk.transforms.serializableFunction;
import org.bson.document;
import com.mongodb.clIEnt.MongoCollection;
import com.mongodb.clIEnt.Mongocursor;
public class MongodDbqueryFn implements serializableFunction<MongoCollection<document>,Mongocursor<document>> {
/**
*
*/
private static final long serialVersionUID = 1L;
private String keyname;
private String keyvalue;
public MongodDbqueryFn(String keyname,String keyvalue) {
this.keyname = keyname;
this.keyvalue = keyvalue;
}
@OverrIDe
public Mongocursor<document> apply(MongoCollection<document> input) {
return input.find(com.mongodb.clIEnt.model.Filters.eq(keyname,keyvalue)).iterator();
}
}
withQueryFn
的文档并没有很好地解释这一点,但是从通读代码到 MongoDbIO 看来,@H_687_17@mongoDbIO.Read 假定 QueryFn
设置为 {{3 }} 或 AggregationQuery
。您遇到的错误是因为代码正在检查 queryFn 是否为 FindQuery
,结果为 false,然后假设它是 AggregationQuery
并尝试对其进行转换。
最适合您的解决方案是使用 FindQuery
获得与您编写的相同的行为,如下所示:
.withQueryFn(FindQuery.create().withFilters(Filters.eq("name","Mahesh"))));
以上是大佬教程为你收集整理的ApacheBeam-Java:MongoDB 过滤器全部内容,希望文章能够帮你解决ApacheBeam-Java:MongoDB 过滤器所遇到的程序开发问题。
如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。