@@ -26,19 +26,17 @@ class ChannelModel extends EventEmitter {
2626 return Promise . fromCallback ( this . connection . close . bind ( this . connection ) ) ;
2727 }
2828
29- createChannel ( ) {
30- const c = new Channel ( this . connection ) ;
31- return c . open ( ) . then ( openOk => { return c ; } ) ;
29+ async createChannel ( ) {
30+ const channel = new Channel ( this . connection ) ;
31+ await channel . open ( ) ;
32+ return channel ;
3233 }
3334
34- createConfirmChannel ( ) {
35- const c = new ConfirmChannel ( this . connection ) ;
36- return c . open ( )
37- . then ( openOk => {
38- return c . rpc ( defs . ConfirmSelect , { nowait : false } ,
39- defs . ConfirmSelectOk )
40- } )
41- . then ( ( ) => { return c ; } ) ;
35+ async createConfirmChannel ( ) {
36+ const channel = new ConfirmChannel ( this . connection ) ;
37+ await channel . open ( ) ;
38+ await channel . rpc ( defs . ConfirmSelect , { nowait : false } , defs . ConfirmSelectOk ) ;
39+ return channel ;
4240 }
4341}
4442
@@ -54,13 +52,12 @@ class Channel extends BaseChannel {
5452 // An RPC that returns a 'proper' promise, which resolves to just the
5553 // response's fields; this is intended to be suitable for implementing
5654 // API procedures.
57- rpc ( method , fields , expect ) {
58- return Promise . fromCallback ( cb => {
55+ async rpc ( method , fields , expect ) {
56+ const f = await Promise . fromCallback ( cb => {
5957 return this . _rpc ( method , fields , expect , cb ) ;
6058 } )
61- . then ( f => {
62- return f . fields ;
63- } ) ;
59+
60+ return f . fields ;
6461 }
6562
6663 // Do the remarkably simple channel open handshake
@@ -161,53 +158,50 @@ class Channel extends BaseChannel {
161158 return this . publish ( '' , queue , content , options ) ;
162159 }
163160
164- consume ( queue , callback , options ) {
161+ async consume ( queue , callback , options ) {
165162 // NB we want the callback to be run synchronously, so that we've
166163 // registered the consumerTag before any messages can arrive.
167164 const fields = Args . consume ( queue , options ) ;
168- return Promise . fromCallback ( cb => {
165+ const ok = await Promise . fromCallback ( cb => {
169166 this . _rpc ( defs . BasicConsume , fields , defs . BasicConsumeOk , cb ) ;
170167 } )
171- . then ( ok => {
172- this . registerConsumer ( ok . fields . consumerTag , callback ) ;
173- return ok . fields ;
174- } ) ;
168+
169+ this . registerConsumer ( ok . fields . consumerTag , callback ) ;
170+ return ok . fields ;
175171 }
176172
177- cancel ( consumerTag ) {
178- return Promise . fromCallback ( cb => {
173+ async cancel ( consumerTag ) {
174+ const ok = await Promise . fromCallback ( cb => {
179175 this . _rpc ( defs . BasicCancel , Args . cancel ( consumerTag ) ,
180176 defs . BasicCancelOk ,
181177 cb ) ;
182178 } )
183- . then ( ok => {
184- this . unregisterConsumer ( consumerTag ) ;
185- return ok . fields ;
186- } ) ;
179+
180+ this . unregisterConsumer ( consumerTag ) ;
181+ return ok . fields ;
187182 }
188183
189- get ( queue , options ) {
184+ async get ( queue , options ) {
190185 const fields = Args . get ( queue , options ) ;
191- return Promise . fromCallback ( cb => {
192- return this . sendOrEnqueue ( defs . BasicGet , fields , cb ) ;
186+ const f = await Promise . fromCallback ( cb => {
187+ this . sendOrEnqueue ( defs . BasicGet , fields , cb ) ;
193188 } )
194- . then ( f => {
195- if ( f . id === defs . BasicGetEmpty ) {
196- return false ;
197- }
198- else if ( f . id === defs . BasicGetOk ) {
199- const fields = f . fields ;
200- return new Promise ( resolve => {
201- this . handleMessage = acceptMessage ( m => {
202- m . fields = fields ;
203- resolve ( m ) ;
204- } ) ;
189+
190+ if ( f . id === defs . BasicGetEmpty ) {
191+ return false ;
192+ }
193+ else if ( f . id === defs . BasicGetOk ) {
194+ const fields = f . fields ;
195+ return new Promise ( resolve => {
196+ this . handleMessage = acceptMessage ( m => {
197+ m . fields = fields ;
198+ resolve ( m ) ;
205199 } ) ;
206- }
207- else {
208- throw new Error ( `Unexpected response to BasicGet: ${ inspect ( f ) } ` ) ;
209- }
210- } ) ;
200+ } ) ;
201+ }
202+ else {
203+ throw new Error ( `Unexpected response to BasicGet: ${ inspect ( f ) } ` ) ;
204+ }
211205 }
212206
213207 ack ( message , allUpTo ) {
@@ -246,18 +240,20 @@ class Channel extends BaseChannel {
246240 Args . recover ( ) ,
247241 defs . BasicRecoverOk ) ;
248242 }
243+
244+ qos ( count , global ) {
245+ return this . rpc ( defs . BasicQos ,
246+ Args . prefetch ( count , global ) ,
247+ defs . BasicQosOk ) ;
248+ }
249249}
250250
251251// There are more options in AMQP than exposed here; RabbitMQ only
252252// implements prefetch based on message count, and only for individual
253253// channels or consumers. RabbitMQ v3.3.0 and after treat prefetch
254254// (without `global` set) as per-consumer (for consumers following),
255255// and prefetch with `global` set as per-channel.
256- Channel . prototype . prefetch = Channel . prototype . qos = function ( count , global ) {
257- return this . rpc ( defs . BasicQos ,
258- Args . prefetch ( count , global ) ,
259- defs . BasicQosOk ) ;
260- } ;
256+ Channel . prototype . prefetch = Channel . prototype . qos
261257
262258// Confirm channel. This is a channel with confirms 'switched on',
263259// meaning sent messages will provoke a responding 'ack' or 'nack'
@@ -280,8 +276,7 @@ class ConfirmChannel extends Channel {
280276 const awaiting = [ ] ;
281277 const unconfirmed = this . unconfirmed ;
282278 unconfirmed . forEach ( ( val , index ) => {
283- if ( val === null ) ; // already confirmed
284- else {
279+ if ( val !== null ) {
285280 const confirmed = new Promise ( ( resolve , reject ) => {
286281 unconfirmed [ index ] = err => {
287282 if ( val ) val ( err ) ;
0 commit comments