
Commit a8c9aae

AWS S3: Use multipart post to upload large files from Streams. (#628)

* AWS S3: Do not use MemoryStream to upload objects.
* Keep the upload simple if the file is smaller than 5MB.
* Increase the minimum multipart upload limit to 10MB.
1 parent aafa7bd commit a8c9aae
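In short, the provider now branches on the source stream: a seekable stream of at most 10MB keeps the single PutObject path, while larger or non-seekable streams (whose length cannot be read up front) are copied into the new S3UploadStream and sent as a multipart upload. A minimal sketch of that decision, using the 10MB constant introduced in the diff below (the helper name here is only illustrative):

using System.IO;

static class UploadPathSketch
{
	// 10MB threshold added by this commit (MIN_MULTIPART_POST in ExternalProviderS3.cs).
	const long MinMultipartPost = 10L * 1024 * 1024;

	// Mirrors the new check in ExternalProviderS3.Upload: seekable and small enough
	// means a plain PutObject; anything else is piped through S3UploadStream.
	public static bool UseSimpleUpload(Stream stream) =>
		stream.CanSeek && stream.Length <= MinMultipartPost;
}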

File tree

3 files changed: +241 -6 lines changed


dotnet/src/dotnetcore/Providers/Storage/GXAmazonS3/GXAmazonS3.csproj

Lines changed: 1 addition & 0 deletions
@@ -9,6 +9,7 @@
   </PropertyGroup>
   <ItemGroup>
     <Compile Include="..\..\..\..\dotnetframework\Providers\Storage\GXAmazonS3\ExternalProviderS3.cs" Link="ExternalProviderS3.cs" />
+    <Compile Include="..\..\..\..\dotnetframework\Providers\Storage\GXAmazonS3\S3UploadStream.cs" Link="S3UploadStream.cs" />
   </ItemGroup>

   <ItemGroup>

dotnet/src/dotnetframework/Providers/Storage/GXAmazonS3/ExternalProviderS3.cs

Lines changed: 30 additions & 6 deletions
@@ -2,10 +2,8 @@
 using Amazon.S3;
 using Amazon.S3.IO;
 using Amazon.S3.Model;
-using GeneXus.Encryption;
 using GeneXus.Services;
 using GeneXus.Utils;
-using log4net;
 using System;
 using System.Collections.Generic;
 using System.IO;

@@ -362,18 +360,44 @@ private S3CannedACL GetCannedACL(GxFileType acl)
 			}
 		}

+		const long MIN_MULTIPART_POST = 10L * 1024 * 1024;
+
 		public string Upload(string fileName, Stream stream, GxFileType destFileType)
 		{
-			MemoryStream ms = new MemoryStream();
-			stream.CopyTo(ms);//can determine PutObjectRequest.Headers.ContentLength. Avoid error Could not determine content length
+			bool doSimpleUpload = stream.CanSeek && stream.Length <= MIN_MULTIPART_POST;
+			if (doSimpleUpload)
+			{
+				return UploadSimple(fileName, stream, destFileType);
+			}
+			else
+			{
+				return UploadMultipart(fileName, stream, destFileType);
+			}
+		}
+
+		private string UploadMultipart(string fileName, Stream stream, GxFileType destFileType)
+		{
+			string contentType = null;
+			TryGetContentType(fileName, out contentType);
+
+			using (S3UploadStream s = new S3UploadStream(Client, Bucket, fileName, GetCannedACL(destFileType), contentType))
+			{
+				stream.CopyTo(s);
+			}
+			return Get(fileName, destFileType);
+		}
+
+		private string UploadSimple(string fileName, Stream stream, GxFileType destFileType)
+		{
 			PutObjectRequest objectRequest = new PutObjectRequest()
 			{
 				BucketName = Bucket,
 				Key = fileName,
-				InputStream = ms,
+				InputStream = stream,
 				CannedACL = GetCannedACL(destFileType)
 			};
-			if (TryGetContentType(fileName, out string mimeType)) {
+			if (TryGetContentType(fileName, out string mimeType))
+			{
 				objectRequest.ContentType = mimeType;
 			}
 			PutObjectResponse result = PutObject(objectRequest);

dotnet/src/dotnetframework/Providers/Storage/GXAmazonS3/S3UploadStream.cs

Lines changed: 210 additions & 0 deletions
@@ -0,0 +1,210 @@
+using Amazon.S3;
+using Amazon.S3.Model;
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Threading.Tasks;
+
+namespace GeneXus.Storage.GXAmazonS3
+{
+	public class S3UploadStream : Stream
+	{
+		/* Note that the maximum size (as of now) of a file in S3 is 5TB so it isn't
+		 * safe to assume all uploads will work here. MAX_PART_LENGTH times MAX_PART_COUNT
+		 * is ~50TB, which is too big for S3. */
+		const long MIN_PART_LENGTH = 5L * 1024 * 1024; // all parts but the last this size or greater
+		const long MAX_PART_LENGTH = 5L * 1024 * 1024 * 1024; // 5GB max per PUT
+		const long MAX_PART_COUNT = 10000; // no more than 10,000 parts total
+		const long DEFAULT_PART_LENGTH = MIN_PART_LENGTH;
+
+		internal class Metadata
+		{
+			public string BucketName { get; set; }
+			public string Key { get; set; }
+			public long PartLength { get; set; } = DEFAULT_PART_LENGTH;
+
+			public int PartCount { get; set; } = 0;
+			public string UploadId { get; set; }
+			public MemoryStream CurrentStream { get; set; }
+			public S3CannedACL Acl { get; set; }
+			public string ContentType { get; set; }
+
+			public long Position { get; set; } = 0;
+			public long Length { get; set; } = 0;
+
+			public List<Task> Tasks = new List<Task>();
+			public ConcurrentDictionary<int, string> PartETags = new ConcurrentDictionary<int, string>();
+		}
+
+		Metadata _metadata = new Metadata();
+		IAmazonS3 _s3 = null;
+
+		public S3UploadStream(IAmazonS3 s3, string s3uri, long partLength = DEFAULT_PART_LENGTH)
+			: this(s3, new Uri(s3uri), partLength)
+		{
+		}
+
+		public S3UploadStream(IAmazonS3 s3, Uri s3uri, long partLength = DEFAULT_PART_LENGTH)
+			: this(s3, s3uri.Host, s3uri.LocalPath.Substring(1), partLength)
+		{
+		}
+
+		public S3UploadStream(IAmazonS3 s3, string bucket, string key, long partLength = DEFAULT_PART_LENGTH)
+			: this(s3, bucket, key, null, null, partLength)
+		{
+		}
+
+		public S3UploadStream(IAmazonS3 s3, string bucket, string key, S3CannedACL acl, string cType = null, long partLength = DEFAULT_PART_LENGTH)
+		{
+			_s3 = s3;
+			_metadata.BucketName = bucket;
+			_metadata.Key = key;
+			_metadata.PartLength = partLength;
+			_metadata.Acl = acl;
+			_metadata.ContentType = cType;
+		}
+
+		protected override void Dispose(bool disposing)
+		{
+			if (disposing)
+			{
+				if (_metadata != null)
+				{
+					Flush(true);
+					CompleteUpload();
+				}
+			}
+			_metadata = null;
+			base.Dispose(disposing);
+		}
+
+		public override bool CanRead => false;
+		public override bool CanSeek => false;
+		public override bool CanWrite => true;
+		public override long Length => _metadata.Length = Math.Max(_metadata.Length, _metadata.Position);
+
+		public override long Position
+		{
+			get => _metadata.Position;
+			set => throw new NotImplementedException();
+		}
+
+		public override int Read(byte[] buffer, int offset, int count) => throw new NotImplementedException();
+		public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException();
+
+		public override void SetLength(long value)
+		{
+			_metadata.Length = Math.Max(_metadata.Length, value);
+			_metadata.PartLength = Math.Max(MIN_PART_LENGTH, Math.Min(MAX_PART_LENGTH, _metadata.Length / MAX_PART_COUNT));
+		}
+
+		private void StartNewPart()
+		{
+			if (_metadata.CurrentStream != null)
+			{
+				Flush(false);
+			}
+			_metadata.CurrentStream = new MemoryStream();
+			_metadata.PartLength = Math.Min(MAX_PART_LENGTH, Math.Max(_metadata.PartLength, (_metadata.PartCount / 2 + 1) * MIN_PART_LENGTH));
+		}
+
+		public override void Flush()
+		{
+			Flush(false);
+		}
+
+		private void Flush(bool disposing)
+		{
+			if ((_metadata.CurrentStream == null || _metadata.CurrentStream.Length < MIN_PART_LENGTH) &&
+				!disposing)
+				return;
+
+			if (_metadata.UploadId == null)
+			{
+				InitiateMultipartUploadRequest uploadRequest = new InitiateMultipartUploadRequest()
+				{
+					BucketName = _metadata.BucketName,
+					Key = _metadata.Key
+				};
+
+				if (_metadata.Acl != null)
+				{
+					uploadRequest.CannedACL = _metadata.Acl;
+				}
+
+				if (!string.IsNullOrEmpty(_metadata.ContentType))
+				{
+					uploadRequest.ContentType = _metadata.ContentType;
+				}
+				_metadata.UploadId = _s3.InitiateMultipartUploadAsync(uploadRequest).GetAwaiter().GetResult().UploadId;
+			}
+
+			if (_metadata.CurrentStream != null)
+			{
+				int i = ++_metadata.PartCount;
+
+				_metadata.CurrentStream.Seek(0, SeekOrigin.Begin);
+				var request = new UploadPartRequest()
+				{
+					BucketName = _metadata.BucketName,
+					Key = _metadata.Key,
+					UploadId = _metadata.UploadId,
+					PartNumber = i,
+					IsLastPart = disposing,
+					InputStream = _metadata.CurrentStream
+				};
+				_metadata.CurrentStream = null;
+
+				var upload = Task.Run(() =>
+				{
+					UploadPartResponse response = _s3.UploadPartAsync(request).GetAwaiter().GetResult();
+					_metadata.PartETags.AddOrUpdate(i, response.ETag,
+						(n, s) => response.ETag);
+					request.InputStream.Dispose();
+				});
+				_metadata.Tasks.Add(upload);
+			}
+		}
+
+		private void CompleteUpload()
+		{
+			Task.WaitAll(_metadata.Tasks.ToArray());
+
+			if (Length > 0)
+			{
+				_s3.CompleteMultipartUploadAsync(new CompleteMultipartUploadRequest()
+				{
+					BucketName = _metadata.BucketName,
+					Key = _metadata.Key,
+					PartETags = _metadata.PartETags.Select(e => new PartETag(e.Key, e.Value)).ToList(),
+					UploadId = _metadata.UploadId
+				}).GetAwaiter().GetResult();
+			}
+		}
+
+		public override void Write(byte[] buffer, int offset, int count)
+		{
+			if (count == 0) return;
+
+			// write as much of the buffer as will fit to the current part, and if needed
+			// allocate a new part and continue writing to it (and so on).
+			int o = offset;
+			int c = Math.Min(count, buffer.Length - offset); // don't over-read the buffer, even if asked to
+			do
+			{
+				if (_metadata.CurrentStream == null || _metadata.CurrentStream.Length >= _metadata.PartLength)
+					StartNewPart();
+
+				long remaining = _metadata.PartLength - _metadata.CurrentStream.Length;
+				int w = Math.Min(c, (int)remaining);
+				_metadata.CurrentStream.Write(buffer, o, w);
+
+				_metadata.Position += w;
+				c -= w;
+				o += w;
+			} while (c > 0);
+		}
+	}
+}
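
For reference, a minimal usage sketch of the new stream, mirroring what UploadMultipart does in ExternalProviderS3.cs; the client, bucket, key, and file path below are hypothetical placeholders:

using System.IO;
using Amazon.S3;
using GeneXus.Storage.GXAmazonS3;

class S3UploadStreamUsageSketch
{
	// Hypothetical bucket/key/file names, for illustration only.
	static void UploadLargeFile(IAmazonS3 client)
	{
		using (FileStream source = File.OpenRead("large-video.mp4"))
		using (S3UploadStream target = new S3UploadStream(client, "my-bucket", "uploads/large-video.mp4"))
		{
			// Write buffers data into parts of at least 5MB; disposing the stream
			// flushes the final part and completes the multipart upload in S3.
			source.CopyTo(target);
		}
	}
}

Note that disposing the S3UploadStream is what finalizes the object: Dispose calls Flush(true) to send the last (possibly short) part and then CompleteUpload to assemble the parts in S3.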
