kinesis-firehose-cloudwatch-logs-processor.js

/*
For processing data sent to Firehose by CloudWatch Logs subscription filters.

CloudWatch Logs sends to Firehose records that look like this:

{
  "messageType": "DATA_MESSAGE",
  "owner": "123456789012",
  "logGroup": "log_group_name",
  "logStream": "log_stream_name",
  "subscriptionFilters": [
    "subscription_filter_name"
  ],
  "logEvents": [
    {
      "id": "01234567890123456789012345678901234567890123456789012345",
      "timestamp": 1510109208016,
      "message": "log message 1"
    },
    {
      "id": "01234567890123456789012345678901234567890123456789012345",
      "timestamp": 1510109208017,
      "message": "log message 2"
    },
    ...
  ]
}

The data is additionally compressed with GZIP.

The code below will:

1) Gunzip the data
2) Parse the JSON
3) Drop records whose messageType is CONTROL_MESSAGE (CloudWatch Logs sends these to check that the subscription
   is reachable; they contain no log events), and set the result to ProcessingFailed for records with any other
   messageType that is not DATA_MESSAGE, thus redirecting them to the processing error output. You can modify the
   code to set the result to Dropped instead to get rid of these records completely.
4) For records whose messageType is DATA_MESSAGE, extract the individual log events from the logEvents field, and
   pass each one to the transformLogEvent method. You can modify the transformLogEvent method to perform custom
   transformations on the log events.
5) Concatenate the results from (4) together and set that as the data of the record returned to Firehose. Note that
   this step will not add any delimiters. Delimiters should be appended by the logic within the transformLogEvent
   method.
6) If the total size of the records to be returned would exceed 6MB, the excess records are re-ingested back into
   the source stream (the Firehose delivery stream, or the Kinesis stream it reads from) and marked as Dropped in
   the response.
*/
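
/*
For reference, each record in the response returned to Firehose by the handler below has this shape:

{
  "recordId": "<same recordId as the corresponding input record>",
  "result": "Ok" | "Dropped" | "ProcessingFailed",
  "data": "<base64-encoded transformed payload, present only when result is Ok>"
}
*/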
const zlib = require('zlib');
const AWS = require('aws-sdk');

/**
 * logEvent has this format:
 *
 * {
 *     "id": "01234567890123456789012345678901234567890123456789012345",
 *     "timestamp": 1510109208016,
 *     "message": "log message 1"
 * }
 *
 * This implementation drops events whose message contains 'NODATA' and otherwise
 * just extracts the message and appends a newline to it.
 *
 * The result must be returned in a Promise.
 */
function transformLogEvent(logEvent) {
    if (logEvent.message.includes('NODATA')) {
        return Promise.resolve('');
    }
    return Promise.resolve(`${logEvent.message}\n`);
}
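
/*
 * Example (an illustrative sketch, not part of the default blueprint): a custom
 * transformLogEvent that emits each log event as a newline-delimited JSON object
 * carrying its timestamp, assuming the downstream consumer expects JSON lines
 * rather than raw messages. To use it, replace the default implementation above.
 *
 * function transformLogEvent(logEvent) {
 *     return Promise.resolve(`${JSON.stringify({
 *         timestamp: new Date(logEvent.timestamp).toISOString(),
 *         message: logEvent.message,
 *     })}\n`);
 * }
 */

/**
 * Writes a batch of records to the given Firehose delivery stream via
 * PutRecordBatch. Records that fail individually (or the whole batch on an
 * API error) are retried, up to maxAttempts attempts, before the promise is
 * rejected.
 */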
function putRecordsToFirehoseStream(streamName, records, client, resolve, reject, attemptsMade, maxAttempts) {
    client.putRecordBatch({
        DeliveryStreamName: streamName,
        Records: records,
    }, (err, data) => {
        const codes = [];
        let failed = [];
        let errMsg = err;
        if (err) {
            failed = records;
        } else {
            for (let i = 0; i < data.RequestResponses.length; i++) {
                const code = data.RequestResponses[i].ErrorCode;
                if (code) {
                    codes.push(code);
                    failed.push(records[i]);
                }
            }
            errMsg = `Individual error codes: ${codes}`;
        }

        if (failed.length > 0) {
            if (attemptsMade + 1 < maxAttempts) {
                console.log('Some records failed while calling PutRecordBatch, retrying. %s', errMsg);
                putRecordsToFirehoseStream(streamName, failed, client, resolve, reject, attemptsMade + 1, maxAttempts);
            } else {
                reject(`Could not put records after ${maxAttempts} attempts. ${errMsg}`);
            }
        } else {
            resolve('');
        }
    });
}
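
/**
 * Writes a batch of records to the given Kinesis data stream via PutRecords.
 * Records that fail individually (or the whole batch on an API error) are
 * retried, up to maxAttempts attempts, before the promise is rejected.
 */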
function putRecordsToKinesisStream(streamName, records, client, resolve, reject, attemptsMade, maxAttempts) {
    client.putRecords({
        StreamName: streamName,
        Records: records,
    }, (err, data) => {
        const codes = [];
        let failed = [];
        let errMsg = err;
        if (err) {
            failed = records;
        } else {
            for (let i = 0; i < data.Records.length; i++) {
                const code = data.Records[i].ErrorCode;
                if (code) {
                    codes.push(code);
                    failed.push(records[i]);
                }
            }
            errMsg = `Individual error codes: ${codes}`;
        }

        if (failed.length > 0) {
            if (attemptsMade + 1 < maxAttempts) {
                console.log('Some records failed while calling PutRecords, retrying. %s', errMsg);
                putRecordsToKinesisStream(streamName, failed, client, resolve, reject, attemptsMade + 1, maxAttempts);
            } else {
                reject(`Could not put records after ${maxAttempts} attempts. ${errMsg}`);
            }
        } else {
            resolve('');
        }
    });
}
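
/**
 * Builds a re-ingestion record from the original Firehose input record.
 * When the delivery stream is sourced from a Kinesis data stream (isSas),
 * the original partition key is preserved.
 */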
function createReingestionRecord(isSas, originalRecord) {
    if (isSas) {
        return {
            Data: Buffer.from(originalRecord.data, 'base64'),
            PartitionKey: originalRecord.kinesisRecordMetadata.partitionKey,
        };
    } else {
        return {
            Data: Buffer.from(originalRecord.data, 'base64'),
        };
    }
}
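
/**
 * Shapes a re-ingestion record for the target API: PutRecords (Data plus
 * PartitionKey) when re-ingesting into the source Kinesis stream, or
 * PutRecordBatch (Data only) when re-ingesting into the Firehose delivery stream.
 */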
function getReingestionRecord(isSas, reIngestionRecord) {
    if (isSas) {
        return {
            Data: reIngestionRecord.Data,
            PartitionKey: reIngestionRecord.PartitionKey,
        };
    } else {
        return {
            Data: reIngestionRecord.Data,
        };
    }
}
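
/**
 * Lambda entry point. Decodes and gunzips each Firehose record, drops
 * CONTROL_MESSAGE records, transforms the log events of DATA_MESSAGE records,
 * and, if the transformed payload would push the response past the ~6MB limit,
 * re-ingests the excess records into the source stream and marks them as
 * Dropped in the response.
 */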
exports.handler = (event, context, callback) => {
    Promise.all(event.records.map(r => {
        const buffer = Buffer.from(r.data, 'base64');

        let decompressed;
        try {
            decompressed = zlib.gunzipSync(buffer);
        } catch (e) {
            return Promise.resolve({
                recordId: r.recordId,
                result: 'ProcessingFailed',
            });
        }

        const data = JSON.parse(decompressed);
        // CONTROL_MESSAGE records are sent by CWL to check if the subscription is reachable.
        // They do not contain actual data.
        if (data.messageType === 'CONTROL_MESSAGE') {
            return Promise.resolve({
                recordId: r.recordId,
                result: 'Dropped',
            });
        } else if (data.messageType === 'DATA_MESSAGE') {
            const promises = data.logEvents.map(transformLogEvent);
            return Promise.all(promises)
                .then(transformed => {
                    const payload = transformed.reduce((a, v) => a + v, '');
                    const encoded = Buffer.from(payload).toString('base64');
                    return {
                        recordId: r.recordId,
                        result: 'Ok',
                        data: encoded,
                    };
                });
        } else {
            return Promise.resolve({
                recordId: r.recordId,
                result: 'ProcessingFailed',
            });
        }
    })).then(recs => {
        const isSas = Object.prototype.hasOwnProperty.call(event, 'sourceKinesisStreamArn');
        const streamARN = isSas ? event.sourceKinesisStreamArn : event.deliveryStreamArn;
        const region = streamARN.split(':')[3];
        const streamName = streamARN.split('/')[1];
        const result = { records: recs };
        let recordsToReingest = [];
        const putRecordBatches = [];
        let totalRecordsToBeReingested = 0;
        const inputDataByRecId = {};
        event.records.forEach(r => inputDataByRecId[r.recordId] = createReingestionRecord(isSas, r));

        let projectedSize = recs.filter(rec => rec.result === 'Ok')
            .map(r => r.recordId.length + r.data.length)
            .reduce((a, b) => a + b, 0);
        // 6000000 instead of 6291456 to leave ample headroom for the stuff we didn't account for
        for (let idx = 0; idx < event.records.length && projectedSize > 6000000; idx++) {
            const rec = result.records[idx];
            if (rec.result === 'Ok') {
                totalRecordsToBeReingested++;
                recordsToReingest.push(getReingestionRecord(isSas, inputDataByRecId[rec.recordId]));
                projectedSize -= rec.data.length;
                delete rec.data;
                result.records[idx].result = 'Dropped';

                // split out the record batches into multiple groups, 500 records at max per group
                if (recordsToReingest.length === 500) {
                    putRecordBatches.push(recordsToReingest);
                    recordsToReingest = [];
                }
            }
        }

        if (recordsToReingest.length > 0) {
            // add the last batch
            putRecordBatches.push(recordsToReingest);
        }

        if (putRecordBatches.length > 0) {
            new Promise((resolve, reject) => {
                let recordsReingestedSoFar = 0;
                for (let idx = 0; idx < putRecordBatches.length; idx++) {
                    const recordBatch = putRecordBatches[idx];
                    if (isSas) {
                        const client = new AWS.Kinesis({ region: region });
                        putRecordsToKinesisStream(streamName, recordBatch, client, resolve, reject, 0, 20);
                    } else {
                        const client = new AWS.Firehose({ region: region });
                        putRecordsToFirehoseStream(streamName, recordBatch, client, resolve, reject, 0, 20);
                    }
                    recordsReingestedSoFar += recordBatch.length;
                    console.log('Reingested %s/%s records out of %s into %s stream', recordsReingestedSoFar, totalRecordsToBeReingested, event.records.length, streamName);
                }
            }).then(
                () => {
                    console.log('Reingested all %s records out of %s into %s stream', totalRecordsToBeReingested, event.records.length, streamName);
                    callback(null, result);
                },
                failed => {
                    console.log('Failed to reingest records. %s', failed);
                    callback(failed, null);
                });
        } else {
            console.log('No records needed to be reingested.');
            callback(null, result);
        }
    }).catch(ex => {
        console.log('Error: ', ex);
        callback(ex, null);
    });
};