Skip to content

Commit a597da8

Browse files
abhinav04sharmainikep
authored andcommitted
Adding ability to apply binlogs thru mysqlbinlog in multi-threaded mode
Summary: Added a new flag in `mysqlbinlog` `--mta-workers=x` that tells the server to spawn `x` dependency applier workers to apply transactions. When `--mta-workers` is specified all events are printed in their base64 representation so we can create log events out of them. Differential Revision: D49466823 --------------------------------------------------------------------------- Cast enum into target type in a ternary operator (facebook#1411) Summary: This fixes a GCC build error: sql/rpl_replica.cc: In function ‘int slave_start_single_worker(Relay_log_info*, ulong)’: sql/rpl_replica.cc:7207:28: error: enumerated and non-enumerated type in conditional expression [-Werror=extra] 7207 | rli->is_fake() ? INFO_REPOSITORY_DUMMY : opt_rli_repository_id, i, | ~~~~~~~~~~~~~~~^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Pull Request resolved: facebook#1411 Differential Revision: D52207263 fbshipit-source-id: 47d7f5c
1 parent 4a10b3b commit a597da8

21 files changed

+398
-52
lines changed

client/mysqlbinlog.cc

Lines changed: 65 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -755,6 +755,7 @@ static char *database = nullptr;
755755
static char *output_file = nullptr;
756756
static char *rewrite = nullptr;
757757
bool force_opt = false, short_form = false, idempotent_mode = false;
758+
int mta_workers = 0;
758759
static bool debug_info_flag, debug_check_flag;
759760
static bool force_if_open_opt = true, raw_mode = false;
760761
static bool to_last_remote_log = false, stop_never = false;
@@ -1292,6 +1293,26 @@ static bool shall_skip_gtids(const Log_event *ev, Gtid *cached_gtid) {
12921293
return filtered;
12931294
}
12941295

1296+
static void print_event(Log_event *ev, FILE *result_file,
1297+
PRINT_EVENT_INFO *info) {
1298+
if (info->base64_output_mode != BASE64_OUTPUT_FULL) {
1299+
return ev->print(result_file, info);
1300+
}
1301+
1302+
auto cache = &info->head_cache;
1303+
1304+
if (!info->inside_group) my_b_printf(cache, "BINLOG '\n");
1305+
if (ev->starts_group() || is_any_gtid_event(ev)) info->inside_group = true;
1306+
if (ev->ends_group()) info->inside_group = false;
1307+
ev->print_base64(cache, info, info->inside_group);
1308+
1309+
if (ev->get_type_code() == mysql::binlog::event::FORMAT_DESCRIPTION_EVENT) {
1310+
info->printed_fd_event = true;
1311+
my_b_printf(cache, "/*!50616 SET @@SESSION.GTID_NEXT='AUTOMATIC'*/%s\n",
1312+
info->delimiter);
1313+
}
1314+
}
1315+
12951316
/**
12961317
Helper function that prints the cached begin query event to the output
12971318
@@ -1301,7 +1322,7 @@ static bool shall_skip_gtids(const Log_event *ev, Gtid *cached_gtid) {
13011322
@retval False ERROR
13021323
*/
13031324
static bool print_cached_begin_query(PRINT_EVENT_INFO *print_event_info) {
1304-
begin_query_ev_cache->print(result_file, print_event_info);
1325+
print_event(begin_query_ev_cache, result_file, print_event_info);
13051326
auto head = &print_event_info->head_cache;
13061327
if (head->error == -1) {
13071328
return false;
@@ -1533,7 +1554,7 @@ void handle_last_rows_query_event(bool print,
15331554
my_off_t temp_log_pos = last_rows_query_event.event_pos;
15341555
auto old_hexdump_from = print_event_info->hexdump_from;
15351556
print_event_info->hexdump_from = (opt_hexdump ? temp_log_pos : 0);
1536-
last_rows_query_event.event->print(result_file, print_event_info);
1557+
print_event(last_rows_query_event.event, result_file, print_event_info);
15371558
print_event_info->hexdump_from = old_hexdump_from;
15381559
}
15391560
last_rows_query_event.event->register_temp_buf(old_temp_buf);
@@ -1650,7 +1671,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
16501671

16511672
switch (ev_type) {
16521673
case mysql::binlog::event::TRANSACTION_PAYLOAD_EVENT:
1653-
ev->print(result_file, print_event_info);
1674+
print_event(ev, result_file, print_event_info);
16541675
if (head->error == -1) goto err;
16551676
break;
16561677
case mysql::binlog::event::QUERY_EVENT: {
@@ -1673,7 +1694,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
16731694
const my_off_t temp_log_pos = pop_event_array.event_pos;
16741695
print_event_info->hexdump_from = (opt_hexdump ? temp_log_pos : 0);
16751696
if (!parent_query_skips)
1676-
temp_event->print(result_file, print_event_info);
1697+
print_event(temp_event, result_file, print_event_info);
16771698
delete temp_event;
16781699
}
16791700

@@ -1774,7 +1795,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
17741795
if (!in_transaction) seen_gtid = false;
17751796
}
17761797

1777-
ev->print(result_file, print_event_info);
1798+
print_event(ev, result_file, print_event_info);
17781799
if (head->error == -1) goto err;
17791800
break;
17801801
}
@@ -1795,7 +1816,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
17951816
the subsequent call load_processor.process fails, because the
17961817
output of Append_block_log_event::print is only a comment.
17971818
*/
1798-
ev->print(result_file, print_event_info);
1819+
print_event(ev, result_file, print_event_info);
17991820

18001821
if (opt_print_gtids && encounter_gtid(cached_gtid)) goto err;
18011822

@@ -1833,7 +1854,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
18331854

18341855
print_event_info->common_header_len =
18351856
dynamic_cast<Format_description_event *>(ev)->common_header_len;
1836-
ev->print(result_file, print_event_info);
1857+
print_event(ev, result_file, print_event_info);
18371858

18381859
if (head->error == -1) goto err;
18391860
if (!force_if_open_opt &&
@@ -1848,7 +1869,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
18481869
break;
18491870
}
18501871
case mysql::binlog::event::BEGIN_LOAD_QUERY_EVENT:
1851-
ev->print(result_file, print_event_info);
1872+
print_event(ev, result_file, print_event_info);
18521873
if (head->error == -1) goto err;
18531874
if ((retval = load_processor.process(
18541875
(Begin_load_query_log_event *)ev)) != OK_CONTINUE)
@@ -1864,6 +1885,10 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
18641885
if (shall_skip_database(exlq->db))
18651886
print_event_info->skipped_event_in_transaction = true;
18661887
else {
1888+
if (print_event_info->base64_output_mode == BASE64_OUTPUT_FULL) {
1889+
error("Cannot handle Execute_load_query");
1890+
goto err;
1891+
}
18671892
if (fname) {
18681893
convert_path_to_forward_slashes(fname);
18691894
exlq->print(result_file, print_event_info, fname);
@@ -2038,7 +2063,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
20382063
goto err;
20392064
}
20402065

2041-
ev->print(result_file, print_event_info);
2066+
print_event(ev, result_file, print_event_info);
20422067

20432068
print_event_info->have_unflushed_events = true;
20442069

@@ -2070,7 +2095,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
20702095
print_event_info->delimiter);
20712096
print_event_info->skipped_event_in_transaction = false;
20722097

2073-
ev->print(result_file, print_event_info);
2098+
print_event(ev, result_file, print_event_info);
20742099
if (head->error == -1) goto err;
20752100
break;
20762101
}
@@ -2092,12 +2117,12 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
20922117
begin_query_ev_cache = nullptr;
20932118
if (skip) break;
20942119
}
2095-
ev->print(result_file, print_event_info);
2120+
print_event(ev, result_file, print_event_info);
20962121
if (head->error == -1) goto err;
20972122
break;
20982123
}
20992124
case mysql::binlog::event::METADATA_EVENT: {
2100-
ev->print(result_file, print_event_info);
2125+
print_event(ev, result_file, print_event_info);
21012126
if (head->error == -1) goto err;
21022127

21032128
/* Copy and flush head cache and body cache */
@@ -2119,7 +2144,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
21192144
"--include-gtids, respectively, instead.");
21202145
[[fallthrough]];
21212146
default:
2122-
ev->print(result_file, print_event_info);
2147+
print_event(ev, result_file, print_event_info);
21232148
if (head->error == -1) goto err;
21242149
}
21252150
/* Flush head cache to result_file for every event */
@@ -2247,6 +2272,11 @@ static struct my_option my_long_options[] = {
22472272
"applying Row Events",
22482273
&idempotent_mode, &idempotent_mode, nullptr, GET_BOOL, NO_ARG, 0, 0, 0,
22492274
nullptr, 0, nullptr},
2275+
{"mta-workers", 'w',
2276+
"Number of multi-threaded workers to spawn on the "
2277+
"server to apply binlogs",
2278+
&mta_workers, &mta_workers, nullptr, GET_INT, REQUIRED_ARG, 0, 0, 0,
2279+
nullptr, 0, nullptr},
22502280
{"local-load", 'l',
22512281
"Prepare local temporary files for LOAD DATA INFILE in the specified "
22522282
"directory.",
@@ -3817,6 +3847,11 @@ static int args_post_process(void) {
38173847
global_tsid_lock->unlock();
38183848
}
38193849

3850+
if (mta_workers && opt_skip_gtids == 0) {
3851+
error("--mta-workers requires --skip-gtids option");
3852+
return ERROR_STOP;
3853+
}
3854+
38203855
return OK_CONTINUE;
38213856
}
38223857

