4
4
use Upyun \Api \Rest ;
5
5
use Upyun \Api \Form ;
6
6
use GuzzleHttp \Psr7 ;
7
+ use GuzzleHttp \Pool ;
8
+ use GuzzleHttp \Client ;
7
9
8
10
class Uploader
9
11
{
@@ -37,13 +39,15 @@ public function upload($path, $file, $params, $withAsyncProcess)
37
39
->withHeaders ($ params )
38
40
->withFile ($ stream )
39
41
->send ();
42
+ } elseif ($ this ->config ->uploadType === 'BLOCK_PARALLEL ' ) {
43
+ return $ this ->concurrentPointUpload ($ path , $ stream , $ params );
40
44
} else {
41
45
return $ this ->pointUpload ($ path , $ stream , $ params );
42
46
}
43
47
}
44
48
45
49
/**
46
- * 断点续传
50
+ * 串行式断点续传
47
51
* @param $path
48
52
* @param $stream
49
53
* @param $params
@@ -108,7 +112,8 @@ private function pointUpload($path, $stream, $params)
108
112
109
113
private function needUseBlock ($ fileSize )
110
114
{
111
- if ($ this ->config ->uploadType === 'BLOCK ' ) {
115
+ if ($ this ->config ->uploadType === 'BLOCK ' ||
116
+ $ this ->config ->uploadType === 'BLOCK_PARALLEL ' ) {
112
117
return true ;
113
118
} elseif ($ this ->config ->uploadType === 'AUTO ' &&
114
119
$ fileSize >= $ this ->config ->sizeBoundary ) {
@@ -117,4 +122,81 @@ private function needUseBlock($fileSize)
117
122
return false ;
118
123
}
119
124
}
125
+
126
+ /**
127
+ * 并行式断点续传
128
+ * @param $path
129
+ * @param $stream
130
+ * @param $params
131
+ *
132
+ * @return mixed|\Psr\Http\Message\ResponseInterface
133
+ * @throws \Exception
134
+ */
135
+ private function concurrentPointUpload ($ path , $ stream , $ params )
136
+ {
137
+ $ req = new Rest ($ this ->config );
138
+
139
+ $ headers = array ();
140
+ if (is_array ($ params )) {
141
+ foreach ($ params as $ key => $ val ) {
142
+ $ headers ['X-Upyun-Meta- ' . $ key ] = $ val ;
143
+ }
144
+ }
145
+ $ res = $ req ->request ('PUT ' , $ path )
146
+ ->withHeaders (array_merge (array (
147
+ 'X-Upyun-Multi-Disorder ' => 'true ' ,
148
+ 'X-Upyun-Multi-Stage ' => 'initiate ' ,
149
+ 'X-Upyun-Multi-Type ' => Psr7 \mimetype_from_filename ($ path ),
150
+ 'X-Upyun-Multi-Length ' => $ stream ->getSize (),
151
+ ), $ headers ))
152
+ ->send ();
153
+ if ($ res ->getStatusCode () !== 204 ) {
154
+ throw new \Exception ('init request failed when poinit upload! ' );
155
+ }
156
+
157
+ $ init = Util::getHeaderParams ($ res ->getHeaders ());
158
+ $ uuid = $ init ['x-upyun-multi-uuid ' ];
159
+ $ requests = function ($ req , $ path , $ stream , $ uuid ) {
160
+ $ blockSize = 1024 * 1024 ;
161
+ $ total = ceil ($ stream ->getSize () / $ blockSize );
162
+ for ($ i = 0 ; $ i < $ total ; $ i ++) {
163
+ $ fileBlock = $ stream ->read ($ blockSize );
164
+ yield $ req ->request ('PUT ' , $ path )
165
+ ->withHeaders (array (
166
+ 'X-Upyun-Multi-Stage ' => 'upload ' ,
167
+ 'X-Upyun-Multi-Uuid ' => $ uuid ,
168
+ 'X-Upyun-Part-Id ' => $ i
169
+ ))
170
+ ->withFile (Psr7 \stream_for ($ fileBlock ))
171
+ ->toRequest ();
172
+ }
173
+ };
174
+ $ client = new Client ([
175
+ 'timeout ' => $ this ->config ->timeout ,
176
+ ]);
177
+ $ pool = new Pool ($ client , $ requests ($ req , $ path , $ stream , $ uuid ), [
178
+ 'concurrency ' => $ this ->config ->concurrency ,
179
+ 'fulfilled ' => function ($ res ) {
180
+ if ($ res ->getStatusCode () !== 204 ) {
181
+ throw new \Exception ('upload request failed when poinit upload! ' );
182
+ }
183
+ },
184
+ 'rejected ' => function () {
185
+ throw new \Exception ('upload request failed when poinit upload! ' );
186
+ },
187
+ ]);
188
+ $ promise = $ pool ->promise ();
189
+ $ promise ->wait ();
190
+
191
+ $ res = $ req ->request ('PUT ' , $ path )
192
+ ->withHeaders (array (
193
+ 'X-Upyun-Multi-Uuid ' => $ uuid ,
194
+ 'X-Upyun-Multi-Stage ' => 'complete '
195
+ ))
196
+ ->send ();
197
+ if ($ res ->getStatusCode () != 204 && $ res ->getStatusCode () != 201 ) {
198
+ throw new \Exception ('end request failed when poinit upload! ' );
199
+ }
200
+ return $ res ;
201
+ }
120
202
}
0 commit comments