如何使用lambda函数处理SQS队列(不通过预定事件)?

这是我正在努力工作的简化scheme:

http请求 – >(网关API + lambda A) – > SQS – >(lambda B ?????) – > DynamoDB

所以它应该如下图所示:来自许多http请求的数据(例如每秒500个)被我的lambda函数A放入到SQS队列中。然后另一个函数B处理队列:读取多达10个项目(定期),并使用BatchWriteItem将它们写入DynamoDB。

问题是,我不知道如何触发第二个lambda函数。 应该频繁地调用,每秒多次(或者至less每秒一次),因为我需要从队列中的所有数据尽快进入DynamoDB(这就是为什么通过调度事件调用lambda函数B( 这里描述的不是一个选项)


为什么我不想直接写入DynamoDB,没有SQS?

这对我来说完全可以避免使用SQS。 我试图用SQS解决的问题是DynamoDB限制。 在使用AWS开发工具包(SDK)将数据写入DynamoDB的过程中,甚至不用自行节制,而是将数据写入DynamoDB时处理的方式:在逐个写入logging并限制数据logging时,AWS SDK会以静默方式重试写入,导致请求处理时间从http客户端视图。

因此,我想临时存储队列中的数据,发送响应“200 OK”返回给客户端,然后通过单独的函数获得队列处理,用一个DynamoDB的BatchWriteItem调用写入多个logging(返回未处理的项目而不是自动重试的节stream)。 我甚至希望丢失一些logging,而不是增加在DynamoDB中收到和存储的logging之间的延迟

UPD:如果有人感兴趣,我已经find了如何使aws-sdk在节stream的情况下跳过自动重试:有一个特殊的参数maxRetries 。 无论如何,要使用Kinesis如下所示

[这并不直接回答你明确的问题,所以根据我的经验,这将是downvoted :)但是,我会回答你想要解决的根本问题。]

我们采用大量的传入请求并将它们提供给AWS Lambda函数以便以节奏的方式写入DynamoDB的方式是将提议的体系结构中的SQSreplace为Amazon Kinesisstream。

Kinesisstream可以驱动AWS Lambda函数。

Kinesisstream保证任何给定密钥的传送消息的sorting(对于有序的数据库操作来说很好)。

通过Kinesisstream,您可以指定可以并行运行多less个AWS Lambda函数(每个分区一个),这些函数可以与您的DynamoDB写入容量进行协调。

Kinesisstream可以在一个AWS Lambda函数调用中传递多个可用消息,从而进一步优化。

注意:实际上,从Amazon Kinesisstream读取的AWS Lambda服务会调用该函数,而不是直接调用AWS Lambda的Kinesisstream; 但是有时候Kinesis驾驶它会更容易形象化。 对用户的结果几乎是一样的。

不幸的是,您无法直接整合SQS和Lambda。 但是,不要太担心。 有一个解决scheme! 你需要添加另一个亚马逊服务组合,所有的问题将得到解决。

http requests --> (Gateway API + lambda A) --> SQS + SNS --> lambda B --> DynamoDB 

您可以触发SNS通知到第二个lambda服务启动它。 一旦启动,它可以排空队列并将所有结果写入DynamoDB。 为了更好地理解Lambda的可能事件源,请查看这些文档 。

另一种解决scheme是将项目添加到SQS,使用Event调用目标Lambda函数,以便它是asynchronous的。

然后,asynchronousLambda可以从SQS获取尽可能多的项目并处理它们。

我还会将一个计划调用添加到asynchronousLambda以处理队列中出错的任何项目。

也许一个更具成本效益的解决scheme是将所有内容都保存在SQS中(原样),然后运行调度multithreadingLambda函数的预定事件来处理队列中的项目?

这样,您的队列工作人员就可以准确地匹配您的限制。 如果队列是空的,函数可以提前完成或在单线程中开始轮询。

对于这种情况,Kinesis听起来像是一种过度杀戮 – 例如,您不需要原始订单。 同时运行多个Lambda同时运行一个multithreadingLambda肯定要更昂贵。

您的Lambda将全部关于I / O,从而对AWS服务进行外部调用,因此一个函数可能非常适合。