Browse Source

Appears functional

Fred Damstra (Macbook 2015) 2 years ago
parent
commit
6d98095dfe

+ 1 - 0
.gitignore

@@ -1,3 +1,4 @@
+.terraform.lock.hcl
 .terraform
 
 *.bak

+ 2 - 2
backend.tf

@@ -1,8 +1,8 @@
 terraform {
   backend "s3" {
-    bucket  = "terraform-remote-state-20221017144428493300000001"
+    bucket = "terraform-remote-state-20221017144428493300000001"
     # Key must be unique amongst all projects that use this backend
-    key     = REPLACE ME WITH SOMETHING UNIQUE
+    key     = "sqs_fair_queueing"
     region  = "us-east-2"
     encrypt = true
     profile = "default"

+ 1 - 1
config.tf

@@ -1,6 +1,6 @@
 locals {
   # I like unique id to match the terraform backend storage, and I use it for various names and prefixes..
-  unique_id = REPLACE ME
+  unique_id = "sqs_fair_queueing"
 
   # Everything here should be self-explanatory
   profile = "default"

+ 83 - 0
main.tf

@@ -0,0 +1,83 @@
+######################################
+# The fair queueing module example
+module "sqs_fair_queue" {
+  source = "./module_sqs_fair_queueing"
+
+  source_sqs_arn = aws_sqs_queue.queue.arn
+  sqs_prefix     = "mbox-fair-queueing-test-fq"
+  num_queues     = 4
+  hash_jsonpath  = "$" # This will evenly distribute all messages
+}
+
+######################################
+# Example Resources for testing
+resource "aws_s3_bucket" "bucket" {
+  bucket        = "mbox-fair-queueing-test"
+  force_destroy = true
+
+  tags = {
+    Name        = "mbox-fair-queueing-test"
+    Environment = "Dev"
+    Purpose     = "POC bucket for S3 fair queueing"
+  }
+}
+
+resource "aws_s3_bucket_acl" "bucket" {
+  bucket = aws_s3_bucket.bucket.id
+  acl    = "private"
+}
+
+resource "aws_s3_bucket_server_side_encryption_configuration" "example" {
+  bucket = aws_s3_bucket.bucket.bucket
+
+  rule {
+    apply_server_side_encryption_by_default {
+      sse_algorithm = "AES256"
+    }
+  }
+}
+
+
+# SNS and SQS configuration for the root bucket
+# 
+# NOTE! Only this first sns/sqs needs to be set up.
+# The module will set up the fair-queued destination SQS queues itself.
+# 
+# Remember that the consumer service needs access to the FIFO queues,
+# not these.
+resource "aws_sqs_queue" "queue" {
+  name = "mbox-bucket-notification"
+
+  sqs_managed_sse_enabled = true
+
+  policy = <<POLICY
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Effect": "Allow",
+      "Principal": "*",
+      "Action": "sqs:SendMessage",
+      "Resource": "arn:aws:sqs:*:*:mbox-bucket-notification",
+      "Condition": {
+        "ArnEquals": { "aws:SourceArn": "${aws_s3_bucket.bucket.arn}" }
+      }
+    }
+  ]
+}
+POLICY
+
+  depends_on = [aws_s3_bucket.bucket]
+}
+
+resource "aws_s3_bucket_notification" "bucket_notification" {
+  bucket = aws_s3_bucket.bucket.id
+
+  queue {
+    queue_arn     = aws_sqs_queue.queue.arn
+    events        = ["s3:ObjectCreated:*"]
+    filter_prefix = "incoming/"
+  }
+
+  depends_on = [aws_sqs_queue.queue, aws_s3_bucket.bucket]
+}

+ 10 - 0
module_sqs_fair_queueing/README.md

@@ -0,0 +1,10 @@
+# SQS Fair Queueing Module
+
+# Why not trigger directly from S3?
+
+S3 can trigger lambda directly. However, doing so has limited failure handling and less
+ability to handle bursts in traffic.
+
+SQS is recommended, and the cost increase is minimal. It also makes the module applicable
+to a wider variety of events.
+

+ 45 - 0
module_sqs_fair_queueing/example_message.json

@@ -0,0 +1,45 @@
+{  
+   "Records":[  
+      {  
+         "eventVersion":"2.2",
+         "eventSource":"aws:s3",
+         "awsRegion":"us-west-2",
+         "eventTime":"The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, when Amazon S3 finished processing the request",
+         "eventName":"event-type",
+         "userIdentity":{  
+            "principalId":"Amazon-customer-ID-of-the-user-who-caused-the-event"
+         },
+         "requestParameters":{  
+            "sourceIPAddress":"ip-address-where-request-came-from"
+         },
+         "responseElements":{  
+            "x-amz-request-id":"Amazon S3 generated request ID",
+            "x-amz-id-2":"Amazon S3 host that processed the request"
+         },
+         "s3":{  
+            "s3SchemaVersion":"1.0",
+            "configurationId":"ID found in the bucket notification configuration",
+            "bucket":{  
+               "name":"bucket-name",
+               "ownerIdentity":{  
+                  "principalId":"Amazon-customer-ID-of-the-bucket-owner"
+               },
+               "arn":"bucket-ARN"
+            },
+            "object":{  
+               "key":"object-key",
+               "size":"object-size in bytes",
+               "eTag":"object eTag",
+               "versionId":"object version if bucket is versioning-enabled, otherwise null",
+               "sequencer": "a string representation of a hexadecimal value used to determine event sequence, only used with PUTs and DELETEs"
+            }
+         },
+         "glacierEventData": {
+            "restoreEventData": {
+               "lifecycleRestorationExpiryTime": "The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, of Restore Expiry",
+               "lifecycleRestoreStorageClass": "Source storage class for restore"
+            }
+         }
+      }
+   ]
+}

