
How to solve cross-account CloudTrail log delivery via CloudWatch and a Kinesis data stream?


I am using a CloudWatch Logs subscription to send CloudTrail logs from one account to another. The receiving account has a Kinesis data stream that receives the logs from the CloudWatch subscription and invokes the standard AWS-provided Lambda function, which parses the logs and stores them in an S3 bucket in the log-recipient account (a minimal sketch of the sender-side subscription setup follows the samples below). The log files written to the S3 bucket look like this:

@H_607_5@{"eventVersion":"1.08","userIDentity":{"type":"AssumedRole","principalID":"AA:i-096379450e69ed082","arn":"arn:aws:sts::34502sdsdsd:assumed-role/RDSAccessRole/i-096379450e69ed082","accountID":"34502sdsdsd","accessKeyID":"ASIAVAVKXAXXXXXXXC","sessionContext":{"sessionIssuer":{"type":"Role","principalID":"AroaVAVKXAKddddD","arn":"arn:aws:iam::3450291sdsdsd:role/RDSAccessRole","accountID":"345029asasas","username":"RDSAccessRole"},"webIDFederationData":{},"attributes":{"mfaAuthenticated":"false","creationDate":"2021-04-27T04:38:52Z"},"ec2RoleDelivery":"2.0"}},"eventTime":"2021-04-27T07:24:20Z","eventsource":"ssm.amazonaws.com","eventname":"LisTinstanceAssociations","awsRegion":"us-east-1","sourceIpaddress":"188.208.227.188","userAgent":"aws-sdk-go/1.25.41 (go1.13.15; linux; amd64) AR_583_11845@azon-ssm-agent/","requestParameters":{"instancEID":"i-096379450e69ed082","maxResults":20},"responseElements":null,"requestID":"a5c63b9d-aaed-4a3c-9b7d-a4f7c6b774ab","eventID":"70de51df-c6df-4a57-8c1e-0ffdeb5ac29d","Readonly":true,"resources":[{"accountID":"34502914asasas","ARN":"arn:aws:ec2:us-east-1:3450291asasas:instance/i-096379450e69ed082"}],"eventType":"AwsAPICall","managementEvent":true,"eventcategory":"Management","recipIEntAccountID":"345029149342"}
{"eventVersion":"1.08","principalID":"AroaVAVKXAKPKZ25XXXX:AmazonMWAA-airflow","arn":"arn:aws:sts::3450291asasas:assumed-role/dev-1xdcfd/AmazonMWAA-airflow","accountID":"34502asasas","accessKeyID":"ASIAVAVKXAXXXXXXX","principalID":"AroaVAVKXAKPKZXXXXX","arn":"arn:aws:iam::345029asasas:role/service-role/AmazonMWAA-dlp-dev-1xdcfd","accountID":"3450291asasas","username":"dlp-dev-1xdcfd"},"creationDate":"2021-04-27T07:04:08Z"}},"invokedBy":"airflow.amazonaws.com"},"eventTime":"2021-04-27T07:23:46Z","eventsource":"logs.amazonaws.com","eventname":"CreateLogStream","sourceIpaddress":"airflow.amazonaws.com","userAgent":"airflow.amazonaws.com","errorCode":"resourceAlreadyExistsException","errormessage":"The specifIEd log stream already exists","requestParameters":{"logStreamname":"scheduler.py.log","logGroupname":"dlp-dev-DAGProcessing"},"requestID":"40b48ef9-fc4b-4d1a-8fd1-4f2584aff1e9","eventID":"ef608d43-4765-4a3a-9c92-14ef35104697","Readonly":false,"APIVersion":"20140328","recipIEntAccountID":"3450291asasas"}

The problem with log lines in this form is that Athena cannot parse them, so I cannot query the logs with Athena.

I tried modifying the blueprint Lambda function so that the log files are saved as standard JSON, which would let Athena parse the files easily.

For example:

{'Records': ['{"eventVersion":"1.08","principalId":"AroaVAVKXAKPBRW2S3TAF:i-096379450e69ed082","arn":"arn:aws:sts::345029149342:assumed-role/RightslineRDSAccessRole/i-096379450e69ed082","accountId":"345029149342","accessKeyId":"ASIAVAVKXAKPBL653UOC","principalId":"AroaVAVKXAKPXXXXXXX","arn":"arn:aws:iam::34502asasas:role/RDSAccessRole","resources":[{"accountId":"3450291asasas","ARN":"arn:aws:ec2:us-east-1:34502asasas:instance/i-096379450e69ed082"}],"recipientAccountId":"345029asasas"}]}

The modified code of the blueprint Lambda function looks like this:

    import base64
    import json
    import gzip
    from io import BytesIO
    import boto3


    def transformLogEvent(log_event):
        # Return the raw CloudWatch Logs message followed by a newline.
        return log_event['message'] + '\n'


    def processRecords(records):
        for r in records:
            # Each record's data is base64-encoded, gzipped CloudWatch Logs JSON.
            data = base64.b64decode(r['data'])
            striodata = BytesIO(data)
            with gzip.GzipFile(fileobj=striodata, mode='r') as f:
                data = json.loads(f.read())

            recId = r['recordId']

            if data['messageType'] == 'CONTROL_MESSAGE':
                yield {
                    'result': 'Dropped', 'recordId': recId
                }
            elif data['messageType'] == 'DATA_MESSAGE':
                # Collect the raw log messages into a {"Records": [...]} dict.
                result = {}
                result["Records"] = {}
                events = []
                for e in data['logEvents']:
                    events.append(e["message"])
                result["Records"] = events
                print(result)

                if len(result) <= 6000000:
                    yield {
                        'data': result, 'result': 'Ok', 'recordId': recId
                    }
                else:
                    yield {
                        'result': 'ProcessingFailed', 'recordId': recId
                    }
            else:
                yield {
                    'result': 'ProcessingFailed', 'recordId': recId
                }
    
    
    def putRecordsToFirehoseStream(streamName, records, client, attemptsMade, maxAttempts):
        failedRecords = []
        codes = []
        errMsg = ''
        # if put_record_batch throws for whatever reason, response['xx'] will error out,
        # adding a check for a valid response will prevent this
        response = None
        try:
            response = client.put_record_batch(DeliveryStreamName=streamName, Records=records)
        except Exception as e:
            failedRecords = records
            errMsg = str(e)

        # if there are no failedRecords (put_record_batch succeeded), iterate over the response to gather results
        if not failedRecords and response and response['FailedPutCount'] > 0:
            for idx, res in enumerate(response['RequestResponses']):
                # (if the result does not have a key 'ErrorCode' OR if it does and is empty) => we do not need to re-ingest
                if 'ErrorCode' not in res or not res['ErrorCode']:
                    continue

                codes.append(res['ErrorCode'])
                failedRecords.append(records[idx])

            errMsg = 'Individual error codes: ' + ','.join(codes)

        if len(failedRecords) > 0:
            if attemptsMade + 1 < maxAttempts:
                print('Some records failed while calling PutRecordBatch to Firehose stream, retrying. %s' % (errMsg))
                putRecordsToFirehoseStream(streamName, failedRecords, client, attemptsMade + 1, maxAttempts)
            else:
                raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg))
    
    def putRecordsToKinesisStream(streamName, records, client, attemptsMade, maxAttempts):
        failedRecords = []
        codes = []
        errMsg = ''
        # if put_records throws for whatever reason, response['xx'] will error out,
        # adding a check for a valid response will prevent this
        response = None
        try:
            response = client.put_records(StreamName=streamName, Records=records)
        except Exception as e:
            failedRecords = records
            errMsg = str(e)

        # if there are no failedRecords (put_records succeeded), iterate over the response to gather results
        if not failedRecords and response and response['FailedRecordCount'] > 0:
            for idx, res in enumerate(response['Records']):
                # (if the result does not have a key 'ErrorCode' OR if it does and is empty) => we do not need to re-ingest
                if 'ErrorCode' not in res or not res['ErrorCode']:
                    continue

                codes.append(res['ErrorCode'])
                failedRecords.append(records[idx])

            errMsg = 'Individual error codes: ' + ','.join(codes)

        if len(failedRecords) > 0:
            if attemptsMade + 1 < maxAttempts:
                print('Some records failed while calling PutRecords to Kinesis stream, retrying. %s' % (errMsg))
                putRecordsToKinesisStream(streamName, failedRecords, client, attemptsMade + 1, maxAttempts)
            else:
                raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg))
    
    
    def createReingestionRecord(isSas, originalRecord):
        if isSas:
            return {'data': base64.b64decode(originalRecord['data']), 'partitionKey': originalRecord['kinesisRecordMetadata']['partitionKey']}
        else:
            return {'data': base64.b64decode(originalRecord['data'])}


    def getReingestionRecord(isSas, reIngestionRecord):
        if isSas:
            return {'Data': reIngestionRecord['data'], 'PartitionKey': reIngestionRecord['partitionKey']}
        else:
            return {'Data': reIngestionRecord['data']}
    
    
    def lambda_handler(event, context):
        print(event)
        isSas = 'sourceKinesisStreamArn' in event
        streamARN = event['sourceKinesisStreamArn'] if isSas else event['deliveryStreamArn']
        region = streamARN.split(':')[3]
        streamName = streamARN.split('/')[1]
        records = list(processRecords(event['records']))
        projectedSize = 0
        dataByRecordId = {rec['recordId']: createReingestionRecord(isSas, rec) for rec in event['records']}
        putRecordBatches = []
        recordsToReingest = []
        totalRecordsToBeReingested = 0

        for idx, rec in enumerate(records):
            if rec['result'] != 'Ok':
                continue
            projectedSize += len(rec['data']) + len(rec['recordId'])
            # 6000000 instead of 6291456 to leave ample headroom for the stuff we didn't account for
            if projectedSize > 6000000:
                totalRecordsToBeReingested += 1
                recordsToReingest.append(
                    getReingestionRecord(isSas, dataByRecordId[rec['recordId']])
                )
                records[idx]['result'] = 'Dropped'
                del(records[idx]['data'])

            # split out the record batches into multiple groups, 500 records at max per group
            if len(recordsToReingest) == 500:
                putRecordBatches.append(recordsToReingest)
                recordsToReingest = []

        if len(recordsToReingest) > 0:
            # add the last batch
            putRecordBatches.append(recordsToReingest)

        # iterate and call putRecordBatch for each group
        recordsReingestedSoFar = 0
        if len(putRecordBatches) > 0:
            client = boto3.client('kinesis', region_name=region) if isSas else boto3.client('firehose', region_name=region)
            for recordBatch in putRecordBatches:
                if isSas:
                    putRecordsToKinesisStream(streamName, recordBatch, client, attemptsMade=0, maxAttempts=20)
                else:
                    putRecordsToFirehoseStream(streamName, recordBatch, client, attemptsMade=0, maxAttempts=20)
                recordsReingestedSoFar += len(recordBatch)
                print('Reingested %d/%d records out of %d' % (recordsReingestedSoFar, totalRecordsToBeReingested, len(event['records'])))
        else:
            print('No records to be reingested')

        return {"records": records}
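
For what it's worth, the transformation can be exercised locally without any AWS calls, since the handler only creates a boto3 client when records have to be re-ingested. The sketch below is a hypothetical test harness (the ARN, record ID, and log content are made up) meant to run in the same module as the code above, so base64, gzip, and json are already imported; it builds the gzipped, base64-encoded CloudWatch Logs payload the function expects and prints the transformed output.

    # Hypothetical local test: build a fake Firehose transformation event whose record
    # data is base64(gzip(CloudWatch Logs DATA_MESSAGE)) and run it through the handler.
    test_payload = {
        "messageType": "DATA_MESSAGE",
        "owner": "111122223333",
        "logGroup": "CloudTrail/DefaultLogGroup",
        "logStream": "111122223333_CloudTrail_us-east-1",
        "subscriptionFilters": ["cross-account-cloudtrail"],
        "logEvents": [
            {"id": "1", "timestamp": 1619508260000,
             "message": "{\"eventVersion\":\"1.08\",\"eventName\":\"ListInstanceAssociations\"}"}
        ],
    }

    encoded = base64.b64encode(gzip.compress(json.dumps(test_payload).encode('utf-8'))).decode('utf-8')
    test_event = {
        "deliveryStreamArn": "arn:aws:firehose:us-east-1:111122223333:deliverystream/test-stream",
        "records": [{"recordId": "49640000000000000000000000000000000000000000000000000001", "data": encoded}],
    }

    # No re-ingestion is triggered for this small event, so no AWS client is created.
    print(lambda_handler(test_event, None))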

My end goal is to store the result on S3 in JSON format so that it can be easily queried with Athena.

The line where the transformation happens is:

elif data['messageType'] == 'DATA_MESSAGE':
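
If the aim is output that Athena can read line by line, one option (a hedged sketch based on the standard Firehose transformation contract, where the returned 'data' must be a base64-encoded string; it is not the author's confirmed fix) is to have that branch emit newline-delimited JSON, one CloudTrail event per line:

    elif data['messageType'] == 'DATA_MESSAGE':
        # Emit the raw CloudTrail events as newline-delimited JSON and base64-encode
        # the result, which is what the Firehose record-transformation contract expects.
        joined = ''.join(transformLogEvent(e) for e in data['logEvents'])
        encoded = base64.b64encode(joined.encode('utf-8')).decode('utf-8')

        if len(encoded) <= 6000000:
            yield {'data': encoded, 'result': 'Ok', 'recordId': recId}
        else:
            yield {'result': 'ProcessingFailed', 'recordId': recId}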

Any help in this regard would be greatly appreciated.

Solution

No effective solution to this problem has been found yet.