@@ -4191,6 +4226,17 @@ int main(int argc, char **argv) {
41914226
fprintf(result_file, "/*!80019 SET @@SESSION.REQUIRE_ROW_FORMAT=1*/;\n\n");
41924227
}
41934228

4229+
auto orig_base64_output_mode = opt_base64_output_mode;
4230+
auto orig_short_form = short_form;
4231+
if (mta_workers) {
4232+
// we need to work in full base64 and short form mode for MTA
4233+
opt_base64_output_mode = BASE64_OUTPUT_FULL;
4234+
short_form = true;
4235+
fprintf(result_file,
4236+
"/*!50700 SET @@SESSION.MTA_BINLOG_STATEMENT_WORKERS=%d*/;\n",
4237+
mta_workers);
4238+
}
4239+
41944240
if (opt_start_gtid_str != nullptr || opt_find_gtid_str != nullptr) {
41954241
if (opt_start_gtid_str != nullptr && opt_remote_proto == BINLOG_DUMP_GTID) {
41964242
char *args = const_cast<char *>("");
@@ -4219,7 +4265,12 @@ int main(int argc, char **argv) {
42194265

42204266
if (!raw_mode && opt_find_gtid_str == nullptr) {
42214267
fprintf(result_file, "# End of log file\n");
4222-
4268+
if (mta_workers) {
4269+
opt_base64_output_mode = orig_base64_output_mode;
4270+
short_form = orig_short_form;
4271+
fprintf(result_file,
4272+
"/*!50700 SET @@SESSION.MTA_BINLOG_STATEMENT_WORKERS=0*/;\n");
4273+
}
42234274
fprintf(result_file,
42244275
"/*!50003 SET COMPLETION_TYPE=@OLD_COMPLETION_TYPE*/;\n");
42254276
if (disable_log_bin)

mysql-test/r/mysqld--help-notwin.result

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1063,6 +1063,9 @@ The following options may be given as the first argument:
10631063
minimum_hlc_ns is successfully updated is guranteed to be
10641064
greater than this value. The maximum allowed drift
10651065
(forward) is controlled by maximum_hlc_drift_ns
1066+
--mta-binlog-statement-workers[=#]
1067+
Internal variable to specify the Number of workers to
1068+
spawn to apply binlogs thru mysqlbinlog piping
10661069
--mts-dependency-cond-wait-timeout[=#]
10671070
Timeout for all conditional waits in dependency repl in
10681071
milliseconds
@@ -3409,6 +3412,7 @@ memlock FALSE
34093412
min-examined-row-limit 0
34103413
min-examined-row-limit-sql-stats 0
34113414
minimum-hlc-ns 0
3415+
mta-binlog-statement-workers 0
34123416
mts-dependency-cond-wait-timeout 5000
34133417
mts-dependency-max-keys 100000
34143418
mts-dependency-order-commits DB

mysql-test/suite/binlog_nogtid/r/binlog_persist_only_variables.result

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ SET PERSIST_ONLY max_binlog_cache_size = @@GLOBAL.max_binlog_cache_size;
153153
SET PERSIST_ONLY max_binlog_size = @@GLOBAL.max_binlog_size;
154154
SET PERSIST_ONLY max_binlog_stmt_cache_size = @@GLOBAL.max_binlog_stmt_cache_size;
155155
SET PERSIST_ONLY max_relay_log_size = @@GLOBAL.max_relay_log_size;
156+
SET PERSIST_ONLY mta_binlog_statement_workers = @@GLOBAL.mta_binlog_statement_workers;
156157
SET PERSIST_ONLY mts_dependency_replication = @@GLOBAL.mts_dependency_replication;
157158
SET PERSIST_ONLY prev_gtid_and_opid = @@GLOBAL.prev_gtid_and_opid;
158159
ERROR HY000: Variable 'prev_gtid_and_opid' is a non persistent read only variable
@@ -384,6 +385,7 @@ RESET PERSIST max_binlog_cache_size;
384385
RESET PERSIST max_binlog_size;
385386
RESET PERSIST max_binlog_stmt_cache_size;
386387
RESET PERSIST max_relay_log_size;
388+
RESET PERSIST mta_binlog_statement_workers;
387389
RESET PERSIST mts_dependency_replication;
388390
RESET PERSIST prev_gtid_and_opid;
389391
ERROR HY000: Variable prev_gtid_and_opid does not exist in persisted config file

mysql-test/suite/binlog_nogtid/r/binlog_persist_variables.result

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ SET PERSIST max_binlog_cache_size = @@GLOBAL.max_binlog_cache_size;
128128
SET PERSIST max_binlog_size = @@GLOBAL.max_binlog_size;
129129
SET PERSIST max_binlog_stmt_cache_size = @@GLOBAL.max_binlog_stmt_cache_size;
130130
SET PERSIST max_relay_log_size = @@GLOBAL.max_relay_log_size;
131+
SET PERSIST mta_binlog_statement_workers = @@GLOBAL.mta_binlog_statement_workers;
131132
SET PERSIST mts_dependency_replication = @@GLOBAL.mts_dependency_replication;
132133
SET PERSIST prev_gtid_and_opid = @@GLOBAL.prev_gtid_and_opid;
133134
ERROR HY000: Variable 'prev_gtid_and_opid' is a read only variable
@@ -385,6 +386,7 @@ RESET PERSIST IF EXISTS max_binlog_cache_size;
385386
RESET PERSIST IF EXISTS max_binlog_size;
386387
RESET PERSIST IF EXISTS max_binlog_stmt_cache_size;
387388
RESET PERSIST IF EXISTS max_relay_log_size;
389+
RESET PERSIST IF EXISTS mta_binlog_statement_workers;
388390
RESET PERSIST IF EXISTS mts_dependency_replication;
389391
RESET PERSIST IF EXISTS prev_gtid_and_opid;
390392
Warnings:
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
include/master-slave.inc
2+
Warnings:
3+
Note #### Sending passwords in plain text without SSL/TLS is extremely insecure.
4+
Note #### Storing MySQL user name or password information in the connection metadata repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START REPLICA; see the 'START REPLICA Syntax' in the MySQL Manual for more information.
5+
[connection master]
6+
call mtr.add_suppression("HA_ERR_FOUND_DUPP_KEY");
7+
create table t1 (a int primary key) engine = innodb;
8+
flush binary logs;
9+
purge binary logs to 'master-bin.000001';
10+
insert into t1 values(1);
11+
insert into t1 values(2);
12+
insert into t1 values(3);
13+
insert into t1 values(4);
14+
include/sync_slave_sql_with_master.inc
15+
flush binary logs;
16+
delete from t1;
17+
"Case 1: No errors"
18+
include/sync_slave_sql_with_master.inc
19+
select * from t1;
20+
a
21+
1
22+
2
23+
3
24+
4
25+
select * from t1;
26+
a
27+
1
28+
2
29+
3
30+
4
31+
"Case 2: Duplicate key error on worker"
32+
delete from t1;
33+
insert into t1 values(3);
34+
include/sync_slave_sql_with_master.inc
35+
drop table t1;
36+
include/sync_slave_sql_with_master.inc
37+
include/rpl_end.inc
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
--enable-binlog-hlc
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
source include/master-slave.inc;
2+
source include/have_binlog_format_row.inc;
3+
4+
call mtr.add_suppression("HA_ERR_FOUND_DUPP_KEY");
5+
6+
connection master;
7+
let $MYSQLD_DATADIR = `select @@global.datadir`;
8+
9+
let $binlog_file = query_get_value(SHOW MASTER STATUS, File, 1);
10+
create table t1 (a int primary key) engine = innodb;
11+
12+
flush binary logs;
13+
eval purge binary logs to '$binlog_file';
14+
15+
insert into t1 values(1);
16+
insert into t1 values(2);
17+
insert into t1 values(3);
18+
insert into t1 values(4);
19+
source include/sync_slave_sql_with_master.inc;
20+
21+
connection master;
22+
let $binlog_file = query_get_value(SHOW MASTER STATUS, File, 1);
23+
24+
flush binary logs;
25+
delete from t1;
26+
27+
echo "Case 1: No errors";
28+
exec $MYSQL_BINLOG --skip-gtids --mta-workers=2 $MYSQLD_DATADIR/$binlog_file | $MYSQL --host=127.0.0.1 -P $MASTER_MYPORT -uroot;
29+
source include/sync_slave_sql_with_master.inc;
30+
31+
connection master;
32+
select * from t1;
33+
34+
connection slave;
35+
select * from t1;
36+
37+
echo "Case 2: Duplicate key error on worker";
38+
connection master;
39+
delete from t1;
40+
insert into t1 values(3); # this will cause dup key error
41+
42+
exec $MYSQL_BINLOG --skip-gtids --mta-workers=2 $MYSQLD_DATADIR/$binlog_file | $MYSQL --host=127.0.0.1 -P $MASTER_MYPORT -uroot || true;
43+
source include/sync_slave_sql_with_master.inc;
44+
45+
connection master;
46+
drop table t1;
47+
source include/sync_slave_sql_with_master.inc;
48+
49+
source include/rpl_end.inc;
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
SET @start_value = @@global.mta_binlog_statement_workers;
2+
SELECT @start_value;
3+
@start_value
4+
0
5+
SET @@GLOBAL.mta_binlog_statement_workers = 4;
6+
SET @@SESSION.mta_binlog_statement_workers = 4;
7+
SET @@SESSION.mta_binlog_statement_workers = 0;
8+
Warnings:
9+
Warning 1231 mta_binlog_statement_workers can only be set from mysqlbinlog
10+
SET @@GLOBAL.mta_binlog_statement_workers = @start_value;
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
source include/load_sysvars.inc;
2+
3+
SET @start_value = @@global.mta_binlog_statement_workers;
4+
SELECT @start_value;
5+
6+
SET @@GLOBAL.mta_binlog_statement_workers = 4;
7+
8+
SET @@SESSION.mta_binlog_statement_workers = 4;
9+
10+
SET @@SESSION.mta_binlog_statement_workers = 0;
11+
12+
SET @@GLOBAL.mta_binlog_statement_workers = @start_value;

sql/log_event.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2529,7 +2529,10 @@ void Log_event::print_base64(IO_CACHE *file, PRINT_EVENT_INFO *print_event_info,
25292529
}
25302530

25312531
if (print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS) {
2532-
if (my_b_tell(file) == 0) {
2532+
// In BASE64_OUTPUT_FULL mode mysqlbinlog.cc will determine when to print
2533+
// "BINLOG". See @print_event()
2534+
if (print_event_info->base64_output_mode != BASE64_OUTPUT_FULL &&
2535+
my_b_tell(file) == 0) {
25332536
my_b_printf(file, "\nBINLOG '\n");
25342537
print_event_info->inside_binlog = true;
25352538
}

0 commit comments

Comments
 (0)