From d832f1c61a9162953b3b5fdb46fcb068fa420fde Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Tue, 15 Apr 2025 10:47:50 -0700 Subject: [PATCH] Extend NamedBlobMysqlDatabasePerf tool for more tests --- .../github/ambry/commons/RetryExecutor.java | 4 +- .../perf/NamedBlobMysqlDatabasePerf.java | 793 ++++++++++++------ .../perf/NamedBlobMysqlDatabasePerfTest.java | 50 ++ 3 files changed, 574 insertions(+), 273 deletions(-) create mode 100644 ambry-tools/src/test/java/com/github/ambry/tools/perf/NamedBlobMysqlDatabasePerfTest.java diff --git a/ambry-commons/src/main/java/com/github/ambry/commons/RetryExecutor.java b/ambry-commons/src/main/java/com/github/ambry/commons/RetryExecutor.java index 49e74ad08e..3fc6d8f6e3 100644 --- a/ambry-commons/src/main/java/com/github/ambry/commons/RetryExecutor.java +++ b/ambry-commons/src/main/java/com/github/ambry/commons/RetryExecutor.java @@ -65,7 +65,7 @@ private void recursiveAsyncRetry(Consumer> call, Predicate void recursiveAsyncRetry(Consumer> call, Predicate java -cp "*" com.github.ambry.tools.perf.NamedBlobMysqlDatabasePerf - * > --db_username database_username - * > --db_datacenter datacenter_name - * > --db_name database_name - * > --db_host database_host - * > --parallelism 10 // number of connections to create - * > --target_row 10 // number of millions of rows to create before performance test - * > --include_list false // true of false to include list operations in the performance test - * - * Or you can provide a property file to include all the arguments in the above command, for example: + * You also have to provide a property file to include all the arguments, for example: * > cat named_blob.props - * db_username=database_username - * db_password=databse_pasword // yes, you can also provide database password in the prop file - * db_datacenter=datacenter_name - * db_name=database_name - * db_host=database_hostname + * db.user.name=database_username + * db.password=databse_pasword + * db.datacenter=datacenter_name + * db.name=database_name + * db.host=database_hostname * parallelism=10 - * target_rows=10 - * include_list=false + * target.rows.in.million=10 + * num.operations=1000000 + * test.type=LIST * > java -cp "*" com.github.ambry.tools.perf.NamedBlobMysqlDatabasePerf --props named_blob.props */ public class NamedBlobMysqlDatabasePerf { + public static final String ACCOUNT_NAME_FORMAT = "ACCOUNT_%d"; + public static final String CONTAINER_NAME_FORMAT = "CONTAINER_%d"; + public static final short HUGE_LIST_ACCOUNT_ID = 1024; public static final short HUGE_LIST_CONTAINER_ID = 8; + public static final String HUGE_LIST_ACCOUNT_NAME = String.format(ACCOUNT_NAME_FORMAT, HUGE_LIST_ACCOUNT_ID); + public static final String HUGE_LIST_CONTAINER_NAME = String.format(CONTAINER_NAME_FORMAT, HUGE_LIST_CONTAINER_ID); public static final String HUGE_LIST_COMMON_PREFIX = "hugeListCommonPrefix/NamedBlobMysqlDatabasePerf-"; - // 10% of the chance we would use this account id and container id for insert - public static final float PERCENTAGE_FOR_HUGE_LIST = 0.1f; - - public static final String ACCOUNT_NAME_FORMAT = "ACCOUNT_%d"; - public static final String CONTAINER_NAME_FORMAT = "CONTAINER_%d"; + public static final String HUGE_LIST_TEMPLATE = "hugeListCommonPrefix/NamedBlobMysqlDatabasePerf-%s/%s"; public static final int NUMBER_ACCOUNT = 100; public static final int NUMBER_CONTAINER = 10; @@ -108,152 +101,264 @@ public class NamedBlobMysqlDatabasePerf { public static final PartitionId PARTITION_ID = new MockPartitionId(); - public static final String DB_USERNAME = "db_username"; - public static final String DB_DATACENTER = "db_datacenter"; - public static final String DB_NAME = "db_name"; - public static final String DB_HOST = "db_host"; - public static final String DB_PASSWORD = "db_password"; - public static final String PARALLELISM = "parallelism"; - public static final String TARGET_ROWS = "target_rows"; - public static final String INCLUDE_LIST = "include_list"; + public static class PerfConfig { + /** + * Dataceneter from there this cli is executed + */ + @Config("db.datacenter") + public final String dbDatacenter; - public static void main(String[] args) throws Exception { - OptionParser parser = new OptionParser(); - - ArgumentAcceptingOptionSpec propsFilepathOpt = parser.accepts("props", "Path to property file \n" - + "All the argument can be provided through a property file with key=value at each line.\n" - + "If an parameter both exist in the property file and the command line argument, the value in the command line argument would override the value in the property file") - .withRequiredArg() - .describedAs("props_file") - .ofType(String.class); - - // database connection configuration - ArgumentAcceptingOptionSpec dbUsernameOpt = parser.accepts(DB_USERNAME, "Username to database") - .withRequiredArg() - .describedAs("db_username") - .ofType(String.class); - - ArgumentAcceptingOptionSpec dbDatacenterOpt = parser.accepts(DB_DATACENTER, "Datacenter of the database") - .withRequiredArg() - .describedAs("datacenter") - .ofType(String.class); - - ArgumentAcceptingOptionSpec dbNameOpt = - parser.accepts(DB_NAME, "Database name").withRequiredArg().describedAs("db_name").ofType(String.class); - - ArgumentAcceptingOptionSpec dbHostOpt = - parser.accepts(DB_HOST, "Database host").withRequiredArg().describedAs("db_host").ofType(String.class); - - ArgumentAcceptingOptionSpec parallelismOpt = - parser.accepts(PARALLELISM, "Number of thread to execute sql statement") - .withRequiredArg() - .describedAs("parallelism") - .ofType(Integer.class); - - ArgumentAcceptingOptionSpec targetMRowsOpt = parser.accepts(TARGET_ROWS, - "Number of rows to insert so the total database rows would reach this target." - + "Notice that this target is in in millions. If the value is 1, this command would make sure database would have 1 million rows.") - .withRequiredArg() - .describedAs("target_rows") - .ofType(Integer.class); - - OptionSpec includeListTestOpt = - parser.accepts(INCLUDE_LIST, "Including list operation in the performance tests."); - - OptionSet options = parser.parse(args); - Properties props = new Properties(); - if (options.has(propsFilepathOpt)) { - String propFilepath = options.valueOf(propsFilepathOpt); - System.out.println("Loading properties from file " + propFilepath); - try (FileInputStream fis = new FileInputStream(propFilepath)) { - props.load(fis); - } - System.out.println("Getting properties: " + props.stringPropertyNames()); - } - if (options.has(dbUsernameOpt)) { - props.setProperty(DB_USERNAME, options.valueOf(dbUsernameOpt)); - } - if (options.has(dbDatacenterOpt)) { - props.setProperty(DB_DATACENTER, options.valueOf(dbDatacenterOpt)); - } - if (options.has(dbNameOpt)) { - props.setProperty(DB_NAME, options.valueOf(dbNameOpt)); - } - if (options.has(dbHostOpt)) { - props.setProperty(DB_HOST, options.valueOf(dbHostOpt)); + /** + * The host of database instance (or cluster), including port + */ + @Config("db.host") + public final String dbHost; + + /** + * The name of database + */ + @Config("db.name") + public final String dbName; + + /** + * The user name to access database + */ + @Config("db.user.name") + public final String dbUserName; + + /** + * The password for the given username. If you enable certificate based authentication, this will + * be ignored, just provide a random string. + * + * You can also provide an empty string when certificate based authentication is not enabled, + * you will be prompted to type password when running this cli. + */ + @Config("db.password") + public final String dbPassword; + + /** + * True to enable certificate based authentication. If this is true, you must also provide ssl configuration + * including + *
    + *
  • {@link SSLConfig#sslKeystoreType}
  • + *
  • {@link SSLConfig#sslKeystorePath}
  • + *
  • {@link SSLConfig#sslKeystorePassword}
  • + *
  • {@link SSLConfig#sslTruststoreType}
  • + *
  • {@link SSLConfig#sslTruststorePath}
  • + *
  • {@link SSLConfig#sslTruststorePassword}
  • + *
+ */ + @Config("enable.certificate.based.authentication") + public final boolean enableCertificateBasedAuthentication; + + /** + * The number of parallel threads to run the performance test. + */ + @Config("parallelism") + public final int parallelism; + + /** + * The target number of rows in the database, in millions. If there are not enough rows in the database before + * running the performance test, this tool will fill the database with new rows until it hits the target number. + */ + @Config("target.rows.in.million") + public final int targetRowsInMillion; + + /** + * Enable hard delete when deleting named blobs. + */ + @Config("enable.hard.delete") + public final boolean enableHardDelete; + + /** + * The number of operations to exuected in performance test. + */ + @Config("num.operations") + public final int numOperations; + + /** + * The test type to run. It can be one of the following: + *
    + *
  • {@link TestType#CUSTOM}
  • + *
  • {@link TestType#LIST}
  • + *
  • {@link TestType#READ_WRITE}
  • + *
+ */ + @Config("test.type") + public final TestType testType; + + /** + * Only do write operations in CUSTOM test type when this is true. + */ + @Config("custom.only.writes") + public final boolean onlyWrites; + + /** + * Create a {@link PerfConfig} given a {@link VerifiableProperties} object. + * @param verifiableProperties + * @throws Exception + */ + public PerfConfig(VerifiableProperties verifiableProperties) throws Exception { + dbDatacenter = verifiableProperties.getString("db.datacenter", ""); + dbHost = verifiableProperties.getString("db.host", ""); + dbName = verifiableProperties.getString("db.name", ""); + dbUserName = verifiableProperties.getString("db.user.name", ""); + dbPassword = verifiableProperties.getString("db.password", ""); + enableCertificateBasedAuthentication = + verifiableProperties.getBoolean("enable.certificate.based.authentication", false); + parallelism = verifiableProperties.getInt("parallelism", 0); + targetRowsInMillion = verifiableProperties.getIntInRange("target.rows.in.million", 0, 0, 100); + enableHardDelete = verifiableProperties.getBoolean("enable.hard.delete", false); + numOperations = verifiableProperties.getInt("num.operations", 0); + testType = verifiableProperties.getEnum("test.type", TestType.class, TestType.READ_WRITE); + onlyWrites = verifiableProperties.getBoolean("custom.only.writes", false); + validate(); } - if (options.has(parallelismOpt)) { - props.setProperty(PARALLELISM, String.valueOf(options.valueOf(parallelismOpt))); + + /** + * Validate if the configurations are valid + * @throws Exception + */ + public void validate() throws Exception { + validateNotEmpty(dbDatacenter, "db.datacenter"); + validateNotEmpty(dbHost, "db.host"); + validateNotEmpty(dbName, "db.name"); + validateNotEmpty(dbUserName, "db.user.name"); + validatePositive(parallelism, "parallelism"); + validatePositive(numOperations, "num.operations"); } - if (options.has(targetMRowsOpt)) { - props.setProperty(TARGET_ROWS, String.valueOf(options.valueOf(targetMRowsOpt))); + + private void validateNotEmpty(String value, String name) throws Exception { + if (isStringEmpty(value)) { + throw new IllegalArgumentException(name + " should not be empty"); + } } - if (options.has(includeListTestOpt)) { - props.setProperty(INCLUDE_LIST, "true"); + + private void validatePositive(int value, String name) throws Exception { + if (value <= 0) { + throw new IllegalArgumentException(name + " should be positive"); + } } - if (!props.containsKey(INCLUDE_LIST)) { - props.setProperty(INCLUDE_LIST, "false"); + } + + /** + * The type of test to run. This cli tool can be used to run various types of tests. Reach {@Link TestType} is + * associated with a {@link PerformanceTestWorker} class that implements several static methods. + *
    + *
  • getNumberOfExistingRows, which takes in a {@link DataSource} and return long
  • + *
  • generateNewNamedBlobRecord, which takes in a {@link Random} and a list of {@link Account}s and return a {@link NamedBlobRecord}
  • + *
+ */ + public enum TestType { + READ_WRITE, LIST, CUSTOM; + + /** + * Associate TestType with {@link PerformanceTestWorker} class. + */ + static final Map workerClassMap = new HashMap() { + { + put(READ_WRITE, ReadWritePerformanceTestWorker.class); + put(LIST, ListPerformanceTestWorker.class); + put(CUSTOM, CustomPerformanceTestWorker.class); + } + }; + + /** + * Return the number of existing rows for each {@link TestType}. + * @param dataSource + * @return + * @throws Exception + */ + public long getNumberOfExistingRows(DataSource dataSource) throws Exception { + Class clazz = workerClassMap.get(this); + Method method = clazz.getDeclaredMethod("getNumberOfExistingRows", DataSource.class); + Object object = method.invoke(null, dataSource); + if (object instanceof Long) { + return (Long) object; + } else { + throw new IllegalArgumentException("getNumberOfExistingRows should return a long"); + } } - List requiredArguments = - Arrays.asList(DB_USERNAME, DB_DATACENTER, DB_NAME, DB_HOST, PARALLELISM, TARGET_ROWS, INCLUDE_LIST); - for (String requiredArgument : requiredArguments) { - if (!props.containsKey(requiredArgument)) { - System.err.println( - "Missing " + requiredArgument + "! Please provide it through property file or the command line argument"); - parser.printHelpOn(System.err); - System.exit(1); + /** + * Return the newly generated {@link NamedBlobRecord} for each {@link TestType}. + * @param random + * @param allAccounts + * @return + * @throws Exception + */ + public NamedBlobRecord generateNewNamedBlobRecord(Random random, List allAccounts) throws Exception { + Class clazz = workerClassMap.get(this); + Method method = clazz.getDeclaredMethod("generateNewNamedBlobRecord", Random.class, List.class); + Object object = method.invoke(null, random, allAccounts); + if (object instanceof NamedBlobRecord) { + return (NamedBlobRecord) object; + } else { + throw new IllegalArgumentException("generateNewNamedBlobRecord should return a NamedBlobRecord"); } } - // Now ask for password if it's not provided in the propFile - if (!props.containsKey(DB_PASSWORD)) { - String password = - ToolUtils.passwordInput("Please input database password for user " + props.getProperty(DB_USERNAME) + ": "); - props.setProperty(DB_PASSWORD, password); + /** + * Get class object associated with each {@link TestType}. + * @return + */ + public Class getWorkerClass() { + return workerClassMap.get(this); } + } - // Now create a mysql named blob data accessor + static boolean isStringEmpty(String value) { + return value == null || value.isEmpty() || value.trim().isEmpty(); + } + + public static void main(String[] args) throws Exception { + VerifiableProperties verifiableProperties = ToolUtils.getVerifiableProperties(args); + PerfConfig config = new PerfConfig(verifiableProperties); Properties newProperties = new Properties(); - String dbUrl = - "jdbc:mysql://" + props.getProperty(DB_HOST) + "/" + props.getProperty(DB_NAME) + "?serverTimezone=UTC"; + + String password = config.dbPassword; + if (!config.enableCertificateBasedAuthentication) { + // we are going to use password + if (isStringEmpty(password)) { + password = ToolUtils.passwordInput("Please input database password for user " + config.dbUserName + ": "); + } + } else { + newProperties.setProperty(MySqlNamedBlobDbConfig.ENABLE_CERTIFICATE_BASED_AUTHENTICATION, "true"); + newProperties.setProperty("ssl.keystore.type", verifiableProperties.getString("ssl.keystore.type")); + newProperties.setProperty("ssl.keystore.path", verifiableProperties.getString("ssl.keystore.path")); + newProperties.setProperty("ssl.keystore.password", verifiableProperties.getString("ssl.keystore.password")); + newProperties.setProperty("ssl.truststore.type", verifiableProperties.getString("ssl.truststore.type")); + newProperties.setProperty("ssl.truststore.path", verifiableProperties.getString("ssl.truststore.path")); + newProperties.setProperty("ssl.truststore.password", verifiableProperties.getString("ssl.truststore.password")); + } + + // Now create a mysql named blob data accessor + String dbUrl = "jdbc:mysql://" + config.dbHost + "/" + config.dbName + "?serverTimezone=UTC"; MySqlUtils.DbEndpoint dbEndpoint = - new MySqlUtils.DbEndpoint(dbUrl, props.getProperty(DB_DATACENTER), true, props.getProperty(DB_USERNAME), - props.getProperty(DB_PASSWORD)); + new MySqlUtils.DbEndpoint(dbUrl, config.dbDatacenter, true, config.dbUserName, password); JSONArray jsonArray = new JSONArray(); jsonArray.put(dbEndpoint.toJson()); System.out.println("DB_INFO: " + jsonArray); newProperties.setProperty(MySqlNamedBlobDbConfig.DB_INFO, jsonArray.toString()); - newProperties.setProperty(MySqlNamedBlobDbConfig.LOCAL_POOL_SIZE, - String.valueOf(2 * Integer.valueOf(props.getProperty(PARALLELISM)))); - newProperties.setProperty(ClusterMapConfig.CLUSTERMAP_DATACENTER_NAME, props.getProperty(DB_DATACENTER)); + newProperties.setProperty(MySqlNamedBlobDbConfig.LIST_NAMED_BLOBS_SQL_OPTION, + String.valueOf(MySqlNamedBlobDbConfig.MAX_LIST_NAMED_BLOBS_SQL_OPTION)); + newProperties.setProperty(MySqlNamedBlobDbConfig.LOCAL_POOL_SIZE, String.valueOf(2 * config.parallelism)); + newProperties.setProperty(MySqlNamedBlobDbConfig.ENABLE_HARD_DELETE, String.valueOf(config.enableHardDelete)); + newProperties.setProperty(ClusterMapConfig.CLUSTERMAP_DATACENTER_NAME, config.dbDatacenter); - int numThreads = Integer.valueOf(props.getProperty(PARALLELISM)); + int numThreads = config.parallelism; ScheduledExecutorService executor = Utils.newScheduler(numThreads + 1, "workers-", false); MetricRegistry registry = new MetricRegistry(); // Mock an account service AccountService accountService = createInMemoryAccountService(); MySqlNamedBlobDbFactory factory = - new MySqlNamedBlobDbFactory(new VerifiableProperties(newProperties), registry, accountService, - SystemTime.getInstance(), ""); + new MySqlNamedBlobDbFactory(new VerifiableProperties(newProperties), registry, accountService); DataSource dataSource = factory.buildDataSource(dbEndpoint); NamedBlobDb namedBlobDb = factory.getNamedBlobDb(); + TestType testType = config.testType; - // First, fill the database with target number of rows - long targetRows = Long.valueOf(props.getProperty(TARGET_ROWS)) * 1000000L; - long existingRows = getNumberOfRowsInDatabase(dataSource); - if (existingRows >= targetRows) { - System.out.println("Existing number of rows: " + existingRows + ", more than target number of rows: " + targetRows - + ", skip filling database rows"); - } else { - fillDatabase(registry, namedBlobDb, executor, accountService, existingRows, targetRows, numThreads); - } - - // Then starting doing performance tests - // For performance testing, we are expecting each database query's average latency to be 2ms. So each worker can achieve - // 500 QPS. We are trying to achieve 80K QPS for each query, which means we at least need 160 threads. - boolean includeList = props.getProperty(INCLUDE_LIST).equals("true"); - runPerformanceTest(registry, namedBlobDb, executor, accountService, numThreads, includeList); + prepareDatabaseForPerfTest(testType, registry, namedBlobDb, dataSource, executor, accountService, config); + runPerformanceTest(registry, namedBlobDb, testType, executor, accountService, numThreads, config); Utils.shutDownExecutorService(executor, 10, TimeUnit.SECONDS); namedBlobDb.close(); @@ -286,10 +391,10 @@ private static AccountService createInMemoryAccountService() throws Exception { // Now add the special account //@formatter:off accounts.add( - new AccountBuilder(HUGE_LIST_ACCOUNT_ID, String.format(ACCOUNT_NAME_FORMAT, HUGE_LIST_ACCOUNT_ID), Account.AccountStatus.ACTIVE) + new AccountBuilder(HUGE_LIST_ACCOUNT_ID, HUGE_LIST_ACCOUNT_NAME, Account.AccountStatus.ACTIVE) .containers( Collections.singletonList( - new ContainerBuilder(HUGE_LIST_CONTAINER_ID, String.format(CONTAINER_NAME_FORMAT, HUGE_LIST_CONTAINER_ID), Container.ContainerStatus.ACTIVE, "", HUGE_LIST_ACCOUNT_ID) + new ContainerBuilder(HUGE_LIST_CONTAINER_ID, HUGE_LIST_CONTAINER_NAME, Container.ContainerStatus.ACTIVE, "", HUGE_LIST_ACCOUNT_ID) .build())) .build()); //@formatter:on @@ -297,40 +402,28 @@ private static AccountService createInMemoryAccountService() throws Exception { return accountService; } - /** - * Get total number of rows of the target table. - * @param datasource Datasource to execute query on. - * @return Total number of rows - * @throws Exception - */ - private static long getNumberOfRowsInDatabase(DataSource datasource) throws Exception { - String rowQuerySql = "SELECT COUNT(*) as total FROM " + TABLE_NAME; - long numberOfRows = 0; - try (Connection connection = datasource.getConnection()) { - try (PreparedStatement queryStatement = connection.prepareStatement(rowQuerySql)) { - try (ResultSet result = queryStatement.executeQuery()) { - while (result.next()) { - numberOfRows = result.getLong("total"); - } - } - } - } - return numberOfRows; - } - /** * Fill database(table) with rows so we can reach the target number of rows. * @param registry The {@link MetricRegistry} object. * @param namedBlobDb The {@link NamedBlobDb} object. * @param executor The {@link ScheduledExecutorService} object. * @param accountService The {@link AccountService} object. - * @param existingRows The number of existing rows. - * @param targetRows The number of target rows. - * @param numThreads The number of threads for insert database rows. + * @param config The {@link PerfConfig} object that contains properties for the test. * @throws Exception */ - private static void fillDatabase(MetricRegistry registry, NamedBlobDb namedBlobDb, ScheduledExecutorService executor, - AccountService accountService, long existingRows, long targetRows, int numThreads) throws Exception { + private static void prepareDatabaseForPerfTest(TestType testType, MetricRegistry registry, NamedBlobDb namedBlobDb, + DataSource dataSource, ScheduledExecutorService executor, AccountService accountService, PerfConfig config) + throws Exception { + // First, fill the database with target number of rows + long targetRows = config.targetRowsInMillion * 1000000L; + long existingRows = testType.getNumberOfExistingRows(dataSource); + if (existingRows >= targetRows) { + System.out.println("Existing number of rows: " + existingRows + ", more than target number of rows: " + targetRows + + ", skip filling database rows"); + return; + } + + int numThreads = config.parallelism; long remainingRows = targetRows - existingRows; System.out.println("Existing number of rows: " + existingRows + ". Target number of rows: " + targetRows + ". Number of rows to insert: " + remainingRows); @@ -342,7 +435,7 @@ private static void fillDatabase(MetricRegistry registry, NamedBlobDb namedBlobD if (i == numThreads - 1) { num = remainingRows - i * numberOfInsertPerWorker; } - futures.add(executor.submit(new RowFillWorker(i, namedBlobDb, accountService, num, trackingRow))); + futures.add(executor.submit(new RowFillWorker(i, namedBlobDb, testType, accountService, num, trackingRow))); } AtomicBoolean stop = new AtomicBoolean(false); executor.submit(() -> { @@ -376,46 +469,15 @@ private static void printHistogramMetric(MetricRegistry registry, String metricN System.out.println("99th: " + histogram.getSnapshot().get99thPercentile()); } - /** - * Generate a random {@link NamedBlobRecord}. - * @param random The {@link Random} object to generate random number. - * @param accountService The {@link AccountService} object. - * @param allAccounts All the accounts in the account service. - * @return A {@link NamedBlobRecord}. - */ - private static NamedBlobRecord generateRandomNamedBlobRecord(Random random, AccountService accountService, - List allAccounts) { - Account account; - Container container; - String blobName; - if (random.nextFloat() < PERCENTAGE_FOR_HUGE_LIST) { - account = accountService.getAccountById(HUGE_LIST_ACCOUNT_ID); - container = account.getContainerById(HUGE_LIST_CONTAINER_ID); - blobName = HUGE_LIST_COMMON_PREFIX + TestUtils.getRandomString(50); - } else { - account = allAccounts.get(random.nextInt(allAccounts.size())); - List containers = new ArrayList<>(account.getAllContainers()); - container = containers.get(random.nextInt(containers.size())); - blobName = TestUtils.getRandomString(50); - } - BlobId blobId = - new BlobId(BlobId.BLOB_ID_V6, BlobId.BlobIdType.NATIVE, (byte) 1, account.getId(), container.getId(), - PARTITION_ID, false, BlobId.BlobDataType.DATACHUNK); - NamedBlobRecord record = - NamedBlobRecord.forPut(account.getName(), container.getName(), blobName, blobId.toString(), Utils.Infinite_Time, - 1024); - return record; - } - /** * Worker class to insert rows into database. */ public static class RowFillWorker implements Runnable { private final int id; private final NamedBlobDb namedBlobDb; - private final AccountService accountService; private final List allAccounts; private final long numberOfInsert; + private final TestType testType; private final AtomicLong trackingRow; /** @@ -426,11 +488,11 @@ public static class RowFillWorker implements Runnable { * @param numberOfInsert The insert of rows to insert * @param trackingRow The {@link AtomicLong} object to keep track of how many rows are inserted. */ - public RowFillWorker(int id, NamedBlobDb namedBlobDb, AccountService accountService, long numberOfInsert, - AtomicLong trackingRow) { + public RowFillWorker(int id, NamedBlobDb namedBlobDb, TestType testType, AccountService accountService, + long numberOfInsert, AtomicLong trackingRow) { this.id = id; this.namedBlobDb = namedBlobDb; - this.accountService = accountService; + this.testType = testType; this.numberOfInsert = numberOfInsert; this.trackingRow = trackingRow; allAccounts = new ArrayList<>(accountService.getAllAccounts()); @@ -441,7 +503,7 @@ public void run() { ThreadLocalRandom random = ThreadLocalRandom.current(); try { for (long l = 0; l < numberOfInsert; l++) { - NamedBlobRecord record = generateRandomNamedBlobRecord(random, accountService, allAccounts); + NamedBlobRecord record = testType.generateNewNamedBlobRecord(random, allAccounts); namedBlobDb.put(record).get(); trackingRow.incrementAndGet(); } @@ -459,22 +521,24 @@ public void run() { * @param executor The {@link ScheduledExecutorService} object. * @param accountService The {@link AccountService} object. * @param numThreads The number of threads to run performance test - * @param includeList True to include list operation in the performance test + * @param config The {@link PerfConfig} object that contains properties for the test. * @throws Exception */ - private static void runPerformanceTest(MetricRegistry registry, NamedBlobDb namedBlobDb, - ScheduledExecutorService executor, AccountService accountService, int numThreads, boolean includeList) + private static void runPerformanceTest(MetricRegistry registry, NamedBlobDb namedBlobDb, TestType testType, + ScheduledExecutorService executor, AccountService accountService, int numThreads, PerfConfig config) throws Exception { - long numberOfPuts = 1000 * 1000; // 1 million inserts - System.out.println("Running performance test, number of puts: " + numberOfPuts); - long numberOfInsertPerWorker = numberOfPuts / numThreads; + int numberOfOperations = config.numOperations; + System.out.println("Running performance test, number of puts: " + numberOfOperations); + int numberOfInsertPerWorker = numberOfOperations / numThreads; List> futures = new ArrayList<>(); for (int i = 0; i < numThreads; i++) { - long num = numberOfInsertPerWorker; + int num = numberOfInsertPerWorker; if (i == numThreads - 1) { - num = numberOfPuts - i * numberOfInsertPerWorker; + num = numberOfOperations - i * numberOfInsertPerWorker; } - futures.add(executor.submit(new PerformanceTestWorker(i, namedBlobDb, accountService, num, includeList))); + futures.add(executor.submit((PerformanceTestWorker) testType.getWorkerClass() + .getConstructor(int.class, NamedBlobDb.class, AccountService.class, int.class, PerfConfig.class) + .newInstance(i, namedBlobDb, accountService, num, config))); } for (Future future : futures) { future.get(); @@ -488,86 +552,273 @@ private static void runPerformanceTest(MetricRegistry registry, NamedBlobDb name } /** - * This class is the worker class that carries out the performance tests. - * There are several tests to be done in this worker - * 1. Put, with isUpsert to be true, otherwise, a put operation would run a select and insert query. - * 2. Update, update the state for each named blob record - * 3. Get, with all the named blob records that are just inserted - * 4. Delete, delete is actually an update as well. - * 5. List, list a small named blob sets and a huge named blob sets + * A base class for performance test worker */ - public static class PerformanceTestWorker implements Runnable { - private final int id; - private final NamedBlobDb namedBlobDb; - private final AccountService accountService; - private final List allAccounts; - private final long numberOfPuts; - private final boolean includeList; - private final List allRecords; - - public PerformanceTestWorker(int id, NamedBlobDb namedBlobDb, AccountService accountService, long numberOfPuts, - boolean includeList) { + public static abstract class PerformanceTestWorker implements Runnable { + protected final int id; + protected final NamedBlobDb namedBlobDb; + protected final AccountService accountService; + protected final List allAccounts; + protected final long numberOfOperations; + protected final PerfConfig config; + + public PerformanceTestWorker(int id, NamedBlobDb namedBlobDb, AccountService accountService, int numberOfOperations, + PerfConfig config) { this.id = id; this.namedBlobDb = namedBlobDb; this.accountService = accountService; - this.numberOfPuts = numberOfPuts; - this.includeList = includeList; + this.numberOfOperations = numberOfOperations; + this.config = config; allAccounts = new ArrayList<>(accountService.getAllAccounts()); - allRecords = new ArrayList<>((int) numberOfPuts); + } + } + + /** + * A performance worker class to do all point lookup operations for named blob, includin put, get, update and delete. + */ + public static class ReadWritePerformanceTestWorker extends PerformanceTestWorker { + private final List allRecords = new ArrayList<>(); + + public ReadWritePerformanceTestWorker(int id, NamedBlobDb namedBlobDb, AccountService accountService, + int numberOfOperations, PerfConfig config) { + super(id, namedBlobDb, accountService, numberOfOperations, config); } @Override public void run() { ThreadLocalRandom random = ThreadLocalRandom.current(); try { - for (long l = 0; l < numberOfPuts; l++) { - NamedBlobRecord record = generateRandomNamedBlobRecord(random, accountService, allAccounts); + for (long l = 0; l < numberOfOperations; l++) { + NamedBlobRecord record = ReadWritePerformanceTestWorker.generateNewNamedBlobRecord(random, allAccounts); namedBlobDb.put(record, NamedBlobState.IN_PROGRESS, true).get(); allRecords.add(record); } - System.out.println("PerformanceTestWorker " + id + " finishes writing " + numberOfPuts + " records"); + System.out.println( + "ReadWritePerformanceTestWorker " + id + " finishes writing " + numberOfOperations + " records"); for (NamedBlobRecord record : allRecords) { namedBlobDb.updateBlobTtlAndStateToReady(record).get(); } - System.out.println("PerformanceTestWorker " + id + " finishes updating " + numberOfPuts + " records"); + System.out.println( + "ReadWritePerformanceTestWorker " + id + " finishes updating " + numberOfOperations + " records"); for (NamedBlobRecord record : allRecords) { namedBlobDb.get(record.getAccountName(), record.getContainerName(), record.getBlobName()).get(); } - System.out.println("PerformanceTestWorker " + id + " finishes reading " + numberOfPuts + " records"); - - if (includeList) { - int numberOfList = 0; - for (NamedBlobRecord record : allRecords) { - if (!record.getAccountName().equals(String.format(ACCOUNT_NAME_FORMAT, HUGE_LIST_ACCOUNT_ID))) { - namedBlobDb.list(record.getAccountName(), record.getContainerName(), "A", null, null).get(); - numberOfList++; - if (numberOfList == 100) { - break; - } + System.out.println( + "ReadWritePerformanceTestWorker " + id + " finishes reading " + numberOfOperations + " records"); + + for (NamedBlobRecord record : allRecords) { + namedBlobDb.delete(record.getAccountName(), record.getContainerName(), record.getBlobName()).get(); + } + System.out.println( + "ReadWritePerformanceTestWorker " + id + " finishes deleting " + numberOfOperations + " records"); + } catch (Exception e) { + System.out.println("ReadWritePerformanceTestWorker " + id + " has som exception " + e); + } + } + + /** + * Get total number of rows of the target table. + * @param datasource Datasource to execute query on. + * @return Total number of rows + * @throws Exception + */ + public static long getNumberOfExistingRows(DataSource datasource) throws Exception { + String rowQuerySql = "SELECT COUNT(*) as total FROM " + TABLE_NAME; + long numberOfRows = 0; + try (Connection connection = datasource.getConnection()) { + try (PreparedStatement queryStatement = connection.prepareStatement(rowQuerySql)) { + try (ResultSet result = queryStatement.executeQuery()) { + while (result.next()) { + numberOfRows = result.getLong("total"); } } - System.out.println("PerformanceTestWorker " + id + " finishes listing for records"); + } + } + return numberOfRows; + } + + /** + * Generate a random {@link NamedBlobRecord} for ReadWritePerformanceTestWorker. + * @param random The {@link Random} object to generate random number. + * @param allAccounts All the accounts in the account service. + * @return A {@link NamedBlobRecord}. + */ + public static NamedBlobRecord generateNewNamedBlobRecord(Random random, List allAccounts) { + Account account = allAccounts.get(random.nextInt(allAccounts.size())); + List containers = new ArrayList<>(account.getAllContainers()); + Container container = containers.get(random.nextInt(containers.size())); + String blobName = TestUtils.getRandomString(50); + BlobId blobId = + new BlobId(BlobId.BLOB_ID_V6, BlobId.BlobIdType.NATIVE, (byte) 1, account.getId(), container.getId(), + PARTITION_ID, false, BlobId.BlobDataType.DATACHUNK); + NamedBlobRecord record = + new NamedBlobRecord(account.getName(), container.getName(), blobName, blobId.toString(), -1); + return record; + } + } + + /** + * A performance worker for list operation for named blob db. The worker thread would iterate over all the blob names + * with a give prefix under the given account and container. + */ + public static class ListPerformanceTestWorker extends PerformanceTestWorker { + + public ListPerformanceTestWorker(int id, NamedBlobDb namedBlobDb, AccountService accountService, + int numberOfOperations, PerfConfig config) { + super(id, namedBlobDb, accountService, numberOfOperations, config); + } - String accountName = String.format(ACCOUNT_NAME_FORMAT, HUGE_LIST_ACCOUNT_ID); - String containerName = String.format(CONTAINER_NAME_FORMAT, HUGE_LIST_CONTAINER_ID); + @Override + public void run() { + try { + for (int i = 0; i < numberOfOperations; i++) { + Page page = null; String token = null; - for (int i = 0; i < 100; i++) { - token = namedBlobDb.list(accountName, containerName, HUGE_LIST_COMMON_PREFIX, token, null) - .get() - .getNextPageToken(); + do { + page = + namedBlobDb.list(HUGE_LIST_ACCOUNT_NAME, HUGE_LIST_CONTAINER_NAME, HUGE_LIST_COMMON_PREFIX, token, null) + .get(); + token = page.getNextPageToken(); + } while (token != null); + if (i % 100 == 0) { + System.out.println("ListPerformanceTestWorker " + id + " finishes " + i + " operations"); } - System.out.println("PerformanceTestWorker " + id + " finishes listing for huge records"); } + } catch (Exception e) { + System.out.println("ListPerformanceTestWorker " + id + " has som exception " + e); + } + } - for (NamedBlobRecord record : allRecords) { + /** + * Get total number of rows for that starts with the list prefix under the given account and container. + * @param datasource Datasource to execute query on. + * @return Total number of rows + * @throws Exception + */ + public static long getNumberOfExistingRows(DataSource datasource) throws Exception { + String rowQuerySql = "SELECT COUNT(*) as total FROM " + TABLE_NAME + + " WHERE account_id = ? And container_id = ? and BLOB_NAME like ?"; + long numberOfRows = 0; + try (Connection connection = datasource.getConnection()) { + try (PreparedStatement queryStatement = connection.prepareStatement(rowQuerySql)) { + queryStatement.setInt(1, HUGE_LIST_ACCOUNT_ID); + queryStatement.setInt(2, HUGE_LIST_CONTAINER_ID); + queryStatement.setString(3, HUGE_LIST_COMMON_PREFIX + "%"); + try (ResultSet result = queryStatement.executeQuery()) { + while (result.next()) { + numberOfRows = result.getLong("total"); + } + } + } + } + return numberOfRows; + } + + /** + * Generate a random {@link NamedBlobRecord} for list operation. + * @param random The {@link Random} object to generate random number. + * @param allAccounts All the accounts in the account service. + * @return A {@link NamedBlobRecord}. + */ + public static NamedBlobRecord generateNewNamedBlobRecord(Random random, List allAccounts) { + String blobName = String.format(HUGE_LIST_TEMPLATE, UUID.randomUUID(), TestUtils.getRandomString(32)); + BlobId blobId = new BlobId(BlobId.BLOB_ID_V6, BlobId.BlobIdType.NATIVE, (byte) 1, HUGE_LIST_ACCOUNT_ID, + HUGE_LIST_CONTAINER_ID, PARTITION_ID, false, BlobId.BlobDataType.DATACHUNK); + NamedBlobRecord record = + new NamedBlobRecord(HUGE_LIST_ACCOUNT_NAME, HUGE_LIST_CONTAINER_NAME, blobName, blobId.toString(), -1); + return record; + } + } + + /** + * A performance worker for custom operation set for named blob db. This worker does a set of operations as you defined + * in the method. + */ + public static class CustomPerformanceTestWorker extends PerformanceTestWorker { + private final boolean onlyWrites; + + public CustomPerformanceTestWorker(int id, NamedBlobDb namedBlobDb, AccountService accountService, + int numberOfOperations, PerfConfig config) { + super(id, namedBlobDb, accountService, numberOfOperations, config); + this.onlyWrites = config.onlyWrites; + } + + @Override + public void run() { + ThreadLocalRandom random = ThreadLocalRandom.current(); + try { + for (long l = 1; l <= numberOfOperations; l++) { + NamedBlobRecord record = CustomPerformanceTestWorker.generateNewNamedBlobRecord(random, allAccounts); + if (!onlyWrites) { + try { + namedBlobDb.get(record.getAccountName(), record.getContainerName(), record.getBlobName()).get(); + } catch (Exception e) { + // expected NOT_FOUND failure + } + try { + namedBlobDb.get(record.getAccountName(), record.getContainerName(), record.getBlobName() + "/").get(); + } catch (Exception e) { + // expected NOT_FOUND failure + } + + try { + namedBlobDb.list(record.getAccountName(), record.getContainerName(), record.getBlobName() + "/", null, + null); + } catch (Exception e) { + // expected NOT_FOUND failure + } + } + PutResult putResult = namedBlobDb.put(record, NamedBlobState.IN_PROGRESS, true).get(); + // Get the updated version + record = putResult.getInsertedRecord(); + namedBlobDb.updateBlobTtlAndStateToReady(record).get(); + if (!onlyWrites) { + // Get blob again + namedBlobDb.get(record.getAccountName(), record.getContainerName(), record.getBlobName()).get(); + } + // Now delete namedBlobDb.delete(record.getAccountName(), record.getContainerName(), record.getBlobName()).get(); + if (l % 100 == 0) { + System.out.println("CustomPerformanceTestWorker " + id + " finishes " + l + " records"); + } } - System.out.println("PerformanceTestWorker " + id + " finishes deleting " + numberOfPuts + " records"); + System.out.println("CustomPerformanceTestWorker " + id + " finishes " + numberOfOperations + " records"); } catch (Exception e) { - System.out.println("PerformanceTestWorker " + id + " has som exception " + e); + System.out.println("CustomPerformanceTestWorker " + id + " has som exception " + e); } } + + /** + * Just return 0 here. + * @param datasource + * @return + * @throws Exception + */ + public static long getNumberOfExistingRows(DataSource datasource) throws Exception { + return 0L; + } + + /** + * Generate a random {@link NamedBlobRecord} for custom operation. + * @param random The {@link Random} object to generate random number. + * @param allAccounts All the accounts in the account service. + * @return A {@link NamedBlobRecord}. + */ + public static NamedBlobRecord generateNewNamedBlobRecord(Random random, List allAccounts) { + Account account = allAccounts.get(random.nextInt(allAccounts.size())); + List containers = new ArrayList<>(account.getAllContainers()); + Container container = containers.get(random.nextInt(containers.size())); + String blobName = + String.format("checkpoints/%s/chk-900/%s", TestUtils.getRandomString(32), UUID.randomUUID().toString()); + BlobId blobId = + new BlobId(BlobId.BLOB_ID_V6, BlobId.BlobIdType.NATIVE, (byte) 1, account.getId(), container.getId(), + PARTITION_ID, false, BlobId.BlobDataType.DATACHUNK); + long expirationTime = Utils.addSecondsToEpochTime(System.currentTimeMillis() / 1000, TimeUnit.DAYS.toSeconds(2)); + NamedBlobRecord record = + new NamedBlobRecord(account.getName(), container.getName(), blobName, blobId.toString(), expirationTime); + return record; + } } } diff --git a/ambry-tools/src/test/java/com/github/ambry/tools/perf/NamedBlobMysqlDatabasePerfTest.java b/ambry-tools/src/test/java/com/github/ambry/tools/perf/NamedBlobMysqlDatabasePerfTest.java new file mode 100644 index 0000000000..5c015fd10a --- /dev/null +++ b/ambry-tools/src/test/java/com/github/ambry/tools/perf/NamedBlobMysqlDatabasePerfTest.java @@ -0,0 +1,50 @@ +/** + * Copyright 2025 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.tools.perf; + +import com.github.ambry.account.AccountService; +import com.github.ambry.config.VerifiableProperties; +import com.github.ambry.named.NamedBlobDb; +import java.util.Collections; +import java.util.Properties; +import org.junit.Assert; +import org.junit.Test; + +import static org.mockito.Mockito.*; + + +public class NamedBlobMysqlDatabasePerfTest { + @Test + public void testTestTypeConstructor() throws Exception { + AccountService accountService = mock(AccountService.class); + when(accountService.getAllAccounts()).thenReturn(Collections.emptyList()); + NamedBlobDb db = mock(NamedBlobDb.class); + Properties props = new Properties(); + props.setProperty("db.datacenter", "datacenter"); + props.setProperty("db.host", "host"); + props.setProperty("db.name", "name"); + props.setProperty("db.user.name", "name"); + props.setProperty("parallelism", "10"); + props.setProperty("num.operations", "10"); + NamedBlobMysqlDatabasePerf.PerfConfig perfConfig = + new NamedBlobMysqlDatabasePerf.PerfConfig(new VerifiableProperties(props)); + + NamedBlobMysqlDatabasePerf.PerformanceTestWorker worker = + (NamedBlobMysqlDatabasePerf.PerformanceTestWorker) NamedBlobMysqlDatabasePerf.TestType.LIST.getWorkerClass() + .getConstructor(int.class, NamedBlobDb.class, AccountService.class, int.class, + NamedBlobMysqlDatabasePerf.PerfConfig.class) + .newInstance(0, db, accountService, 10, perfConfig); + Assert.assertNotNull(worker); + } +}