Skip to content

Commit 7c61f45

Browse files
committed
WIP
1 parent c4246d0 commit 7c61f45

File tree

2 files changed

+155
-38
lines changed

2 files changed

+155
-38
lines changed

src/adapter/src/coord/statement_logging.rs

Lines changed: 86 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -180,14 +180,13 @@ impl StatementLogging {
180180
/// Check if we need to drop a statement
181181
/// due to throttling, and update internal data structures appropriately.
182182
///
183-
/// Returns `None` if we must throttle this statement, and `Some(n)` otherwise, where `n`
184-
/// is the number of statements that were dropped due to throttling before this one.
183+
/// Returns `false` if we must throttle this statement, and `true` otherwise.
185184
fn throttling_check(
186185
&mut self,
187186
cost: u64,
188187
target_data_rate: u64,
189188
max_data_credit: Option<u64>,
190-
) -> Option<usize> {
189+
) -> bool {
191190
let ts = (self.now)() / 1000;
192191
// We use saturating_sub here because system time isn't monotonic, causing cases
193192
// when last_logged_ts_seconds is greater than ts.
@@ -202,14 +201,14 @@ impl StatementLogging {
202201
if let Some(remaining) = self.tokens.checked_sub(cost) {
203202
debug!("throttling check passed. tokens remaining: {remaining}; cost: {cost}");
204203
self.tokens = remaining;
205-
Some(std::mem::take(&mut self.throttled_count))
204+
true
206205
} else {
207206
debug!(
208207
"throttling check failed. tokens available: {}; cost: {cost}",
209208
self.tokens
210209
);
211210
self.throttled_count += 1;
212-
None
211+
false
213212
}
214213
}
215214
}
@@ -276,15 +275,14 @@ impl Coordinator {
276275
/// Check whether we need to do throttling (i.e., whether `STATEMENT_LOGGING_TARGET_DATA_RATE` is set).
277276
/// If so, actually do the check.
278277
///
279-
/// Returns `None` if we must throttle this statement, and `Some(n)` otherwise, where `n`
280-
/// is the number of statements that were dropped due to throttling before this one.
281-
fn statement_logging_throttling_check(&mut self, cost: usize) -> Option<usize> {
278+
/// Returns `false` if we must throttle this statement, and `true` otherwise.
279+
fn statement_logging_throttling_check(&mut self, cost: usize) -> bool {
282280
let Some(target_data_rate) = self
283281
.catalog
284282
.system_config()
285283
.statement_logging_target_data_rate()
286284
else {
287-
return Some(std::mem::take(&mut self.statement_logging.throttled_count));
285+
return true;
288286
};
289287
let max_data_credit = self
290288
.catalog
@@ -297,20 +295,32 @@ impl Coordinator {
297295
)
298296
}
299297

298+
/// Marks a prepared statement as logged and resets the throttled-statement counter.
///
/// If the logging info behind `logging` is still `StillToLog`, it is replaced with
/// `AlreadyLogged { uuid }` and `self.statement_logging.throttled_count` is set to zero.
/// If it is in any other state, this function does nothing.
///
/// This only flips the bookkeeping state; it does not build or enqueue any events itself.
/// The visible caller invokes it only after the throttling check has passed.
fn log_prepared_statement(
299+
&mut self,
300+
uuid: Uuid,
301+
session: &mut Session,
302+
logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
303+
) {
304+
// Obtain mutable access to the logging info through the session.
let logging = session.qcell_rw(&*logging);
305+
if let PreparedStatementLoggingInfo::StillToLog { .. } = logging {
306+
*logging = PreparedStatementLoggingInfo::AlreadyLogged { uuid };
307+
// `get_prepared_statement_info` packs the current `throttled_count` into the
// prepared statement's row, so the counter starts over once that row is logged.
self.statement_logging.throttled_count = 0;
308+
}
309+
}
310+
300311
/// Returns any statement logging events needed for a particular
301312
/// prepared statement. Possibly mutates the `PreparedStatementLoggingInfo` metadata.
302313
///
303314
/// This function does not do a sampling check, and assumes we did so in a higher layer.
304315
///
305-
/// It _does_ do a throttling check, and returns `None` if we must not log due to throttling.
306-
pub(crate) fn log_prepared_statement(
316+
pub(crate) fn get_prepared_statement_info(
307317
&mut self,
308318
session: &mut Session,
309319
logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
310-
) -> Option<(
320+
) -> (
311321
Option<(StatementPreparedRecord, PreparedStatementEvent)>,
312322
Uuid,
313-
)> {
323+
) {
314324
let logging = session.qcell_rw(&*logging);
315325
let mut out = None;
316326

@@ -357,22 +367,20 @@ impl Coordinator {
357367
Datum::String(redacted_sql.as_str()),
358368
]);
359369

360-
let cost = mpsh_packer.byte_len() + sql_row.byte_len();
361-
let throttled_count = self.statement_logging_throttling_check(cost)?;
370+
let throttled_count = self.statement_logging.throttled_count;
362371
mpsh_packer.push(Datum::UInt64(throttled_count.try_into().expect("must fit")));
372+
363373
out = Some((
364374
record,
365375
PreparedStatementEvent {
366376
prepared_statement: mpsh_row,
367377
sql_text: sql_row,
368378
},
369379
));
370-
371-
*logging = PreparedStatementLoggingInfo::AlreadyLogged { uuid };
372380
uuid
373381
}
374382
};
375-
Some((out, uuid))
383+
(out, uuid)
376384
}
377385
/// The rate at which statement execution should be sampled.
378386
/// This is the value of the session var `statement_logging_sample_rate`,
@@ -700,6 +708,8 @@ impl Coordinator {
700708
distribution.sample(&mut thread_rng())
701709
};
702710

711+
// The throttling cost of every update is computed further below, before anything is logged.
712+
703713
// Track how many statements we're recording.
704714
let sampled_label = sample.then_some("true").unwrap_or("false");
705715
self.metrics
@@ -728,7 +738,8 @@ impl Coordinator {
728738
if !sample {
729739
return None;
730740
}
731-
let (ps_record, ps_uuid) = self.log_prepared_statement(session, logging)?;
741+
742+
let (maybe_ps, ps_uuid) = self.get_prepared_statement_info(session, logging);
732743

733744
let began_at = if let Some(lifecycle_timestamps) = lifecycle_timestamps {
734745
lifecycle_timestamps.received
@@ -737,11 +748,6 @@ impl Coordinator {
737748
};
738749
let now = self.now();
739750
let execution_uuid = epoch_to_uuid_v7(&now);
740-
self.record_statement_lifecycle_event(
741-
&StatementLoggingId(execution_uuid),
742-
&StatementLifecycleEvent::ExecutionBegan,
743-
began_at,
744-
);
745751

746752
let params = std::iter::zip(params.execute_types.iter(), params.datums.iter())
747753
.map(|(r#type, datum)| {
@@ -785,28 +791,72 @@ impl Coordinator {
785791
.map(|s| s.as_str().to_string())
786792
.collect(),
787793
};
794+
788795
let mseh_update = Self::pack_statement_began_execution_update(&record);
796+
797+
let (maybe_ps_event, maybe_sh_update) = if let Some((ps_record, ps_event)) = maybe_ps {
798+
if let Some(sh) = self
799+
.statement_logging
800+
.unlogged_sessions
801+
.get(&ps_record.session_id)
802+
{
803+
(Some(ps_event), Some(Self::pack_session_history_update(sh)))
804+
} else {
805+
(None, None)
806+
}
807+
} else {
808+
(None, None)
809+
};
810+
811+
let statement_lifecycle_event_update = Self::pack_statement_lifecycle_event(
812+
&StatementLoggingId(execution_uuid),
813+
&StatementLifecycleEvent::ExecutionBegan,
814+
began_at,
815+
);
816+
817+
let cost = mseh_update.byte_len().saturating_add(
818+
maybe_ps_event
819+
.as_ref()
820+
.map(|e| {
821+
e.prepared_statement
822+
.byte_len()
823+
.saturating_add(e.sql_text.byte_len())
824+
})
825+
.unwrap_or(0)
826+
.saturating_add(maybe_sh_update.as_ref().map(|u| u.byte_len()).unwrap_or(0))
827+
.saturating_add(statement_lifecycle_event_update.byte_len()),
828+
);
829+
830+
if !self.statement_logging_throttling_check(cost) {
831+
return None;
832+
}
833+
834+
self.log_prepared_statement(ps_uuid, session, logging);
835+
836+
self.record_statement_lifecycle_event(
837+
&StatementLoggingId(execution_uuid),
838+
&StatementLifecycleEvent::ExecutionBegan,
839+
began_at,
840+
);
841+
789842
self.statement_logging
790843
.pending_statement_execution_events
791844
.push((mseh_update, Diff::ONE));
792845
self.statement_logging
793846
.executions_begun
794847
.insert(execution_uuid, record);
795-
if let Some((ps_record, ps_update)) = ps_record {
848+
849+
if let Some(sh_update) = maybe_sh_update {
850+
self.statement_logging
851+
.pending_session_events
852+
.push(sh_update);
853+
}
854+
if let Some(ps_event) = maybe_ps_event {
796855
self.statement_logging
797856
.pending_prepared_statement_events
798-
.push(ps_update);
799-
if let Some(sh) = self
800-
.statement_logging
801-
.unlogged_sessions
802-
.remove(&ps_record.session_id)
803-
{
804-
let sh_update = Self::pack_session_history_update(&sh);
805-
self.statement_logging
806-
.pending_session_events
807-
.push(sh_update);
808-
}
857+
.push(ps_event);
809858
}
859+
810860
Some(StatementLoggingId(execution_uuid))
811861
}
812862

src/environmentd/tests/server.rs

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,7 @@ fn test_statement_logging_throttling() {
518518
1.0,
519519
test_util::TestHarness::default().with_system_parameter_default(
520520
"statement_logging_target_data_rate".to_string(),
521-
"100".to_string(),
521+
"1000".to_string(),
522522
),
523523
);
524524
thread::sleep(Duration::from_secs(2));
@@ -536,7 +536,72 @@ fn test_statement_logging_throttling() {
536536
"SELECT
537537
sql,
538538
throttled_count
539-
FROM mz_internal.mz_prepared_statement_history mpsh
539+
FROM
540+
mz_internal.mz_statement_execution_history mseh
541+
JOIN mz_internal.mz_prepared_statement_history mpsh
542+
ON mseh.prepared_statement_id = mpsh.id
543+
JOIN (SELECT DISTINCT sql, sql_hash, redacted_sql FROM mz_internal.mz_sql_text) mst
544+
ON mpsh.sql_hash = mst.sql_hash
545+
WHERE sql IN ('SELECT 1', 'SELECT 2')",
546+
&[],
547+
)
548+
.unwrap();
549+
550+
if sl_results.iter().any(|stmt| {
551+
let sql: String = stmt.get(0);
552+
sql == "SELECT 2"
553+
}) {
554+
Ok(sl_results)
555+
} else {
556+
Err(())
557+
}
558+
})
559+
.expect("Never saw last statement (`SELECT 2`)");
560+
let throttled_count = logs
561+
.iter()
562+
.map(|log| {
563+
let UInt8(throttled_count) = log.get(1);
564+
throttled_count
565+
})
566+
.sum::<u64>();
567+
assert!(
568+
throttled_count > 0,
569+
"at least some statements should have been throttled"
570+
);
571+
572+
assert_eq!(logs.len() + usize::cast_from(throttled_count), 101);
573+
}
574+
575+
#[mz_ore::test]
576+
fn test_statement_logging_prepared_statement_throttling() {
577+
let (server, mut client) = setup_statement_logging_core(
578+
1.0,
579+
1.0,
580+
test_util::TestHarness::default().with_system_parameter_default(
581+
"statement_logging_target_data_rate".to_string(),
582+
"1000".to_string(),
583+
),
584+
);
585+
thread::sleep(Duration::from_secs(2));
586+
let statement = client.prepare("SELECT 1").unwrap();
587+
588+
for _ in 0..100 {
589+
client.execute(&statement, &[]).unwrap();
590+
}
591+
thread::sleep(Duration::from_secs(4));
592+
client.execute("SELECT 2", &[]).unwrap();
593+
let mut client = server.connect_internal(postgres::NoTls).unwrap();
594+
let logs = Retry::default()
595+
.max_duration(Duration::from_secs(60))
596+
.retry(|_| {
597+
let sl_results = client
598+
.query(
599+
"SELECT
600+
sql,
601+
throttled_count
602+
FROM mz_internal.mz_statement_execution_history mseh
603+
JOIN mz_internal.mz_prepared_statement_history mpsh
604+
ON mseh.prepared_statement_id = mpsh.id
540605
JOIN (SELECT DISTINCT sql, sql_hash, redacted_sql FROM mz_internal.mz_sql_text) mst
541606
ON mpsh.sql_hash = mst.sql_hash
542607
WHERE sql IN ('SELECT 1', 'SELECT 2')",
@@ -557,7 +622,9 @@ WHERE sql IN ('SELECT 1', 'SELECT 2')",
557622
let throttled_count = logs
558623
.iter()
559624
.map(|log| {
625+
let txt = log.get::<_, String>(0);
560626
let UInt8(throttled_count) = log.get(1);
627+
println!("throttled_count for row {}: {}", txt, throttled_count);
561628
throttled_count
562629
})
563630
.sum::<u64>();

0 commit comments

Comments
 (0)