Swift   发布时间:2022-04-30  发布网站:大佬教程  code.js-code.com
大佬教程收集整理的这篇文章主要介绍了swift – 使用Vapor 3创建和使用游标大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。
这可能是一堆蠕虫,我会尽力描述这个问题.我们有一个长期运行的数据处理工作.我们的行动数据库会在夜间@L_944_2@,并处理未完成的操作.处理夜间行动大约需要15分钟.在Vapor 2中,我们使用了大量的原始查询来创建POSTGResql游标并循环遍历它直到它为空.

目前,我们通过命令行参数运行处理.将来我们希望它作为主服务器的一部分运行,以便在执行处理时检查进度.

@H_404_13@func run(using context: CommandContext) throws -> Future<Void> { let table = "\"RecRegAction\"" let cursorName = "\"action_cursor\"" let chunkSize = 10_000 return context.container.withNewConnection(to: .psql) { connection in return POSTGResqlDatabase.transactionExecute({ connection -> Future<Int> in return connection.simpleQuery("DECLARE \(cursorName) cursOR FOR SELECT * FROM \(tablE)").map { result in var @R_122_10586@lResults = 0 var finished : Bool = false while !finished { let results = try connection.raw("FETCH \(chunkSizE) FROM \(cursorName)").all(decoding: RecRegAction.self).wait() if results.count > 0 { @R_122_10586@lResults += results.count print(@R_122_10586@lResults) // ObvIoUsly we do our processing here } else { finished = true } } return @R_122_10586@lResults } },on: connection) }.transform(to: ()) }

在这不起作用,因为我正在调用wait()并且我得到@L_450_11@“Precondition Failed:在EventLoop上不能调用wait()”这是公平的.我面临的一个问题是,我不知道你是如何在主要的事件循环中运行这样在后台线程上运行这样的东西.我知道BlockingIOThreadPool,但似乎仍然在同一个EventLoop上运行并仍然导致@L_450_11@.然我能够通过越来越复杂的方式来实现这一目标,但我希望我找不到一个优雅的解决方案,也许对SwiftNIO和Fluent有更好了解的人可以提供帮助.

编辑:要明确,这个目标显然不是要总计数据库中的操作数.目标是使用游标同步处理每个操作.当我读到结果时,我会检测到操作中的更改,然后将它们的批次抛出到处理线程中.当所有线程都忙时,我不会再次从光标读取,直到它们完成.

这些行动很多,一次运行多达4500万.聚合承诺和递归似乎并不是一个好主意,当我尝试它时,只是为了它,服务器挂起.

这是一个处理密集型任务,可以在一个线程上运行数天,所以我不关心创建新线程.问题是我无法弄清楚如何在Command中使用wait()函数,因为我需要一个容器来创建数据库连接,而我唯一可以访问的是context.container对此调用wait()会导致以上@L_450_11@.

TIA

解决方法

好的,如你所知,问题在于以下几点:

@H_404_13@while ... { ... try connection.raw("...").all(decoding: RecRegAction.self).wait() ... }

你想等待一些结果,因此你使用while循环和.wait()来获得所有中间结果.本质上,这是在事件循环中将异步代码转换为同步@L_674_32@.这可能会导致死锁,并且肯定会阻止其他连接,这就是为什么SwiftNIO会尝试检测到并导致@L_450_11@的原因.我不会详细说明为什么它会阻止其他连接,或者为什么这可能会导致这个答案陷入僵局.

让我们看看我们有什么选择来解决这个问题:

>正如你所说,我们可以在另一个不是事件循环线程之一的线程上使用.wait().对于这个任何非EventLoop线程会这样做:DispatchQueue或你可以使用BlockingIOThreadPool(它不在EventLoop上运行)
>我们可以将您的@L_674_32@重写为异步

两种解决方案都可以工作,但(1)实际上并不可取,因为您只需要等待结果就可以刻录整个(内核)线程.并且Dispatch和BlockingIOThreadPool都有他们愿意产生的有限数量的线程,所以如果你经常这样做,你可能会耗尽线程,所以它需要更长的时间.

因此,让我们看看如何在累积中间结果的同时多次调用异步函数.然后,如果我们累积了所有中间结果,则继续所有结果.

为了简化操作,让我们看看与您的功能非常相似的功能.我们假设这个函数就像你的@L_674_32@一样提供

@H_404_13@/// delivers partial results (Integers) and `nil` if no further elements are available func deliverPartialResult() -> EventLoopFuture<Int?> { ... }

我们现在想要的是一个功能

@H_404_13@func deliverFullResult() -> EventLoopFuture<[Int]>

请注意deliverPartialResult每次返回一个整数,deliverFullResult传递一个整数数组(即所有整数).好的,那么我们如何编写deliverFullResult而不调用deliverPartialResult().wait()?

那这个呢:

@H_404_13@func accumulateResults(eventLoop: EventLoop,partialResultsSoFar: [Int],getPartial: @escaping () -> EventLoopFuture<Int?>) -> EventLoopFuture<[Int]> { // let's run getPartial once return getPartial().then { partialResult in // we got a partial result,let's check what it is if let partialResult = partialResult { // another intermediate results,let's accumulate and call getPartial again return accumulateResults(eventLoop: eventLoop,partialResultsSoFar: partialResultsSoFar + [partialResult],getPartial: getPartial) } else { // we've got all the partial results,yay,let's fulfill the overall future return eventLoop.newSucceededFuture(result: partialResultsSoFar) } } }

鉴于accumulateResults,实现deliverFullResult不再太难了:

@H_404_13@func deliverFullResult() -> EventLoopFuture<[Int]> { return accumulateResults(eventLoop: myCurrentEventLoop,partialResultsSoFar: [],getPartial: deliverPartialResult) }

但让我们更多地了解accumulateResults的作用:

>它调用getPartial一次,然后调用
>检查我们是否有

>部分结果,在这种情况下,我们将记住它与其他partialResultsSoFar并返(1)
> nil,这意味着partialResultsSoFar就是我们得到的所有东西,我们回归到了一个新的成功未来

那已经是真的了.我们在这里做的是将同步循环转换为异步递归.

好的,我们查看了很多@L_674_32@,但这与你的功能现在有什么关系?

信不信由你,但这实际上应该有效(未经测试):

@H_404_13@accumulateResults(eventLoop: el,partialResultsSoFar: []) { connection.raw("FETCH \(chunkSizE) FROM \(cursorName)") .all(decoding: RecRegAction.self) .map { results -> Int? in if results.count > 0 { return results.count } else { return nil } } }.map { allResults in return allResults.reduce(0,+) }

所有这些的结果将是EventLoopF​​uture< Int>它携带所有中间result.count的总和.

当然,我们首先将所有计数收集到一个数组中,然后在最后总结它(allResults.reduce(0,)),这有点浪费但也不是世界末日.我这样离开它是因为这使得accumulateResults可用于你想在数组中累积部分结果的其他情况.

现在最后一件事,一个真正的accumulateResults函数可能在元素类型上是通用的,我们也可以消除外部函数的partialResultsSoFar参数.那这个呢?

@H_404_13@func accumulateResults<T>(eventLoop: EventLoop,getPartial: @escaping () -> EventLoopFuture<T?>) -> EventLoopFuture<[T]> { // this is an inner function just to hide it from the outside which carries the accumulator func accumulateResults<T>(eventLoop: EventLoop,partialResultsSoFar: [T] /* our accumulator */,getPartial: @escaping () -> EventLoopFuture<T?>) -> EventLoopFuture<[T]> { // let's run getPartial once return getPartial().then { partialResult in // we got a partial result,let's check what it is if let partialResult = partialResult { // another intermediate results,let's accumulate and call getPartial again return accumulateResults(eventLoop: eventLoop,getPartial: getPartial) } else { // we've got all the partial results,let's fulfill the overall future return eventLoop.newSucceededFuture(result: partialResultsSoFar) } } } return accumulateResults(eventLoop: eventLoop,getPartial: getPartial) }

编辑:编辑后,您的问题表明您实际上并不想积累中间结果.所以我的猜测是,您希望在收到每个中间结果后进行一些处理.如果这是你想要做的,也许试试这个:

@H_404_13@func processPartialResults<T,V>(eventLoop: EventLoop,process: @escaping (T) -> EventLoopFuture<V>,getPartial: @escaping () -> EventLoopFuture<T?>) -> EventLoopFuture<V?> { func processPartialResults<T,soFar: V?,getPartial: @escaping () -> EventLoopFuture<T?>) -> EventLoopFuture<V?> { // let's run getPartial once return getPartial().then { partialResult in // we got a partial result,let's call the process function and move on return process(partialResult).then { v in return processPartialResults(eventLoop: eventLoop,soFar: v,process: process,getPartial: getPartial) } } else { // we've got all the partial results,let's fulfill the overall future return eventLoop.newSucceededFuture(result: soFar) } } } return processPartialResults(eventLoop: eventLoop,soFar: nil,getPartial: getPartial) }

这将(如前所述)运行getPartial,直到它返回nil,但不是累积所有getPartial的结果,而是调用获得部分结果的进程,并且可以进行一些进一步的处理.当满足EventLoopF​​uture进程返回时,将发生下一个getPartial调用.

那更接近你想要的吗?

注意:我在这里使用了SwiftNIO的EventLoopF​​uture类型,在Vapor中您只需使用Future,但@L_674_32@的其余部分应该相同.

大佬总结

以上是大佬教程为你收集整理的swift – 使用Vapor 3创建和使用游标全部内容,希望文章能够帮你解决swift – 使用Vapor 3创建和使用游标所遇到的程序开发问题。

如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。