Prechádzať zdrojové kódy

Purge as you go in case of message issues

Fred Damstra (Macbook 2015) 2 rokov pred
rodič
commit
c75f29bbaa

+ 2 - 0
main.tf

@@ -4,6 +4,8 @@ module "sqs_fair_queue" {
   source = "./module_sqs_fair_queueing"
 
   source_sqs_arn = aws_sqs_queue.queue.arn
+  source_sqs_url = aws_sqs_queue.queue.url
+  lambda_timeout = aws_sqs_queue.queue.visibility_timeout_seconds - 5
   sqs_prefix     = "mbox-fair-queueing-test-fq"
   num_queues     = 4
   hash_jsonpath  = "$" # This will evenly distribute all messages

+ 4 - 1
module_sqs_fair_queueing/lambda.tf

@@ -19,6 +19,8 @@ resource "aws_lambda_function" "sqs_fair_queue" {
   function_name = "sqs_fair_queue_${var.sqs_prefix}"
   role          = aws_iam_role.sqs_fair_queue.arn
   handler       = "sqs_fair_queue.lambda_handler"
+  timeout       = var.lambda_timeout
+  # NOTE: If it can't handle the batch in the time alloted, there is a chance for duplicates.
 
   source_code_hash = data.archive_file.sqs_fair_queue.output_base64sha256
 
@@ -26,7 +28,8 @@ resource "aws_lambda_function" "sqs_fair_queue" {
 
   environment {
     variables = {
-      "SOURCE_SQS_ARN" = var.source_sqs_arn
+      #"SOURCE_SQS_ARN" = var.source_sqs_arn Not needed?
+      "SOURCE_SQS_URL" = var.source_sqs_url
       "SQS_PREFIX"     = var.sqs_prefix
       "NUM_QUEUES"     = var.num_queues
       "HASH_JSONPATH"  = var.hash_jsonpath

+ 48 - 35
module_sqs_fair_queueing/scripts/sqs_fair_queue.py

@@ -23,39 +23,52 @@ def get_queue(hashkey, num_queues):
 
 
 def lambda_handler(event, context):
-    # Get required environment variables
-    # SOURCE_SQS_ARN = os.environ["SOURCE_SQS_ARN"]
-    SQS_PREFIX = os.environ["SQS_PREFIX"]
-    NUM_QUEUES = int(os.environ["NUM_QUEUES"])
-    HASH_JSONPATH = os.environ["HASH_JSONPATH"]
-
-    client = boto3.client("sqs")
-    urls = []
-    for i in range(NUM_QUEUES):
-        urls.append(client.get_queue_url(QueueName=f"{SQS_PREFIX}{i}")["QueueUrl"])
-
-    try:
-        jsonpath_expression = parse(HASH_JSONPATH)
-    except Exception as e:
-        raise "Could not parse jsonpath: '{HASH_JSONPATH}'. Error was: {str(e)}'"
-
-    n = 0
-    for record in event["Records"]:
-        n += 1
-        # Place the record onto the destination queue
-        print(f"DEBUG: Got record: {json.dumps(record, default=str)}")
-        hashkey = jsonpath_expression.find(record)
-        if len(hashkey) > 0:
-            queue = get_queue(hashkey=hashkey, num_queues=NUM_QUEUES)
-            print(f"DEBUG: Queue is {queue}")
-        else:
-            print(
-                f"WARNING: Using random queue for record: {json.dumps(record, default=str)}"
+    if event:  # Checking for an event is a best practice, but no idea why
+        # Get required environment variables
+        # SOURCE_SQS_ARN = os.environ["SOURCE_SQS_ARN"]
+        SOURCE_SQS_URL = os.environ["SOURCE_SQS_URL"]
+        SQS_PREFIX = os.environ["SQS_PREFIX"]
+        NUM_QUEUES = int(os.environ["NUM_QUEUES"])
+        HASH_JSONPATH = os.environ["HASH_JSONPATH"]
+
+        client = boto3.client("sqs")
+        urls = []
+        for i in range(NUM_QUEUES):
+            urls.append(client.get_queue_url(QueueName=f"{SQS_PREFIX}{i}")["QueueUrl"])
+
+        try:
+            jsonpath_expression = parse(HASH_JSONPATH)
+        except Exception as e:
+            raise "Could not parse jsonpath: '{HASH_JSONPATH}'. Error was: {str(e)}'"
+
+        n = 0
+        for record in event["Records"]:
+            try:
+                n += 1
+                # Place the record onto the destination queue
+                print(f"DEBUG: Got record: {json.dumps(record, default=str)}")
+                hashkey = jsonpath_expression.find(record)
+                if len(hashkey) > 0:
+                    queue = get_queue(hashkey=hashkey, num_queues=NUM_QUEUES)
+                    print(f"DEBUG: Queue is {queue}")
+                else:
+                    print(
+                        f"WARNING: Using random queue for record: {json.dumps(record, default=str)}"
+                    )
+                    queue = random.randrange(NUM_QUEUES)
+                    print(f"DEBUG: Random Queue is {queue}")
+                message = client.send_message(
+                    QueueUrl=urls[queue], MessageBody=json.dumps(record, default=str)
+                )
+                print(f"DEBUG: Message submitted: {json.dumps(message, default=str)}")
+            except Exception as e:
+                print(f"ERROR: Could not process message. Error was: {str(e)}")
+                continue
+
+            # Delete the message so it is not reprocessed.
+            # Google "partial batch response" for a potentially better way to do this.
+            response = client.delete_message(
+                QueueUrl=SOURCE_SQS_URL, ReceiptHandle=record["receiptHandle"]
             )
-            queue = random.randrange(NUM_QUEUES)
-            print(f"DEBUG: Random Queue is {queue}")
-        message = client.send_message(
-            QueueUrl=urls[queue], MessageBody=json.dumps(record, default=str)
-        )
-        print(f"DEBUG: Message submitted: {json.dumps(message, default=str)}")
-    return {"statusCode": 200, "body": f"Successfully fair queued {n} records"}
+            print(f"DEBUG: Message deleted: {json.dumps(response, default=str)}")
+        return {"statusCode": 200, "body": f"Successfully fair queued {n} records"}

+ 10 - 0
module_sqs_fair_queueing/vars.tf

@@ -3,6 +3,16 @@ variable "source_sqs_arn" {
   description = "ARN of the source SQS queue"
 }
 
+variable "source_sqs_url" {
+  type        = string
+  description = "URL of the source SQS queue"
+}
+
+variable "lambda_timeout" {
+  type        = number
+  description = "Lambda timeout must be less than or equal to the Visiblity timeout of the source SQS queue."
+}
+
 variable "sqs_prefix" {
   type        = string
   description = "Prefix for the fair-queued queues"