Skip to content

Commit 6770d72

Browse files
authored
Merge pull request #126 from cloudgraphdev/feature/CG-1341-support-aws-msk-service
feat(CG-1341): support aws msk cluster
2 parents 915abba + ce5e419 commit 6770d72

File tree

14 files changed

+742
-2
lines changed

14 files changed

+742
-2
lines changed

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ CloudGraph AWS Provider will ask you what regions you would like to crawl and wi
139139
| lambda | appSync, cognitoUserPool, kms, s3, secretsManager, securityGroup, subnet, vpc, iamRole |
140140
| managedAirflow | cloudwatchLog, iamRole, kms, securityGroups, subnet, s3 |
141141
| managedPrefixList | |
142+
| mskCluster | securityGroup, subnet |
142143
| nacl | vpc |
143144
| natGateway | networkInterface, subnet, vpc |
144145
| networkInterface | ec2, eip, efsMountTarget, natGateway, sageMakerNotebookInstance, subnet, vpc, vpcEndpoint, flowLog, securityGroup |
@@ -155,12 +156,12 @@ CloudGraph AWS Provider will ask you what regions you would like to crawl and wi
155156
| sageMakerProject | |
156157
| s3 | cloudfront, cloudtrail, ecsCluster, iamRole, kinesisFirehose, kms, lambda, managedAirflow, sns, sqs |
157158
| secretsManager | kms, lambda |
158-
| securityGroup | alb, asg, clientVpnEndpoint, codebuild, dmsReplicationInstance, ecsService, lambda, ec2, elasticSearchDomain, elb, rdsCluster, rdsDbInstance, eksCluster, elastiCacheCluster, managedAirflow, sageMakerNotebookInstance, networkInterface, vpcEndpoint |
159+
| securityGroup | alb, asg, clientVpnEndpoint, codebuild, dmsReplicationInstance, ecsService, lambda, ec2, elasticSearchDomain, elb, rdsCluster, rdsDbInstance, eksCluster, elastiCacheCluster, managedAirflow, sageMakerNotebookInstance, networkInterface, vpcEndpoint, mskCluster |
159160
| securityHub | |
160161
| ses | |
161162
| sns | kms, cloudtrail, cloudwatch, s3 |
162163
| sqs | elasticBeanstalkEnv, s3 |
163-
| subnet | alb, asg, codebuild, dmsReplicationInstance, ec2, ecsService, efsMountTarget, elastiCacheCluster, elasticSearchDomain, elb, lambda, managedAirflow, natGateway, networkInterface, rdsCluster, sageMakerNotebookInstance, routeTable, vpc, vpcEndpoint, eksCluster, emrCluster, flowLog |
164+
| subnet | alb, asg, codebuild, dmsReplicationInstance, ec2, ecsService, efsMountTarget, elastiCacheCluster, elasticSearchDomain, elb, lambda, managedAirflow, natGateway, networkInterface, rdsCluster, sageMakerNotebookInstance, routeTable, vpc, vpcEndpoint, eksCluster, emrCluster, flowLog, mskCluster |
164165
| systemsManagerInstance | ec2, iamRole |
165166
| systemsManagerDocument | |
166167
| transitGateway | transitGatewayAttachment, transitGatewayRouteTable, vpnConnection |

