Browse Source

Added deadletter queue support

Fred Damstra (Macbook 2015) 2 years ago
parent
commit
a218d01d8d

+ 4 - 1
README.md

@@ -47,7 +47,9 @@ pip install --target ./module_sqs_fair_queueing/scripts jsonpath-ng
 | [aws_s3_bucket_notification.bucket_notification](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_bucket_notification) | resource |
 | [aws_s3_bucket_public_access_block.bucket](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_bucket_public_access_block) | resource |
 | [aws_s3_bucket_server_side_encryption_configuration.bucket](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_bucket_server_side_encryption_configuration) | resource |
+| [aws_sqs_queue.deadletter](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/sqs_queue) | resource |
 | [aws_sqs_queue.queue](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/sqs_queue) | resource |
+| [aws_sqs_queue_redrive_allow_policy.deadletter](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/sqs_queue_redrive_allow_policy) | resource |
 
 ## Inputs
 
@@ -57,5 +59,6 @@ No inputs.
 
 | Name | Description |
 |------|-------------|
-| <a name="output_example"></a> [example](#output\_example) | n/a |
+| <a name="output_arns"></a> [arns](#output\_arns) | n/a |
+| <a name="output_urls"></a> [urls](#output\_urls) | n/a |
 <!-- END OF PRE-COMMIT-TERRAFORM DOCS HOOK -->

+ 3 - 0
config.tf

@@ -1,4 +1,6 @@
 locals {
+  sqs_prefix = "fdamstra-fair-queueing-test-fq"
+
   # Everything here should be self-explanatory
   profile = "default"
   region  = "us-east-2"
@@ -10,3 +12,4 @@ locals {
 # Uncomment if needed
 #data "aws_caller_identity" "current" {}
 #data "aws_partition" "current" {}
+#data "aws_region" "current" {}

+ 31 - 7
main.tf

@@ -3,13 +3,12 @@
 module "sqs_fair_queue" {
   source = "./module_sqs_fair_queueing"
 
-  source_sqs_arn = aws_sqs_queue.queue.arn
-  source_sqs_url = aws_sqs_queue.queue.url
-  lambda_timeout = aws_sqs_queue.queue.visibility_timeout_seconds - 5
-  sqs_prefix     = "mbox-fair-queueing-test-fq"
-  num_queues     = 16
-  hash_jsonpath  = "$" # This will evenly distribute all messages
-  tags           = local.tags
+  source_sqs = aws_sqs_queue.queue
+  #deadletter    = aws_sqs_queue.deadletter
+  sqs_prefix    = local.sqs_prefix
+  num_queues    = 16
+  hash_jsonpath = "$" # This will evenly distribute all messages
+  tags          = local.tags
 }
 
 ######################################
@@ -83,11 +82,36 @@ resource "aws_sqs_queue" "queue" {
 }
 POLICY
 
+  redrive_policy = jsonencode({
+    deadLetterTargetArn = aws_sqs_queue.deadletter.arn
+    maxReceiveCount     = 4
+  })
+
+  # NOTE: If you set this below about 15, then you must decrease how many
+  # messages are processed per batch by lambda.
+  visibility_timeout_seconds = 30
+
   depends_on = [aws_s3_bucket.bucket]
 
   tags = local.tags
 }
 
+resource "aws_sqs_queue" "deadletter" {
+  name = "mbox-bucket-notification-dlq"
+
+  sqs_managed_sse_enabled = true
+}
+
+resource "aws_sqs_queue_redrive_allow_policy" "deadletter" {
+  queue_url = aws_sqs_queue.deadletter.id
+
+  redrive_allow_policy = jsonencode({
+    redrivePermission = "allowAll" # Must allow all if > 9 bins
+    #sourceQueueArns   = [aws_sqs_queue.queue.arn, local.sqs_wildcard_arn]
+    #sourceQueueArns   = concat([aws_sqs_queue.queue.arn], module.sqs_fair_queue.arns)
+  })
+}
+
 resource "aws_s3_bucket_notification" "bucket_notification" {
   bucket = aws_s3_bucket.bucket.id
 

+ 12 - 6
module_sqs_fair_queueing/lambda.tf

@@ -1,5 +1,11 @@
+locals {
+  # By default, We allow lambda to run 5 seconds less than the visibility timeout, unless
+  # the visiblity timeout is > 300 (5 minutes)
+  lambda_timeout = (var.source_sqs.visibility_timeout_seconds > 300 ? 300 : (var.source_sqs.visibility_timeout_seconds - 5))
+}
+
 resource "aws_lambda_event_source_mapping" "sqs_fair_queue" {
-  event_source_arn                   = var.source_sqs_arn
+  event_source_arn                   = var.source_sqs.arn
   function_name                      = aws_lambda_function.sqs_fair_queue.arn
   batch_size                         = 100
   maximum_batching_window_in_seconds = 30 # How long to wait to gather a batch, max: 300
@@ -19,7 +25,7 @@ resource "aws_lambda_function" "sqs_fair_queue" {
   function_name = "sqs_fair_queue_${var.sqs_prefix}"
   role          = aws_iam_role.sqs_fair_queue.arn
   handler       = "sqs_fair_queue.lambda_handler"
-  timeout       = var.lambda_timeout
+  timeout       = local.lambda_timeout
   # NOTE: If it can't handle the batch in the time alloted, there is a chance for duplicates.
 
   source_code_hash = data.archive_file.sqs_fair_queue.output_base64sha256
@@ -28,8 +34,8 @@ resource "aws_lambda_function" "sqs_fair_queue" {
 
   environment {
     variables = {
-      #"SOURCE_SQS_ARN" = var.source_sqs_arn Not needed?
-      "SOURCE_SQS_URL" = var.source_sqs_url
+      #"SOURCE_SQS_ARN" = var.source_sqs.arn # Not needed?
+      "SOURCE_SQS_URL" = var.source_sqs.url
       "SQS_PREFIX"     = var.sqs_prefix
       "NUM_QUEUES"     = var.num_queues
       "HASH_JSONPATH"  = var.hash_jsonpath
@@ -49,14 +55,14 @@ resource "aws_lambda_permission" "sqs_fair_queue" {
   action        = "lambda:InvokeFunction"
   function_name = aws_lambda_function.sqs_fair_queue.function_name
   principal     = "sqs.amazonaws.com"
-  source_arn    = var.source_sqs_arn
+  source_arn    = var.source_sqs.arn
 }
 
 data "aws_iam_policy_document" "sqs_fair_queue" {
   statement {
     sid       = "SQSIngest"
     effect    = "Allow"
-    resources = [var.source_sqs_arn]
+    resources = [var.source_sqs.arn]
     # tfsec:ignore:aws-iam-no-policy-wildcards Wildcards are fine and useful
     actions = ["sqs:*"] # TODO: Nail down
     # Probably:

+ 7 - 0
module_sqs_fair_queueing/output.tf

@@ -0,0 +1,7 @@
+output "arns" {
+  value = tolist(aws_sqs_queue.queue[*].arn)
+}
+
+output "urls" {
+  value = tolist(aws_sqs_queue.queue[*].url)
+}

+ 1 - 1
module_sqs_fair_queueing/required_providers.tf

@@ -3,7 +3,7 @@ terraform {
   required_providers {
     aws = {
       source  = "hashicorp/aws"
-      version = "~> 4.0"
+      version = "~> 4.36"
     }
     archive = {
       source  = "hashicorp/archive"

+ 5 - 17
module_sqs_fair_queueing/sqs.tf

@@ -4,23 +4,11 @@ resource "aws_sqs_queue" "queue" {
 
   sqs_managed_sse_enabled = true
 
-  # TBD
-  #  policy = <<POLICY
-  #{
-  #  "Version": "2012-10-17",
-  #  "Statement": [
-  #    {
-  #      "Effect": "Allow",
-  #      "Principal": "*",
-  #      "Action": "sqs:SendMessage",
-  #      "Resource": "arn:aws:sqs:*:*:${var.sqs_prefix}${count.index}",
-  #      "Condition": {
-  #        "ArnEquals": { "aws:SourceArn": "${aws_s3_bucket.bucket.arn}" }
-  #      }
-  #    }
-  #  ]
-  #}
-  #POLICY
+  # Copy settings from source
+  redrive_policy             = var.source_sqs.redrive_policy
+  visibility_timeout_seconds = var.source_sqs.visibility_timeout_seconds
+  message_retention_seconds  = var.source_sqs.message_retention_seconds
+  max_message_size           = var.source_sqs.max_message_size
 
   tags = var.tags
 }

+ 17 - 12
module_sqs_fair_queueing/vars.tf

@@ -1,17 +1,22 @@
-variable "source_sqs_arn" {
-  type        = string
-  description = "ARN of the source SQS queue"
-}
-
-variable "source_sqs_url" {
-  type        = string
-  description = "URL of the source SQS queue"
+variable "source_sqs" {
+  type = object({
+    arn                        = string
+    url                        = string
+    visibility_timeout_seconds = number
+    message_retention_seconds  = number
+    max_message_size           = number
+    redrive_policy             = string
+  })
+  description = "The aws_sqs_queue object of the source"
 }
 
-variable "lambda_timeout" {
-  type        = number
-  description = "Lambda timeout must be less than or equal to the Visiblity timeout of the source SQS queue."
-}
+#variable "deadletter" {
+# type = object({
+#   arn = string
+#   url = string
+# })
+# description = "The deadletter queue. Redriving puts them back through the fair queueing."
+#
 
 variable "sqs_prefix" {
   type        = string

+ 4 - 3
output.tf

@@ -1,6 +1,7 @@
-locals {
+output "arns" {
+  value = module.sqs_fair_queue.arns
 }
 
-output "example" {
-  value = "Configure some outputs."
+output "urls" {
+  value = module.sqs_fair_queue.urls
 }