@@ -6,12 +6,6 @@ import { LIVERAMP_MIN_RECORD_COUNT, LIVERAMP_ENABLE_COMPRESSION_FLAG_NAME } from

const testDestination = createTestIntegration(Destination)

const mockAwsCredentials = {
accessKeyId: 'accessKeyId',
secretAccessKey: 'secretAccessKey',
sessionToken: 'sessionToken'
}

const mockedEvents: SegmentEvent[] = Array.from({ length: 50 }, (_, i) => ({
messageId: `segment-test-message-00000${i + 2}`,
timestamp: '2023-07-26T15:23:39.803Z',
@@ -22,12 +16,29 @@ const mockedEvents: SegmentEvent[] = Array.from({ length: 50 }, (_, i) => ({
event: 'Audience Entered'
}))

let s3MetadataPayload: unknown = null
jest.mock('@aws-sdk/client-s3', () => ({
S3Client: jest.fn().mockImplementation(() => ({
send: jest.fn().mockImplementation((command) => {
if (command.input?.Key?.includes('meta.json')) {
s3MetadataPayload = JSON.parse(command.input.Body)
}
return Promise.resolve({
$metadata: { httpStatusCode: 200 }
})
})
})),
PutObjectCommand: jest.fn().mockImplementation((input) => ({
constructor: { name: 'PutObjectCommand' },
input
}))
}))
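// With the SDK mocked at module scope, specs can assert on the captured
// metadata directly after driving the action. A minimal sketch; the exact
// test-harness call is an assumption, not part of this diff:
//
//   await testDestination.executeBatch('audienceEnteredS3', { events: mockedEvents, settings, mapping })
//   expect(s3MetadataPayload).not.toBeNull()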

describe('Liveramp Audiences', () => {
let s3MetadataPayload: unknown = null
beforeEach(() => {
jest.clearAllMocks()
jest.resetModules()

s3MetadataPayload = null
jest.spyOn(fs, 'readFileSync').mockReturnValue('token')

nock('https://kubernetes.default.svc')
@@ -39,42 +50,6 @@ describe('Liveramp Audiences', () => {
}
}
})

nock('https://sts.us-west-2.amazonaws.com/')
.get(
`/?` +
`Action=AssumeRoleWithWebIdentity` +
`&DurationSeconds=3600` +
`&RoleSessionName=integrations-monoservice` +
`&RoleArn=arn:aws:iam::123456789012:role/role-name` +
`&WebIdentityToken=token` +
`&Version=2011-06-15`
)
.reply(200, {
AssumeRoleWithWebIdentityResponse: {
AssumeRoleWithWebIdentityResult: {
Credentials: {
accessKeyId: mockAwsCredentials.accessKeyId,
secretAccessKey: mockAwsCredentials.secretAccessKey,
sessionToken: mockAwsCredentials.sessionToken
}
}
}
})

// capture the request body sent to S3
nock('https://integrations-outbound-event-store-test-us-west-2.s3.us-west-2.amazonaws.com')
.put(
/\/actions-liveramp-audiences\/destinationConfigId\/actionConfigId\/[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\.csv$/
)
.reply(200)

nock('https://integrations-outbound-event-store-test-us-west-2.s3.us-west-2.amazonaws.com')
.put('/actions-liveramp-audiences/destinationConfigId/actionConfigId/meta.json', (reqbody: any) => {
s3MetadataPayload = reqbody
return true
})
.reply(200)
})
describe('audienceEnteredS3', () => {
it('should send events with valid payload size and events', async () => {
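If a spec also needs the uploaded CSV rows, the same module-level mock can capture them by key suffix. A minimal sketch under the same assumptions as the mock above (s3CsvPayload is a hypothetical name):

let s3CsvPayload: unknown = null
// inside the mocked send() implementation:
if (command.input?.Key?.endsWith('.csv')) {
  s3CsvPayload = command.input.Body
}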
@@ -163,13 +163,14 @@ async function processData(input: ProcessDataInput<Payload>, subscriptionMetadat
// AWS FLOW
// -----------
const shouldEnableGzipCompression = input.features && input.features[LIVERAMP_ENABLE_COMPRESSION_FLAG_NAME] === true
return sendEventToAWS(input.request, {
return sendEventToAWS({
audienceComputeId: input.rawData?.[0].context?.personas?.computation_id,
uploadType: 's3',
filename: filename,
destinationInstanceID: subscriptionMetadata?.destinationConfigId,
subscriptionId: subscriptionMetadata?.actionConfigId,
fileContents,
rowCount: input.payloads.length,
gzipCompressFile: shouldEnableGzipCompression,
s3Info: {
s3BucketName: input.payloads[0].s3_aws_bucket_name,
@@ -142,11 +142,12 @@ async function processData(input: ProcessDataInput<Payload>, subscriptionMetadat
// -----------
const shouldEnableCompression = input.features && input.features[LIVERAMP_ENABLE_COMPRESSION_FLAG_NAME] === true

return sendEventToAWS(input.request, {
return sendEventToAWS({
audienceComputeId: input.rawData?.[0].context?.personas?.computation_id,
uploadType: 'sftp',
filename,
fileContents,
rowCount: input.payloads.length,
destinationInstanceID: subscriptionMetadata?.destinationConfigId,
subscriptionId: subscriptionMetadata?.actionConfigId,
gzipCompressFile: shouldEnableCompression,
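Both delivery paths now compute rowCount from input.payloads.length, so the shared uploader can tag each data chunk with the number of records it carries without re-parsing the CSV. Illustratively (values assumed):

// a batch of 50 payloads gives rowCount === 50, which the uploader below
// serializes into the S3 object tag "row_count=50" alongside the CSV chunk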
@@ -1,8 +1,6 @@
import { v4 as uuidv4 } from '@lukeed/uuid'
import aws4 from 'aws4'

import { RequestClient, InvalidAuthenticationError } from '@segment/actions-core'
import { getAWSCredentialsFromEKS, AWSCredentials } from '../../lib/AWS/sts'
import { getS3Client } from '../../lib/AWS/s3'
import { PutObjectCommand } from '@aws-sdk/client-s3'

import { ACTION_SLUG, LIVERAMP_SFTP_SERVER, LIVERAMP_SFTP_PORT } from './properties'

@@ -13,6 +11,7 @@ interface SendToAWSRequest {
uploadType: 's3' | 'sftp'
filename: string
fileContents: Buffer
rowCount: number
gzipCompressFile?: boolean
sftpInfo?: {
sftpUsername?: string
@@ -49,30 +48,22 @@ interface LRMetaPayload {
}
}

interface UploadToAWSS3Input {
request: RequestClient
bucketName: string
region: string
fileContentType: string
filePath: string
fileContent: Buffer | string
awsCredentials: AWSCredentials
}

const NODE_ENV = process.env['NODE_ENV'] || `stage`
const AWS_REGION = process.env['AWS_REGION'] || `us-west-2`
const S3_BUCKET_NAME = `integrations-outbound-event-store-${NODE_ENV}-${AWS_REGION}`
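// e.g. NODE_ENV=test and AWS_REGION=us-west-2 yield the bucket
// "integrations-outbound-event-store-test-us-west-2", the S3 host that the
// old nock-based spec stubbed out before this change.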

export const sendEventToAWS = async (request: RequestClient, input: SendToAWSRequest) => {
export const sendEventToAWS = async (input: SendToAWSRequest) => {
// Compute the file path and message dedupe id.
// Each advertiser and segment can eventually have multiple data drops; a UUID keeps file names unique.
const uuidValue = uuidv4()

const aggregatedFilePath =
`${input.destinationInstanceID ?? ''}${input.subscriptionId ? '/' + input.subscriptionId : ''}${
input.audienceComputeId ? '/' + input.audienceComputeId : ''
}`.replace(/^\/+|\/+$/g, '') || ''
const userdataFilePath = `/${ACTION_SLUG}/${aggregatedFilePath}/${uuidValue}.csv`
const metadataFilePath = `/${ACTION_SLUG}/${aggregatedFilePath}/meta.json`

const userdataFilePath = `${ACTION_SLUG}/${aggregatedFilePath}/${uuidValue}.csv`
const metadataFilePath = `${ACTION_SLUG}/${aggregatedFilePath}/meta.json`
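// Note: the leading slash is dropped because PutObjectCommand takes a bare
// object key, whereas the old aws4 flow signed a raw HTTP request whose URL
// path required one; a leading slash here would become part of the key and
// the object would land under a different key than intended.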

// Create Metadata
const metadata: LRMetaPayload = {
@@ -100,62 +91,34 @@ export const sendEventToAWS = async (request: RequestClient, input: SendToAWSReq
}
}

const awsCredentials = await getAWSCredentialsFromEKS(request)
// Get S3 Client for Outbound Controller
const s3Client = getS3Client('integrationsOutboundController')
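// getS3Client (../../lib/AWS/s3) is assumed to return a pre-configured,
// reusable S3Client for the named integration, e.g. a memoized
// `new S3Client({ region: AWS_REGION })` drawing credentials from the pod's
// IAM role; this replaces the per-request STS exchange that
// getAWSCredentialsFromEKS performed in the old flow above.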

// Upload user data to the S3 bucket
await uploadToAWSS3({
request,
bucketName: S3_BUCKET_NAME,
region: AWS_REGION,
fileContentType: 'text/csv',
filePath: userdataFilePath,
fileContent: input.fileContents,
awsCredentials: awsCredentials
})
// Add Row Count to the File Chunk for Observability
const urlEncodedTags = new URLSearchParams({
row_count: `${input.rowCount}`
}).toString()
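// For a 50-row chunk urlEncodedTags is "row_count=50", the URL-encoded
// key=value form that PutObject's `Tagging` parameter expects; additional
// tags would be joined with "&".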

// Upload metadata to the S3 bucket
return uploadToAWSS3({
request,
bucketName: S3_BUCKET_NAME,
region: AWS_REGION,
fileContentType: 'application/json',
filePath: metadataFilePath,
fileContent: JSON.stringify(metadata),
awsCredentials: awsCredentials
})
}

async function uploadToAWSS3(input: UploadToAWSS3Input) {
// Sign the AWS request
const s3UploadRequest = aws4.sign(
{
host: `${input.bucketName}.s3.${input.region}.amazonaws.com`,
path: input.filePath,
body: input.fileContent,
method: 'PUT',
service: 's3',
region: input.region,
headers: {
'Content-Type': input.fileContentType,
Accept: 'application/json'
}
},
{
accessKeyId: input.awsCredentials.accessKeyId,
secretAccessKey: input.awsCredentials.secretAccessKey,
sessionToken: input.awsCredentials.sessionToken
}
)

// Verify Signed Headers
if (!s3UploadRequest.headers || !s3UploadRequest.method || !s3UploadRequest.host || !s3UploadRequest.path) {
throw new InvalidAuthenticationError('Unable to generate signature header for AWS S3 request.')
}
await Promise.all([
// Upload user data to the S3 bucket
s3Client.send(
new PutObjectCommand({
Bucket: S3_BUCKET_NAME,
Key: userdataFilePath,
Body: input.fileContents,
ContentType: 'text/csv',
Tagging: urlEncodedTags
})
),

// Upload file to S3
return input.request(`https://${input.bucketName}.s3.${input.region}.amazonaws.com${input.filePath}`, {
method: 'PUT',
body: s3UploadRequest.body,
headers: s3UploadRequest.headers as Record<string, string>
})
// Upload metadata to the S3 bucket
s3Client.send(
new PutObjectCommand({
Bucket: S3_BUCKET_NAME,
Key: metadataFilePath,
Body: JSON.stringify(metadata),
ContentType: 'application/json'
})
)
])
}
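One behavioral note: the old flow uploaded the CSV and then meta.json sequentially, while Promise.all issues both writes concurrently, so a consumer that treated meta.json's presence as proof the CSV had landed can no longer rely on that ordering; if either write rejects, sendEventToAWS rejects even though the sibling upload may already have succeeded. For observability, the row_count tag written above can be read back out-of-band; a minimal sketch, assuming credentials that can read the bucket (bucket and key are placeholders):

import { S3Client, GetObjectTaggingCommand } from '@aws-sdk/client-s3'

async function readRowCountTag(bucket: string, key: string): Promise<string | undefined> {
  // Fetch the object's tag set and pull out the row_count tag written by sendEventToAWS.
  const s3 = new S3Client({ region: process.env['AWS_REGION'] ?? 'us-west-2' })
  const { TagSet } = await s3.send(new GetObjectTaggingCommand({ Bucket: bucket, Key: key }))
  return TagSet?.find((tag) => tag.Key === 'row_count')?.Value
}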