Cross-account CloudTrail log delivery via CloudWatch and Kinesis Data Streams
I am using a CloudWatch Logs subscription to send CloudTrail logs from one account to another. The receiving account has a Kinesis Data Stream that receives the logs from the CloudWatch subscription and invokes the standard AWS-provided blueprint Lambda function to parse the logs and store them in an S3 bucket in the log-receiver account. The log files written to the S3 bucket take the following form:
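For context, CloudWatch Logs delivers subscription data to Kinesis as base64-encoded, gzip-compressed JSON envelopes; each envelope carries a `messageType` and a `logEvents` array whose `message` fields hold the raw CloudTrail JSON strings. A minimal sketch of the decode step, using a synthetic envelope standing in for a real CloudWatch Logs payload:

```python
import base64
import gzip
import json

def decode_subscription_record(b64_data):
    """Decode one CloudWatch Logs subscription record (base64 + gzip)."""
    raw = gzip.decompress(base64.b64decode(b64_data))
    return json.loads(raw)

# Synthetic envelope standing in for what CloudWatch Logs actually sends.
envelope = {
    "messageType": "DATA_MESSAGE",
    "logGroup": "CloudTrail/logs",
    "logEvents": [
        {"id": "1", "timestamp": 1619508260000,
         "message": '{"eventVersion":"1.08","eventName":"ListInstanceAssociations"}'}
    ],
}
b64 = base64.b64encode(gzip.compress(json.dumps(envelope).encode("utf-8")))

decoded = decode_subscription_record(b64)
print(decoded["messageType"])  # DATA_MESSAGE
print(json.loads(decoded["logEvents"][0]["message"])["eventName"])
```

This is the same unpacking the blueprint function performs at the top of its record loop.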
```json
{"eventVersion":"1.08","userIdentity":{"type":"AssumedRole","principalId":"AA:i-096379450e69ed082","arn":"arn:aws:sts::34502sdsdsd:assumed-role/RDSAccessRole/i-096379450e69ed082","accountId":"34502sdsdsd","accessKeyId":"ASIAVAVKXAXXXXXXXC","sessionContext":{"sessionIssuer":{"type":"Role","principalId":"AroaVAVKXAKddddD","arn":"arn:aws:iam::3450291sdsdsd:role/RDSAccessRole","accountId":"345029asasas","userName":"RDSAccessRole"},"webIdFederationData":{},"attributes":{"mfaAuthenticated":"false","creationDate":"2021-04-27T04:38:52Z"},"ec2RoleDelivery":"2.0"}},"eventTime":"2021-04-27T07:24:20Z","eventSource":"ssm.amazonaws.com","eventName":"ListInstanceAssociations","awsRegion":"us-east-1","sourceIPAddress":"188.208.227.188","userAgent":"aws-sdk-go/1.25.41 (go1.13.15; linux; amd64) amazon-ssm-agent/","requestParameters":{"instanceId":"i-096379450e69ed082","maxResults":20},"responseElements":null,"requestID":"a5c63b9d-aaed-4a3c-9b7d-a4f7c6b774ab","eventID":"70de51df-c6df-4a57-8c1e-0ffdeb5ac29d","readOnly":true,"resources":[{"accountId":"34502914asasas","ARN":"arn:aws:ec2:us-east-1:3450291asasas:instance/i-096379450e69ed082"}],"eventType":"AwsApiCall","managementEvent":true,"eventCategory":"Management","recipientAccountId":"345029149342"} {"eventVersion":"1.08","userIdentity":{"type":"AssumedRole","principalId":"AroaVAVKXAKPKZ25XXXX:AmazonMWAA-airflow","arn":"arn:aws:sts::3450291asasas:assumed-role/dev-1xdcfd/AmazonMWAA-airflow","accountId":"34502asasas","accessKeyId":"ASIAVAVKXAXXXXXXX","sessionContext":{"sessionIssuer":{"type":"Role","principalId":"AroaVAVKXAKPKZXXXXX","arn":"arn:aws:iam::345029asasas:role/service-role/AmazonMWAA-dlp-dev-1xdcfd","accountId":"3450291asasas","userName":"dlp-dev-1xdcfd"},"attributes":{"creationDate":"2021-04-27T07:04:08Z"}},"invokedBy":"airflow.amazonaws.com"},"eventTime":"2021-04-27T07:23:46Z","eventSource":"logs.amazonaws.com","eventName":"CreateLogStream","sourceIPAddress":"airflow.amazonaws.com","userAgent":"airflow.amazonaws.com","errorCode":"ResourceAlreadyExistsException","errorMessage":"The specified log stream already exists","requestParameters":{"logStreamName":"scheduler.py.log","logGroupName":"dlp-dev-DAGProcessing"},"requestID":"40b48ef9-fc4b-4d1a-8fd1-4f2584aff1e9","eventID":"ef608d43-4765-4a3a-9c92-14ef35104697","readOnly":false,"apiVersion":"20140328","recipientAccountId":"3450291asasas"}
```
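The problem with this shape is visible locally: Athena's JSON SerDe expects one complete JSON object per line, but the file concatenates objects separated by a space. A single `json.loads` call rejects the concatenated form, while `json.JSONDecoder.raw_decode` can walk through it object by object (this snippet uses small made-up events, not real CloudTrail records):

```python
import json

concatenated = '{"eventName":"A","awsRegion":"us-east-1"} {"eventName":"B","awsRegion":"us-east-1"}'

# Parsing the whole thing as one document fails: there is trailing data
# after the first object, which is exactly what trips up Athena's SerDe.
try:
    json.loads(concatenated)
except json.JSONDecodeError as e:
    print("not valid as one document:", e.msg)

def split_concatenated_json(text):
    """Split a string of back-to-back JSON objects into a list of dicts."""
    decoder = json.JSONDecoder()
    idx, objs = 0, []
    while idx < len(text):
        obj, end = decoder.raw_decode(text, idx)
        objs.append(obj)
        idx = end
        while idx < len(text) and text[idx].isspace():
            idx += 1
    return objs

records = split_concatenated_json(concatenated)
print([r["eventName"] for r in records])  # ['A', 'B']
```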
The problem with log lines of this type is that Athena cannot parse them, so I am unable to query the logs with Athena.
I tried modifying the blueprint Lambda function to save the log file as a standard JSON result, which would let Athena parse the file easily.
For example:
```python
{'Records': ['{"eventVersion":"1.08","principalId":"AroaVAVKXAKPBRW2S3TAF:i-096379450e69ed082","arn":"arn:aws:sts::345029149342:assumed-role/RightslineRDSAccessRole/i-096379450e69ed082","accountId":"345029149342","accessKeyId":"ASIAVAVKXAKPBL653UOC","principalId":"AroaVAVKXAKPXXXXXXX","arn":"arn:aws:iam::34502asasas:role/RDSAccessRole","resources":[{"accountId":"3450291asasas","ARN":"arn:aws:ec2:us-east-1:34502asasas:instance/i-096379450e69ed082"}],"recipientAccountId":"345029asasas"}']}
```
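One detail worth noting about that target shape: if `Records` holds the raw message *strings*, Athena still sees escaped JSON text rather than nested fields. A small sketch (with made-up events) of wrapping the *decoded* events instead, which yields the same `{"Records": [...]}` layout CloudTrail itself writes to S3:

```python
import json

# Raw CloudTrail events as they arrive in logEvents[*].message (JSON strings).
messages = [
    '{"eventName":"ListInstanceAssociations","awsRegion":"us-east-1"}',
    '{"eventName":"CreateLogStream","awsRegion":"us-east-1"}',
]

# Decode each message before wrapping so the output is one valid JSON
# document with real nested objects, not a list of escaped strings.
wrapped = {"Records": [json.loads(m) for m in messages]}
out = json.dumps(wrapped)
print(out)
```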
The modified code of the blueprint Lambda function looks like this:
```python
import base64
import json
import gzip
from io import BytesIO
import boto3


def transformLogEvent(log_event):
    return log_event['message'] + '\n'


def processRecords(records):
    for r in records:
        data = base64.b64decode(r['data'])
        striodata = BytesIO(data)
        with gzip.GzipFile(fileobj=striodata, mode='r') as f:
            data = json.loads(f.read())

        recId = r['recordId']
        if data['messageType'] == 'CONTROL_MESSAGE':
            yield {'result': 'Dropped', 'recordId': recId}
        elif data['messageType'] == 'DATA_MESSAGE':
            result = {}
            events = []
            for e in data['logEvents']:
                events.append(e['message'])
            result['Records'] = events
            print(result)
            if len(result) <= 6000000:
                yield {'data': result, 'result': 'Ok', 'recordId': recId}
            else:
                yield {'result': 'ProcessingFailed', 'recordId': recId}
        else:
            yield {'result': 'ProcessingFailed', 'recordId': recId}


def putRecordsToFirehoseStream(streamName, records, client, attemptsMade, maxAttempts):
    failedRecords = []
    codes = []
    errMsg = ''
    # if put_record_batch throws for whatever reason, response['xx'] will error out;
    # adding a check for a valid response will prevent this
    response = None
    try:
        response = client.put_record_batch(DeliveryStreamName=streamName, Records=records)
    except Exception as e:
        failedRecords = records
        errMsg = str(e)

    # if there are no failedRecords (put_record_batch succeeded),
    # iterate over the response to gather results
    if not failedRecords and response and response['FailedPutCount'] > 0:
        for idx, res in enumerate(response['RequestResponses']):
            # (if the result does not have a key 'ErrorCode' OR if it does and is empty)
            # => we do not need to re-ingest
            if 'ErrorCode' not in res or not res['ErrorCode']:
                continue
            codes.append(res['ErrorCode'])
            failedRecords.append(records[idx])
        errMsg = 'Individual error codes: ' + ','.join(codes)

    if len(failedRecords) > 0:
        if attemptsMade + 1 < maxAttempts:
            print('Some records failed while calling PutRecordBatch to Firehose stream, retrying. %s' % (errMsg))
            putRecordsToFirehoseStream(streamName, failedRecords, client, attemptsMade + 1, maxAttempts)
        else:
            raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg))


def putRecordsToKinesisStream(streamName, records, client, attemptsMade, maxAttempts):
    failedRecords = []
    codes = []
    errMsg = ''
    # if put_records throws for whatever reason, response['xx'] will error out;
    # adding a check for a valid response will prevent this
    response = None
    try:
        response = client.put_records(StreamName=streamName, Records=records)
    except Exception as e:
        failedRecords = records
        errMsg = str(e)

    # if there are no failedRecords (put_records succeeded),
    # iterate over the response to gather results
    if not failedRecords and response and response['FailedRecordCount'] > 0:
        for idx, res in enumerate(response['Records']):
            # (if the result does not have a key 'ErrorCode' OR if it does and is empty)
            # => we do not need to re-ingest
            if 'ErrorCode' not in res or not res['ErrorCode']:
                continue
            codes.append(res['ErrorCode'])
            failedRecords.append(records[idx])
        errMsg = 'Individual error codes: ' + ','.join(codes)

    if len(failedRecords) > 0:
        if attemptsMade + 1 < maxAttempts:
            print('Some records failed while calling PutRecords to Kinesis stream, retrying. %s' % (errMsg))
            putRecordsToKinesisStream(streamName, failedRecords, client, attemptsMade + 1, maxAttempts)
        else:
            raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg))


def createReingestionRecord(isSas, originalRecord):
    if isSas:
        return {'data': base64.b64decode(originalRecord['data']),
                'partitionKey': originalRecord['kinesisRecordMetadata']['partitionKey']}
    else:
        return {'data': base64.b64decode(originalRecord['data'])}


def getReingestionRecord(isSas, reIngestionRecord):
    if isSas:
        return {'Data': reIngestionRecord['data'],
                'PartitionKey': reIngestionRecord['partitionKey']}
    else:
        return {'Data': reIngestionRecord['data']}


def lambda_handler(event, context):
    print(event)
    isSas = 'sourceKinesisStreamArn' in event
    streamARN = event['sourceKinesisStreamArn'] if isSas else event['deliveryStreamArn']
    region = streamARN.split(':')[3]
    streamName = streamARN.split('/')[1]
    records = list(processRecords(event['records']))
    projectedSize = 0
    dataByRecordId = {rec['recordId']: createReingestionRecord(isSas, rec) for rec in event['records']}
    putRecordBatches = []
    recordsToReingest = []
    totalRecordsToBeReingested = 0

    for idx, rec in enumerate(records):
        if rec['result'] != 'Ok':
            continue
        projectedSize += len(rec['data']) + len(rec['recordId'])
        # 6000000 instead of 6291456 to leave ample headroom for the stuff we didn't account for
        if projectedSize > 6000000:
            totalRecordsToBeReingested += 1
            recordsToReingest.append(
                getReingestionRecord(isSas, dataByRecordId[rec['recordId']])
            )
            records[idx]['result'] = 'Dropped'
            del(records[idx]['data'])

        # split out the record batches into multiple groups, 500 records at max per group
        if len(recordsToReingest) == 500:
            putRecordBatches.append(recordsToReingest)
            recordsToReingest = []

    if len(recordsToReingest) > 0:
        # add the last batch
        putRecordBatches.append(recordsToReingest)

    # iterate and call putRecordBatch for each group
    recordsReingestedSoFar = 0
    if len(putRecordBatches) > 0:
        client = boto3.client('kinesis', region_name=region) if isSas else boto3.client('firehose', region_name=region)
        for recordBatch in putRecordBatches:
            if isSas:
                putRecordsToKinesisStream(streamName, recordBatch, client, attemptsMade=0, maxAttempts=20)
            else:
                putRecordsToFirehoseStream(streamName, recordBatch, client, attemptsMade=0, maxAttempts=20)
            recordsReingestedSoFar += len(recordBatch)
            print('Reingested %d/%d records out of %d' % (recordsReingestedSoFar, totalRecordsToBeReingested, len(event['records'])))
    else:
        print('No records to be reingested')

    return {"records": records}
```
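One observation about the code above: the `DATA_MESSAGE` branch yields a plain Python dict in `'data'`, but a Kinesis/Firehose transformation must return `'data'` as a base64-encoded string, and Athena is happiest with newline-delimited JSON (one event per line). A minimal sketch of what that branch could return instead, tested here against a synthetic envelope rather than a real subscription payload:

```python
import base64
import json

def transform_data_message(envelope, rec_id):
    """Sketch: emit the log events as newline-delimited JSON,
    base64-encoded the way a Firehose transformation must return it."""
    objs = []
    for event in envelope["logEvents"]:
        # Each message is already a complete JSON document; re-serialize
        # it so the output file has exactly one object per line.
        objs.append(json.loads(event["message"]))
    ndjson = "\n".join(json.dumps(obj) for obj in objs) + "\n"
    return {
        "result": "Ok",
        "recordId": rec_id,
        "data": base64.b64encode(ndjson.encode("utf-8")).decode("utf-8"),
    }

# Synthetic stand-in for a decoded CloudWatch Logs envelope.
envelope = {
    "messageType": "DATA_MESSAGE",
    "logEvents": [
        {"id": "1", "message": '{"eventName":"A"}'},
        {"id": "2", "message": '{"eventName":"B"}'},
    ],
}
out = transform_data_message(envelope, "rec-1")
decoded = base64.b64decode(out["data"]).decode("utf-8")
print(decoded.splitlines())  # ['{"eventName": "A"}', '{"eventName": "B"}']
```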
My end goal is to store the result on S3 in JSON format so that it can easily be queried with Athena.
The line where the transformation happens is:
```python
elif data['messageType'] == 'DATA_MESSAGE':
```
Any help on this would be greatly appreciated.