+ 102 - 0
module_sqs_fair_queueing/lambda.tf

@@ -0,0 +1,102 @@
+resource "aws_lambda_event_source_mapping" "sqs_fair_queue" {
+  event_source_arn                   = var.source_sqs_arn
+  function_name                      = aws_lambda_function.sqs_fair_queue.arn
+  batch_size                         = 100
+  maximum_batching_window_in_seconds = 30 # How long to wait to gather a batch, max: 300
+}
+
+# To install prereqs:
+#   pip install --target ./scripts jsonpath-ng
+data "archive_file" "sqs_fair_queue" {
+  type        = "zip"
+  source_dir  = "${path.module}/scripts/"
+  output_path = "${path.module}/tmp/sqs_fair_queue.zip"
+}
+
+
+resource "aws_lambda_function" "sqs_fair_queue" {
+  filename      = data.archive_file.sqs_fair_queue.output_path
+  function_name = "sqs_fair_queue_${var.sqs_prefix}"
+  role          = aws_iam_role.sqs_fair_queue.arn
+  handler       = "sqs_fair_queue.lambda_handler"
+
+  source_code_hash = data.archive_file.sqs_fair_queue.output_base64sha256
+
+  runtime = "python3.9"
+
+  environment {
+    variables = {
+      "SOURCE_SQS_ARN" = var.source_sqs_arn
+      "SQS_PREFIX"     = var.sqs_prefix
+      "NUM_QUEUES"     = var.num_queues
+      "HASH_JSONPATH"  = var.hash_jsonpath
+    }
+  }
+}
+
+resource "aws_lambda_permission" "sqs_fair_queue" {
+  statement_id  = "AllowExecutionFromSQS"
+  action        = "lambda:InvokeFunction"
+  function_name = aws_lambda_function.sqs_fair_queue.function_name
+  principal     = "sqs.amazonaws.com"
+  source_arn    = var.source_sqs_arn
+}
+
+data "aws_iam_policy_document" "sqs_fair_queue" {
+  statement {
+    sid       = "SQSIngest"
+    effect    = "Allow"
+    resources = [var.source_sqs_arn]
+    actions   = ["sqs:*"] # TODO: Nail down
+    # Probably:
+    #   "sqs:ReceiveMessage",
+    #   "sqs:SendMessage",
+    #   "sqs:GetQueueAttributes"
+    #   "sqs:GetQueueUrl"
+  }
+
+  statement {
+    sid       = "SQSPut"
+    effect    = "Allow"
+    resources = tolist(aws_sqs_queue.queue[*].arn)
+    actions   = ["sqs:*"] # TODO: Nail down
+  }
+}
+
+resource "aws_iam_policy" "sqs_fair_queue" {
+  name        = "sqs_fair_queue_${var.sqs_prefix}"
+  path        = "/sqs_fair_queue/"
+  description = "SQS Fair Queueing Lambda Policy"
+  policy      = data.aws_iam_policy_document.sqs_fair_queue.json
+}
+
+data "aws_iam_policy_document" "lambda_trust" {
+  statement {
+    sid     = ""
+    effect  = "Allow"
+    actions = ["sts:AssumeRole"]
+
+    principals {
+      type        = "Service"
+      identifiers = ["lambda.amazonaws.com"]
+    }
+  }
+}
+
+resource "aws_iam_role" "sqs_fair_queue" {
+  name               = "sqs_fair_queue_${var.sqs_prefix}"
+  path               = "/sqs_fair_queue/"
+  assume_role_policy = data.aws_iam_policy_document.lambda_trust.json
+}
+
+resource "aws_iam_role_policy_attachment" "sqs_fair_queue" {
+  role       = aws_iam_role.sqs_fair_queue.name
+  policy_arn = aws_iam_policy.sqs_fair_queue.arn
+}
+
+resource "aws_iam_role_policy_attachment" "aws_managed_lambda" {
+  role       = aws_iam_role.sqs_fair_queue.name
+  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
+}
+
+

+ 9 - 0
module_sqs_fair_queueing/required_providers.tf

@@ -0,0 +1,9 @@
+terraform {
+  required_version = ">= 1.0"
+  required_providers {
+    aws = {
+      source  = "hashicorp/aws"
+      version = "~> 4.0"
+    }
+  }
+}

+ 61 - 0
module_sqs_fair_queueing/scripts/sqs_fair_queue.py

