@@ -28,6 +28,7 @@ pub struct Producer {
     tasks: Vec<JoinHandle<()>>,
     shard_buffer: HashMap<ShardId, Vec<Record>>,
     shard_buffer_state: HashMap<ShardId, BufferState>,
+    shard_urls: HashMap<ShardId, String>,
     request_receiver: tokio::sync::mpsc::UnboundedReceiver<Request>,
     channels: Channels,
     url_scheme: String,
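The new `shard_urls` field caches the server URL that owns each shard, so the hot path only pays for a `lookup_shard` RPC on a cache miss. A minimal sketch of that cache-aside pattern in isolation; the `ShardId` alias, the `resolve` parameter, and the error type are hypothetical stand-ins, not this crate's types:

```rust
use std::collections::HashMap;

type ShardId = u64; // assumed: the real crate defines its own ShardId

/// Cache-aside lookup: return the cached URL, or resolve and remember it.
/// `resolve` stands in for the real `lookup_shard` RPC.
async fn shard_url_cached<F>(
    cache: &mut HashMap<ShardId, String>,
    shard_id: ShardId,
    resolve: F,
) -> Result<String, String>
where
    F: std::future::Future<Output = Result<String, String>>,
{
    if let Some(url) = cache.get(&shard_id) {
        return Ok(url.clone()); // cache hit: no RPC needed
    }
    let url = resolve.await?; // cache miss: pay for the lookup once
    cache.insert(shard_id, url.clone());
    Ok(url)
}
```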
@@ -84,6 +85,7 @@ impl Producer {
             tasks: Vec::new(),
             shard_buffer: HashMap::new(),
             shard_buffer_state: HashMap::new(),
+            shard_urls: HashMap::new(),
             request_receiver,
             channels,
             url_scheme,
@@ -98,50 +100,94 @@ impl Producer {
     pub async fn start(&mut self) {
         while let Some(Request(record)) = self.request_receiver.recv().await {
             let partition_key = record.partition_key.clone();
-            match partition_key_to_shard_id(&self.shards, partition_key) {
+            match partition_key_to_shard_id(&self.shards, partition_key.clone()) {
                 Err(err) => {
-                    log::error!("get shard id by partition key error: {:?}", err)
+                    log::error!(
+                        "get shard id by partition key error: partition_key = {partition_key}, {err}"
+                    )
                 }
-                Ok(shard_id) => match self.shard_buffer.get_mut(&shard_id) {
-                    None => {
-                        let mut buffer_state: BufferState = default();
-                        buffer_state.modify(&record);
-                        self.shard_buffer_state.insert(shard_id, buffer_state);
-                        self.shard_buffer.insert(shard_id, vec![record]);
-                    }
-                    Some(buffer) => {
-                        let buffer_state = self.shard_buffer_state.get_mut(&shard_id).unwrap();
-                        buffer_state.modify(&record);
-                        buffer.push(record);
-                        if buffer_state.check(&self.flush_settings) {
-                            let buffer = clear_shard_buffer(&mut self.shard_buffer, shard_id);
-                            self.shard_buffer_state.insert(shard_id, default());
-                            let task = tokio::spawn(flush_(
-                                self.channels.clone(),
-                                self.url_scheme.clone(),
-                                self.stream_name.clone(),
-                                shard_id,
-                                self.compression_type,
-                                buffer,
-                            ));
-                            self.tasks.push(task);
+                Ok(shard_id) => {
+                    let shard_url = self.shard_urls.get(&shard_id);
+                    let shard_url_is_none = shard_url.is_none();
+                    match lookup_shard(
+                        &mut self.channels.channel().await,
+                        &self.url_scheme,
+                        shard_id,
+                        shard_url,
+                    )
+                    .await
+                    {
+                        Err(err) => {
+                            log::error!("lookup shard error: shard_id = {shard_id}, {err}")
+                        }
+                        Ok(shard_url) => {
+                            if shard_url_is_none {
+                                self.shard_urls.insert(shard_id, shard_url.clone());
+                            };
+                            match self.shard_buffer.get_mut(&shard_id) {
+                                None => {
+                                    let mut buffer_state: BufferState = default();
+                                    buffer_state.modify(&record);
+                                    self.shard_buffer_state.insert(shard_id, buffer_state);
+                                    self.shard_buffer.insert(shard_id, vec![record]);
+                                }
+                                Some(buffer) => {
+                                    let buffer_state =
+                                        self.shard_buffer_state.get_mut(&shard_id).unwrap();
+                                    buffer_state.modify(&record);
+                                    buffer.push(record);
+                                    if buffer_state.check(&self.flush_settings) {
+                                        let buffer =
+                                            clear_shard_buffer(&mut self.shard_buffer, shard_id);
+                                        self.shard_buffer_state.insert(shard_id, default());
+                                        let task = tokio::spawn(flush_(
+                                            self.channels.clone(),
+                                            self.stream_name.clone(),
+                                            shard_id,
+                                            shard_url,
+                                            self.compression_type,
+                                            buffer,
+                                        ));
+                                        self.tasks.push(task);
+                                    }
+                                }
+                            }
                         }
                     }
-                },
+                }
             }
         }
 
         let mut shard_buffer = mem::take(&mut self.shard_buffer);
         for (shard_id, buffer) in shard_buffer.iter_mut() {
-            let task = tokio::spawn(flush_(
-                self.channels.clone(),
-                self.url_scheme.clone(),
-                self.stream_name.clone(),
+            let shard_url = self.shard_urls.get(shard_id);
+            let shard_url_is_none = shard_url.is_none();
+            match lookup_shard(
+                &mut self.channels.channel().await,
+                &self.url_scheme,
                 *shard_id,
-                self.compression_type,
-                mem::take(buffer),
-            ));
-            self.tasks.push(task);
+                shard_url,
+            )
+            .await
+            {
+                Err(err) => {
+                    log::error!("lookup shard error: shard_id = {shard_id}, {err}")
+                }
+                Ok(shard_url) => {
+                    if shard_url_is_none {
+                        self.shard_urls.insert(*shard_id, shard_url.clone());
+                    };
+                    let task = tokio::spawn(flush_(
+                        self.channels.clone(),
+                        self.stream_name.clone(),
+                        *shard_id,
+                        shard_url,
+                        self.compression_type,
+                        mem::take(buffer),
+                    ));
+                    self.tasks.push(task);
+                }
+            }
         }
 
         let tasks = std::mem::take(&mut self.tasks);
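Both call sites follow the same shape: remember whether the cache missed (`shard_url_is_none`) before the `Option` is handed to `lookup_shard`, then insert into the cache only on a miss. As used here, `lookup_shard` appears to accept the cached value and return the definitive URL, skipping the RPC on a hit. A sketch of that assumed contract, reconstructed from the call sites only; the body and types are simplified placeholders, not the crate's real RPC:

```rust
/// Assumed contract of `lookup_shard`: a cached URL passed in as `Option`
/// is returned as-is; on `None` the cluster is asked which node owns the
/// shard. The miss branch below is a placeholder for the real lookup.
async fn lookup_shard_sketch(
    url_scheme: &str,
    shard_id: u64,
    cached: Option<&String>,
) -> Result<String, String> {
    if let Some(url) = cached {
        return Ok(url.clone()); // hit: skip the RPC entirely
    }
    // Miss: placeholder for the real server-side shard lookup.
    Ok(format!("{url_scheme}://node-for-shard-{shard_id}.example:6570"))
}
```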
@@ -155,56 +201,46 @@ impl Producer {
 
 async fn flush(
     channels: Channels,
-    url_scheme: String,
     stream_name: String,
     shard_id: ShardId,
+    shard_url: String,
     compression_type: CompressionType,
    buffer: Vec<Record>,
 ) -> Result<(), String> {
     if !buffer.is_empty() {
-        match lookup_shard(&mut channels.channel().await, &url_scheme, shard_id).await {
-            Err(err) => {
-                log::warn!("{err}");
-                Ok(())
-            }
-            Ok(server_node) => {
-                let channel = channels
-                    .channel_at(server_node.clone())
-                    .await
-                    .map_err(|err| {
-                        format!("producer connect error: url = {server_node}, {err:?}")
-                    })?;
-                append(
-                    channel,
-                    stream_name,
-                    shard_id,
-                    compression_type,
-                    buffer.to_vec(),
-                )
-                .await
-                .map_err(|err| format!("producer append error: addr = {server_node}, {err:?}"))
-                .map(|x| log::debug!("append succeed: len = {}", x.len()))?;
-                Ok(())
-            }
-        }
+        let channel = channels
+            .channel_at(shard_url.clone())
+            .await
+            .map_err(|err| format!("producer connect error: url = {shard_url}, {err}"))?;
+        append(
+            channel,
+            stream_name,
+            shard_id,
+            compression_type,
+            buffer.to_vec(),
+        )
+        .await
+        .map_err(|err| format!("producer append error: url = {shard_url}, {err}"))
+        .map(|x| log::debug!("append succeed: len = {}", x.len()))?;
+        Ok(())
     } else {
         Ok(())
     }
 }
 
 async fn flush_(
     channels: Channels,
-    url_scheme: String,
     stream_name: String,
     shard_id: ShardId,
+    shard_url: String,
     compression_type: CompressionType,
     buffer: Vec<Record>,
 ) {
     flush(
         channels,
-        url_scheme,
         stream_name,
         shard_id,
+        shard_url,
         compression_type,
         buffer,
     )
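With the lookup hoisted into `start`, `flush` becomes a straight connect-and-append against the already-resolved `shard_url`; lookup failures now surface as `log::error!` in `start` instead of being downgraded to `log::warn!` inside `flush`. A minimal self-contained sketch of the new control flow; `connect` and `append_stub` are hypothetical stand-ins for the real `Channels::channel_at` and `append`:

```rust
/// Sketch of the new `flush` shape: early-out on an empty buffer, connect
/// to the pre-resolved URL, then append. All network pieces are stubbed.
async fn flush_sketch(shard_url: String, buffer: Vec<String>) -> Result<(), String> {
    if buffer.is_empty() {
        return Ok(()); // mirrors the early-out in `flush`
    }
    let channel = connect(&shard_url)
        .await
        .map_err(|err| format!("producer connect error: url = {shard_url}, {err}"))?;
    let appended = append_stub(channel, buffer)
        .await
        .map_err(|err| format!("producer append error: url = {shard_url}, {err}"))?;
    println!("append succeed: len = {}", appended.len());
    Ok(())
}

// Hypothetical stand-ins so the sketch compiles on its own.
async fn connect(_url: &str) -> Result<(), String> {
    Ok(())
}
async fn append_stub(_channel: (), buffer: Vec<String>) -> Result<Vec<u64>, String> {
    Ok((0..buffer.len() as u64).collect())
}
```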