@@ -16,6 +16,7 @@ use std::pin::Pin;
16
16
use std:: process:: Stdio ;
17
17
use std:: sync:: { Arc , Mutex } ;
18
18
use tokio:: io:: { AsyncBufRead , AsyncReadExt } ;
19
+ use tracing:: instrument;
19
20
20
21
pub const OCI_TYPE_LAYER_GZIP : & str = "application/vnd.oci.image.layer.v1.tar+gzip" ;
21
22
pub const OCI_TYPE_LAYER_TAR : & str = "application/vnd.oci.image.layer.v1.tar" ;
@@ -80,6 +81,7 @@ impl std::fmt::Debug for ImageProxy {
80
81
}
81
82
82
83
/// Opaque identifier for an image
84
+ #[ derive( Debug , PartialEq , Eq ) ]
83
85
pub struct OpenedImage ( u32 ) ;
84
86
85
87
#[ allow( unsafe_code) ]
@@ -108,6 +110,7 @@ fn file_from_scm_rights(cmsg: ControlMessageOwned) -> Option<File> {
108
110
109
111
impl ImageProxy {
110
112
/// Create an image proxy that fetches the target image.
113
+ #[ instrument]
111
114
pub async fn new ( ) -> Result < Self > {
112
115
let ( mysock, theirsock) = new_seqpacket_pair ( ) ?;
113
116
let mut c = std:: process:: Command :: new ( "skopeo" ) ;
@@ -117,6 +120,7 @@ impl ImageProxy {
117
120
let mut c = tokio:: process:: Command :: from ( c) ;
118
121
c. kill_on_drop ( true ) ;
119
122
let child = c. spawn ( ) . context ( "Failed to spawn skopeo" ) ?;
123
+ tracing:: debug!( "Spawned skopeo pid={:?}" , child. id( ) ) ;
120
124
let childwait = Box :: pin ( child. wait_with_output ( ) ) ;
121
125
122
126
let sockfd = Arc :: new ( Mutex :: new ( mysock) ) ;
@@ -142,6 +146,7 @@ impl ImageProxy {
142
146
sockfd : Arc < Mutex < File > > ,
143
147
req : Request ,
144
148
) -> Result < ( T , Option < ( File , u32 ) > ) > {
149
+ tracing:: trace!( "sending request {}" , req. method. as_str( ) ) ;
145
150
// TODO: Investigate https://crates.io/crates/uds for SOCK_SEQPACKET tokio
146
151
let r = tokio:: task:: spawn_blocking ( move || {
147
152
let sockfd = sockfd. lock ( ) . unwrap ( ) ;
@@ -187,9 +192,11 @@ impl ImageProxy {
187
192
Ok ( ( reply, fdret) )
188
193
} )
189
194
. await ??;
195
+ tracing:: trace!( "completed request" ) ;
190
196
Ok ( r)
191
197
}
192
198
199
+ #[ instrument( skip( args) ) ]
193
200
async fn impl_request < R : serde:: de:: DeserializeOwned + Send + ' static , T , I > (
194
201
& mut self ,
195
202
method : & str ,
@@ -212,22 +219,28 @@ impl ImageProxy {
212
219
}
213
220
}
214
221
222
+ #[ instrument]
215
223
async fn finish_pipe ( & mut self , pipeid : u32 ) -> Result < ( ) > {
224
+ tracing:: debug!( "closing pipe" ) ;
216
225
let ( r, fd) = self . impl_request ( "FinishPipe" , [ pipeid] ) . await ?;
217
226
if fd. is_some ( ) {
218
227
return Err ( anyhow ! ( "Unexpected fd in finish_pipe reply" ) ) ;
219
228
}
220
229
Ok ( r)
221
230
}
222
231
232
+ #[ instrument]
223
233
pub async fn open_image ( & mut self , imgref : & str ) -> Result < OpenedImage > {
234
+ tracing:: debug!( "opening image" ) ;
224
235
let ( imgid, _) = self
225
236
. impl_request :: < u32 , _ , _ > ( "OpenImage" , [ imgref] )
226
237
. await ?;
227
238
Ok ( OpenedImage ( imgid) )
228
239
}
229
240
241
+ #[ instrument]
230
242
pub async fn close_image ( & mut self , img : & OpenedImage ) -> Result < ( ) > {
243
+ tracing:: debug!( "closing image" ) ;
231
244
let ( r, _) = self . impl_request ( "CloseImage" , [ img. 0 ] ) . await ?;
232
245
Ok ( r)
233
246
}
@@ -250,6 +263,7 @@ impl ImageProxy {
250
263
/// https://github.com/opencontainers/image-spec/blob/main/descriptor.md
251
264
/// Note that right now the proxy does verification of the digest:
252
265
/// https://github.com/cgwalters/container-image-proxy/issues/1#issuecomment-926712009
266
+ #[ instrument]
253
267
pub async fn get_blob (
254
268
& mut self ,
255
269
img : & OpenedImage ,
@@ -259,6 +273,7 @@ impl ImageProxy {
259
273
impl AsyncBufRead + Send + Unpin ,
260
274
impl Future < Output = Result < ( ) > > + Unpin + ' _ ,
261
275
) > {
276
+ tracing:: debug!( "fetching blob" ) ;
262
277
let args: Vec < serde_json:: Value > =
263
278
vec ! [ img. 0 . into( ) , digest. to_string( ) . into( ) , size. into( ) ] ;
264
279
let ( _bloblen, fd) = self . impl_request :: < i64 , _ , _ > ( "GetBlob" , args) . await ?;
@@ -269,18 +284,21 @@ impl ImageProxy {
269
284
}
270
285
271
286
/// Close the connection and wait for the child process to exit successfully.
287
+ #[ instrument]
272
288
pub async fn finalize ( self ) -> Result < ( ) > {
273
289
let req = Request :: new_bare ( "Shutdown" ) ;
274
290
let sendbuf = serde_json:: to_vec ( & req) ?;
275
291
// SAFETY: Only panics if a worker thread already panic'd
276
292
let sockfd = Arc :: try_unwrap ( self . sockfd ) . unwrap ( ) . into_inner ( ) . unwrap ( ) ;
277
293
nixsocket:: send ( sockfd. as_raw_fd ( ) , & sendbuf, nixsocket:: MsgFlags :: empty ( ) ) ?;
278
294
drop ( sendbuf) ;
295
+ tracing:: debug!( "sent shutdown request" ) ;
279
296
let output = self . childwait . await ?;
280
297
if !output. status . success ( ) {
281
298
let stderr = String :: from_utf8_lossy ( & output. stderr ) ;
282
299
anyhow:: bail!( "proxy failed: {}\n {}" , output. status, stderr)
283
300
}
301
+ tracing:: debug!( "proxy exited successfully" ) ;
284
302
Ok ( ( ) )
285
303
}
286
304
}
0 commit comments