|
@@ -5,11 +5,14 @@ import base64
|
|
|
import boto3
|
|
|
import hashlib
|
|
|
import json
|
|
|
+import logging
|
|
|
import os
|
|
|
import random
|
|
|
|
|
|
from jsonpath_ng.ext import parse
|
|
|
|
|
|
+logger = logging.getLogger()
|
|
|
+
|
|
|
|
|
|
def get_queue(hashkey, num_queues):
|
|
|
"""Determines a queue number based on the hashkey"""
|
|
@@ -23,13 +26,37 @@ def get_queue(hashkey, num_queues):
|
|
|
|
|
|
|
|
|
def lambda_handler(event, context):
|
|
|
+ global logger
|
|
|
+
|
|
|
+ # Grab and process our environment variables
|
|
|
+ SOURCE_SQS_URL = os.environ["SOURCE_SQS_URL"]
|
|
|
+ SQS_PREFIX = os.environ["SQS_PREFIX"]
|
|
|
+ NUM_QUEUES = int(os.environ["NUM_QUEUES"])
|
|
|
+ HASH_JSONPATH = os.environ["HASH_JSONPATH"]
|
|
|
+ DEBUG = os.environ["DEBUG"]
|
|
|
+ BOTODEBUG = os.environ["BOTODEBUG"]
|
|
|
+
|
|
|
+    # Turn off Boto3 Debugging right away
|
|
|
+ for module in [
|
|
|
+ "boto3",
|
|
|
+ "botocore",
|
|
|
+ "nose",
|
|
|
+ "s3transfer",
|
|
|
+ "urllib3",
|
|
|
+ "urllib3.connectionpool",
|
|
|
+ ]:
|
|
|
+ logger = logging.getLogger(module)
|
|
|
+ logger.setLevel(logging.INFO)
|
|
|
+
|
|
|
+ if DEBUG:
|
|
|
+ logger.setLevel(logging.DEBUG)
|
|
|
+ logger.debug("Set logging to debug.")
|
|
|
+ else:
|
|
|
+ logger.setLevel(logging.INFO)
|
|
|
+
|
|
|
if event: # Checking for an event is a best practice, but no idea why
|
|
|
# Get required environment variables
|
|
|
# SOURCE_SQS_ARN = os.environ["SOURCE_SQS_ARN"]
|
|
|
- SOURCE_SQS_URL = os.environ["SOURCE_SQS_URL"]
|
|
|
- SQS_PREFIX = os.environ["SQS_PREFIX"]
|
|
|
- NUM_QUEUES = int(os.environ["NUM_QUEUES"])
|
|
|
- HASH_JSONPATH = os.environ["HASH_JSONPATH"]
|
|
|
|
|
|
client = boto3.client("sqs")
|
|
|
urls = []
|
|
@@ -43,26 +70,29 @@ def lambda_handler(event, context):
|
|
|
|
|
|
n = 0
|
|
|
for record in event["Records"]:
|
|
|
+ logger.debug(f"Got record: {json.dumps(record, default=str)}")
|
|
|
try:
|
|
|
n += 1
|
|
|
+ body = json.loads(record["body"])
|
|
|
+
|
|
|
# Place the record onto the destination queue
|
|
|
- print(f"DEBUG: Got record: {json.dumps(record, default=str)}")
|
|
|
- hashkey = jsonpath_expression.find(record)
|
|
|
+ logger.debug(f"Got body: {json.dumps(body, default=str)}")
|
|
|
+ hashkey = jsonpath_expression.find(body)
|
|
|
if len(hashkey) > 0:
|
|
|
queue = get_queue(hashkey=hashkey, num_queues=NUM_QUEUES)
|
|
|
- print(f"DEBUG: Queue is {queue}")
|
|
|
+ logger.debug(f"DEBUG: Queue is {queue}")
|
|
|
else:
|
|
|
- print(
|
|
|
- f"WARNING: Using random queue for record: {json.dumps(record, default=str)}"
|
|
|
+ logger.warning(
|
|
|
+ f"Using random queue for record: {json.dumps(body, default=str)}"
|
|
|
)
|
|
|
queue = random.randrange(NUM_QUEUES)
|
|
|
- print(f"DEBUG: Random Queue is {queue}")
|
|
|
+ logger.debug(f"Random Queue is {queue}")
|
|
|
message = client.send_message(
|
|
|
- QueueUrl=urls[queue], MessageBody=json.dumps(record, default=str)
|
|
|
+ QueueUrl=urls[queue], MessageBody=json.dumps(body, default=str)
|
|
|
)
|
|
|
- print(f"DEBUG: Message submitted: {json.dumps(message, default=str)}")
|
|
|
+ logger.debug(f"Message submitted: {json.dumps(message, default=str)}")
|
|
|
except Exception as e:
|
|
|
- print(f"ERROR: Could not process message. Error was: {str(e)}")
|
|
|
+ logger.error(f"Could not process message. Error was: {str(e)}")
|
|
|
continue
|
|
|
|
|
|
# Delete the message so it is not reprocessed.
|
|
@@ -70,5 +100,5 @@ def lambda_handler(event, context):
|
|
|
response = client.delete_message(
|
|
|
QueueUrl=SOURCE_SQS_URL, ReceiptHandle=record["receiptHandle"]
|
|
|
)
|
|
|
- print(f"DEBUG: Message deleted: {json.dumps(response, default=str)}")
|
|
|
+ logger.debug(f"Message deleted: {json.dumps(response, default=str)}")
|
|
|
return {"statusCode": 200, "body": f"Successfully fair queued {n} records"}
|