src/enums/resources.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ export default {
1111
kmsKey: 'aws_kms_key',
1212
region: 'aws_region', // Not a real TF resource, used to organize all regional resources
1313
account: 'aws_account', // Not a real TF resource, used to organize all regional resources
14+
mskCluster: 'aws_msk_cluster',
1415
iamPasswordPolicy: 'aws_iam_password_policy',
1516
iamSamlProvider: 'aws_iam_saml_provider',
1617
iamOpenIdConnectProvider: 'aws_iam_openidconnect_provider',

src/enums/schemasMap.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ export default {
7575
[services.lambda]: 'awsLambda',
7676
[services.managedAirflow]: 'awsManagedAirflow',
7777
[services.managedPrefixList]: 'awsManagedPrefixList',
78+
[services.mskCluster]: 'awsMskCluster',
7879
[services.nacl]: 'awsNetworkAcl',
7980
[services.nat]: 'awsNatGateway',
8081
[services.networkInterface]: 'awsNetworkInterface',

src/enums/serviceAliases.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ export default {
5353
[services.lambda]: 'lambdaFunctions',
5454
[services.managedAirflow]: 'managedAirflows',
5555
[services.managedPrefixList]: 'managedPrefixLists',
56+
[services.mskCluster]: 'mskCluster',
5657
[services.nat]: 'natGateway',
5758
[services.networkInterface]: 'networkInterfaces',
5859
[services.organization]: 'organizations',

src/enums/serviceMap.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ import VpcEndpoint from '../services/vpcEndpoint'
102102
import APIGatewayDomainName from '../services/apiGatewayDomainName'
103103
import APIGatewayHttpApi from '../services/apiGatewayHttpApi'
104104
import ManagedPrefixList from '../services/managedPrefixList'
105+
import MskCluster from '../services/msk'
105106
import TransitGatewayRouteTable from '../services/transitGatewayRouteTable'
106107
import VpcPeeringConnection from '../services/vpcPeeringConnection'
107108

@@ -164,6 +165,7 @@ export default {
164165
[services.lambda]: Lambda,
165166
[services.managedAirflow]: ManagedAirflow,
166167
[services.managedPrefixList]: ManagedPrefixList,
168+
[services.mskCluster]: MskCluster,
167169
[services.nacl]: NetworkAcl,
168170
[services.nat]: NATGateway,
169171
[services.networkInterface]: NetworkInterface,

src/enums/services.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ export default {
6969
lambda: 'lambda',
7070
managedAirflow: 'managedAirflow',
7171
managedPrefixList: 'managedPrefixList',
72+
mskCluster: 'mskCluster',
7273
nacl: 'nacl',
7374
nat: 'nat',
7475
networkInterface: 'networkInterface',

src/properties/logger.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -702,4 +702,9 @@ export default {
702702
securityHubNotFound: (region: string): string => `Security Hub not found/disabled for region: ${region}`,
703703
fetchedSecurityHub: (region: string): string => `Security Hub found/enabled for region: ${region}`,
704704
fetchingSecurityHub: 'Fetching Security Hub data for this AWS account via the AWS SDK...',
705+
/**
706+
* Msk
707+
*/
708+
fetchedMskClusters: (num: number): string =>
709+
`Fetched ${num} Msk clusters`,
705710
}

src/services/msk/connections.ts

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import { ServiceConnection } from '@cloudgraph/sdk'
2+
3+
import services from '../../enums/services'
4+
import { Cluster } from 'aws-sdk/clients/kafka'
5+
6+
/**
7+
* Msk
8+
*/
9+
10+
export default ({
11+
service: mskCluster,
12+
data,
13+
region,
14+
account,
15+
}: {
16+
account: string
17+
data: { name: string; data: { [property: string]: any[] } }[]
18+
service: Cluster & {
19+
region: string
20+
}
21+
region: string
22+
}): { [key: string]: ServiceConnection[] } => {
23+
const connections: ServiceConnection[] = []
24+
25+
const {
26+
ClusterArn: id,
27+
Serverless: serverless,
28+
} = mskCluster || {}
29+
30+
/**
31+
* Add subnets
32+
*/
33+
serverless?.VpcConfigs
34+
?.filter(vc => vc.SubnetIds)
35+
?.forEach(vc => {
36+
connections.push(
37+
...vc?.SubnetIds?.map(subnetId => ({
38+
id: subnetId,
39+
resourceType: services.subnet,
40+
relation: 'child',
41+
field: 'subnet',
42+
}))
43+
)
44+
})
45+
46+
/**
47+
* Add Security Groups
48+
*/
49+
serverless?.VpcConfigs
50+
?.filter(vc => vc.SecurityGroupIds)
51+
?.forEach(vc => {
52+
connections.push(
53+
...vc?.SecurityGroupIds?.map(sgId => ({
54+
id: sgId,
55+
resourceType: services.sg,
56+
relation: 'child',
57+
field: 'securityGroups',
58+
}))
59+
)
60+
})
61+
62+
const mskResult = {
63+
[id]: connections,
64+
}
65+
return mskResult
66+
}

src/services/msk/data.ts

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import CloudGraph from '@cloudgraph/sdk'
2+
import Kafka, { Cluster, ListClustersV2Request, ListClustersV2Response } from 'aws-sdk/clients/kafka'
3+
import { AWSError } from 'aws-sdk/lib/error'
4+
import { Config } from 'aws-sdk/lib/config'
5+
import isEmpty from 'lodash/isEmpty'
6+
import groupBy from 'lodash/groupBy'
7+
import awsLoggerText from '../../properties/logger'
8+
import { initTestEndpoint, setAwsRetryOptions } from '../../utils'
9+
import AwsErrorLog from '../../utils/errorLog'
10+
import { API_GATEWAY_CUSTOM_DELAY } from '../../config/constants'
11+
import { TagMap } from '../../types'
12+
13+
const lt = { ...awsLoggerText }
14+
const { logger } = CloudGraph
15+
const MAX_DOMAIN_NAMES = 500
16+
const serviceName = 'Msk'
17+
const errorLog = new AwsErrorLog(serviceName)
18+
const endpoint = initTestEndpoint(serviceName)
19+
const customRetrySettings = setAwsRetryOptions({
20+
baseDelay: API_GATEWAY_CUSTOM_DELAY,
21+
})
22+
23+
export const getMskClustersForRegion = async (
24+
kafka: Kafka
25+
): Promise<Cluster[]> =>
26+
new Promise(async resolve => {
27+
const mskClusterList: Cluster[] = []
28+
const listClusterV2Opts: ListClustersV2Request = {}
29+
const listAllMskClusters = (token?: string): void => {
30+
listClusterV2Opts.MaxResults = MAX_DOMAIN_NAMES
31+
if (token) {
32+
listClusterV2Opts.NextToken = token
33+
}
34+
try {
35+
kafka.listClustersV2(
36+
listClusterV2Opts,
37+
(err: AWSError, data: ListClustersV2Response) => {
38+
if (err) {
39+
errorLog.generateAwsErrorLog({
40+
functionName: 'kafka:listClustersV2',
41+
err,
42+
})
43+
}
44+
45+
if (isEmpty(data)) {
46+
return resolve([])
47+
}
48+
49+
const { NextToken: nextToken, ClusterInfoList: clusters = [] } = data || {}
50+
51+
if (isEmpty(clusters)) {
52+
return resolve([])
53+
}
54+
55+
logger.debug(lt.fetchedMskClusters(clusters.length))
56+
57+
mskClusterList.push(...clusters)
58+
59+
if (nextToken) {
60+
listAllMskClusters(nextToken)
61+
} else {
62+
resolve(mskClusterList)
63+
}
64+
}
65+
)
66+
} catch (error) {
67+
resolve([])
68+
}
69+
}
70+
listAllMskClusters()
71+
})
72+
73+
export interface RawAwsMskCluster extends Omit<Cluster, 'Tags'> {
74+
region: string
75+
Tags: TagMap
76+
account
77+
}
78+
79+
export default async ({
80+
regions,
81+
config,
82+
account,
83+
}: {
84+
account: string
85+
regions: string
86+
config: Config
87+
}): Promise<{
88+
[region: string]: RawAwsMskCluster[]
89+
}> =>
90+
new Promise(async resolve => {
91+
const mskClustersResult: RawAwsMskCluster[] = []
92+
93+
const regionPromises = regions.split(',').map(region => {
94+
const kafka = new Kafka({
95+
...config,
96+
region,
97+
endpoint,
98+
...customRetrySettings,
99+
})
100+
101+
return new Promise<void>(async resolveMskClusterData => {
102+
// Get Msk Cluster Data
103+
const mskClusters = await getMskClustersForRegion(kafka)
104+
105+
if (!isEmpty(mskClusters)) {
106+
for (const cluster of mskClusters) {
107+
mskClustersResult.push({
108+
...cluster,
109+
region,
110+
Tags: cluster.Tags,
111+
account,
112+
})
113+
}
114+
}
115+
116+
resolveMskClusterData()
117+
})
118+
})
119+
120+
await Promise.all(regionPromises)
121+
errorLog.reset()
122+
123+
resolve(groupBy(mskClustersResult, 'region'))
124+
})

0 commit comments

Comments
 (0)