@@ -0,0 +1,61 @@
+#! /usr/bin/env python3
+#
+# Take from one input queue and distribute amongst others
+import base64
+import boto3
+import hashlib
+import json
+import os
+import random
+
+from jsonpath_ng.ext import parse
+
+
def get_queue(hashkey, num_queues):
    """Map a jsonpath match to a stable queue index in [0, num_queues).

    The first match's value is stringified, SHA-256 hashed, and the hex
    digest is reduced modulo ``num_queues`` — so records carrying the same
    key always land on the same destination queue.
    """
    key_bytes = str(hashkey[0].value).encode("utf-8")
    digest = hashlib.sha256(key_bytes).hexdigest()
    return int(digest, 16) % num_queues
+
+
def lambda_handler(event, context):
    """Fan out incoming SQS records across NUM_QUEUES destination queues.

    For each record in ``event["Records"]`` the value found at the
    HASH_JSONPATH expression is hashed (via ``get_queue``) to choose a
    destination queue, so records with the same key are serialized onto the
    same queue. Records with no match at the jsonpath fall back to a random
    queue rather than being dropped.

    Environment variables (all required):
        SQS_PREFIX    -- destination queue name prefix; queues are named
                         "{SQS_PREFIX}{0..NUM_QUEUES-1}"
        NUM_QUEUES    -- number of destination queues
        HASH_JSONPATH -- jsonpath selecting the hash key within each record

    Returns a dict with ``statusCode`` 200 and a summary ``body``.
    Raises KeyError if a required env var is unset, ValueError if
    HASH_JSONPATH cannot be parsed.
    """
    SQS_PREFIX = os.environ["SQS_PREFIX"]
    NUM_QUEUES = int(os.environ["NUM_QUEUES"])
    HASH_JSONPATH = os.environ["HASH_JSONPATH"]

    # Resolve destination queue URLs up front.
    # NOTE(review): this issues one GetQueueUrl call per queue on every
    # invocation; consider caching at module scope if volume is high.
    client = boto3.client("sqs")
    urls = [
        client.get_queue_url(QueueName=f"{SQS_PREFIX}{i}")["QueueUrl"]
        for i in range(NUM_QUEUES)
    ]

    try:
        jsonpath_expression = parse(HASH_JSONPATH)
    except Exception as e:
        # BUG FIX: original code did `raise "..."`, which raises a TypeError
        # in Python 3 (strings are not exceptions) and used a plain string
        # literal, so the {HASH_JSONPATH}/{e} placeholders were never
        # interpolated. Raise a real exception and chain the cause.
        raise ValueError(
            f"Could not parse jsonpath: '{HASH_JSONPATH}'. Error was: {e}"
        ) from e

    n = 0
    for record in event["Records"]:
        n += 1
        # Place the record onto the destination queue
        print(f"DEBUG: Got record: {json.dumps(record, default=str)}")
        hashkey = jsonpath_expression.find(record)
        if len(hashkey) > 0:
            queue = get_queue(hashkey=hashkey, num_queues=NUM_QUEUES)
            print(f"DEBUG: Queue is {queue}")
        else:
            # No value at HASH_JSONPATH: deliver to an arbitrary queue so the
            # record is not lost (fairness degrades, delivery does not).
            print(
                f"WARNING: Using random queue for record: {json.dumps(record, default=str)}"
            )
            queue = random.randrange(NUM_QUEUES)
            print(f"DEBUG: Random Queue is {queue}")
        message = client.send_message(
            QueueUrl=urls[queue], MessageBody=json.dumps(record, default=str)
        )
        print(f"DEBUG: Message submitted: {json.dumps(message, default=str)}")
    return {"statusCode": 200, "body": f"Successfully fair queued {n} records"}

+ 26 - 0
module_sqs_fair_queueing/sqs.tf

@@ -0,0 +1,26 @@
+resource "aws_sqs_queue" "queue" {
+  count = var.num_queues
+  name  = "${var.sqs_prefix}${count.index}"
+
+  sqs_managed_sse_enabled = true
+
+  # TBD
+  #  policy = <<POLICY
+  #{
+  #  "Version": "2012-10-17",
+  #  "Statement": [
+  #    {
+  #      "Effect": "Allow",
+  #      "Principal": "*",
+  #      "Action": "sqs:SendMessage",
+  #      "Resource": "arn:aws:sqs:*:*:${var.sqs_prefix}${count.index}",
+  #      "Condition": {
+  #        "ArnEquals": { "aws:SourceArn": "${aws_s3_bucket.bucket.arn}" }
+  #      }
+  #    }
+  #  ]
+  #}
+  #POLICY
+}
+
+

+ 21 - 0
module_sqs_fair_queueing/vars.tf

@@ -0,0 +1,21 @@
+variable "source_sqs_arn" {
+  type        = string
+  description = "ARN of the source SQS queue"
+}
+
+variable "sqs_prefix" {
+  type        = string
+  description = "Prefix for the fair-queued queues"
+}
+
+variable "hash_jsonpath" {
+  description = "Hash the value at this JSONPath from the source message to determine the destination queue"
+  type        = string
+  default     = "$"
+}
+
+variable "num_queues" {
+  type        = number
+  description = "How many fair queues to create."
+  default     = 16
+}