@@ -3,10 +3,13 @@ import type {
3
3
MessageQueueEnqueueOptions ,
4
4
MessageQueueListenOptions ,
5
5
} from "@fedify/fedify" ;
6
+ import { getLogger } from "@logtape/logtape" ;
6
7
import type { JSONValue , Parameter , Sql } from "postgres" ;
7
8
import postgres from "postgres" ;
8
9
import { driverSerializesJson } from "./utils.ts" ;
9
10
11
+ const logger = getLogger ( [ "fedify" , "postgres" , "mq" ] ) ;
12
+
10
13
/**
11
14
* Options for the PostgreSQL message queue.
12
15
*/
@@ -85,14 +88,27 @@ export class PostgresMessageQueue implements MessageQueue {
85
88
) : Promise < void > {
86
89
await this . initialize ( ) ;
87
90
const delay = options ?. delay ?? Temporal . Duration . from ( { seconds : 0 } ) ;
91
+ if ( options ?. delay ) {
92
+ logger . debug ( "Enqueuing a message with a delay of {delay}..." , {
93
+ delay,
94
+ message,
95
+ } ) ;
96
+ } else {
97
+ logger . debug ( "Enqueuing a message..." , { message } ) ;
98
+ }
88
99
await this . #sql`
89
100
INSERT INTO ${ this . #sql( this . #tableName) } (message, delay)
90
101
VALUES (
91
102
${ this . #json( message ) } ,
92
103
${ delay . toString ( ) }
93
104
);
94
105
` ;
106
+ logger . debug ( "Enqueued a message." , { message } ) ;
95
107
await this . #sql. notify ( this . #channelName, delay . toString ( ) ) ;
108
+ logger . debug ( "Notified the message queue channel {channelName}." , {
109
+ channelName : this . #channelName,
110
+ message,
111
+ } ) ;
96
112
}
97
113
98
114
async listen (
@@ -166,6 +182,9 @@ export class PostgresMessageQueue implements MessageQueue {
166
182
*/
167
183
async initialize ( ) : Promise < void > {
168
184
if ( this . #initialized) return ;
185
+ logger . debug ( "Initializing the message queue table {tableName}..." , {
186
+ tableName : this . #tableName,
187
+ } ) ;
169
188
try {
170
189
await this . #sql`
171
190
CREATE TABLE IF NOT EXISTS ${ this . #sql( this . #tableName) } (
@@ -175,16 +194,22 @@ export class PostgresMessageQueue implements MessageQueue {
175
194
created timestamp with time zone DEFAULT CURRENT_TIMESTAMP
176
195
);
177
196
` ;
178
- } catch ( e ) {
197
+ } catch ( error ) {
179
198
if (
180
- ! ( e instanceof postgres . PostgresError &&
181
- e . constraint_name === "pg_type_typname_nsp_index" )
199
+ ! ( error instanceof postgres . PostgresError &&
200
+ error . constraint_name === "pg_type_typname_nsp_index" )
182
201
) {
183
- throw e ;
202
+ logger . error ( "Failed to initialize the message queue table: {error}" , {
203
+ error,
204
+ } ) ;
205
+ throw error ;
184
206
}
185
207
}
186
208
this . #driverSerializesJson = await driverSerializesJson ( this . #sql) ;
187
209
this . #initialized = true ;
210
+ logger . debug ( "Initialized the message queue table {tableName}." , {
211
+ tableName : this . #tableName,
212
+ } ) ;
188
213
}
189
214
190
215
/**
0 commit comments