From 5dfc20d84c71b07699160c44a8200dbd5b5655b9 Mon Sep 17 00:00:00 2001 From: fanjianye Date: Wed, 27 Mar 2024 10:49:49 +0800 Subject: [PATCH 1/4] [Feature] add BaseMetricMonitor for bookie --- .../apache/bookkeeper/bookie/BaseMetric.java | 78 +++ .../bookkeeper/bookie/BaseMetricMonitor.java | 585 ++++++++++++++++++ .../bookie/BookKeeperServerStats.java | 7 + .../org/apache/bookkeeper/bookie/Bookie.java | 2 + .../apache/bookkeeper/bookie/BookieImpl.java | 36 ++ .../bookie/stats/BaseMetricMonitorStats.java | 73 +++ .../bookkeeper/conf/ServerConfiguration.java | 83 +++ 7 files changed, 864 insertions(+) create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BaseMetric.java create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BaseMetricMonitor.java create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/BaseMetricMonitorStats.java diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BaseMetric.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BaseMetric.java new file mode 100644 index 00000000000..7be01a97443 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BaseMetric.java @@ -0,0 +1,78 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.bookkeeper.bookie; + +public class BaseMetric { + int journalIoUtil; + int ledgerIoUtil; + int cpuUsedRate; + long writeBytePerSecond; + + BaseMetric() { + journalIoUtil = -1; + ledgerIoUtil = -1; + cpuUsedRate = -1; + writeBytePerSecond = -1; + } + + public int getJournalIoUtil() { + return journalIoUtil; + } + + public void setJournalIoUtil(int journalIoUtil) { + this.journalIoUtil = journalIoUtil; + } + + public int getLedgerIoUtil() { + return ledgerIoUtil; + } + + public void setLedgerIoUtil(int ledgerIoUtil) { + this.ledgerIoUtil = ledgerIoUtil; + } + + public int getCpuUsedRate() { + return cpuUsedRate; + } + + public void setCpuUsedRate(int cpuUsedRate) { + this.cpuUsedRate = cpuUsedRate; + } + + public long getWriteBytePerSecond() { + return writeBytePerSecond; + } + + public void setWriteBytePerSecond(long writeBytePerSecond) { + this.writeBytePerSecond = writeBytePerSecond; + } + + @Override + public String toString() { + return "BaseMetric{" + + "journalIoUtil=" + journalIoUtil + + ", ledgerIoUtil=" + ledgerIoUtil + + ", cpuUsedRate=" + cpuUsedRate + + ", writeBytePerSecond=" + writeBytePerSecond + + '}'; + } +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BaseMetricMonitor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BaseMetricMonitor.java new file mode 100644 index 00000000000..d2e5b602765 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BaseMetricMonitor.java @@ -0,0 +1,585 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.bookkeeper.bookie; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.bookkeeper.bookie.stats.BaseMetricMonitorStats; +import org.apache.bookkeeper.bookie.stats.BookieStats; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.commons.lang.SystemUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.management.ManagementFactory; +import java.nio.charset.Charset; +import java.nio.file.FileStore; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * Thread to monitor the journal disk periodically. + */ +public class BaseMetricMonitor { + public static final Logger LOG = LoggerFactory.getLogger(BaseMetricMonitor.class); + + private final static int INVALID = -1; + private final long intervalMs; + private final int metricSlideWindowSize; + private int curSliceIdx = 0; + private final Map> journalIoStats = new HashMap<>(); + private final Map> ledgerIoStats = new HashMap<>(); + private final Map cpuStats = new HashMap<>(); + private final WriteByteStat writeByteStat; + private final BaseMetricMonitorStats baseMetricMonitorStats; + private ScheduledExecutorService executor; + private ScheduledFuture checkTask; + private final BookieStats bookieStats; + private int updateBaseMetricAfterUpdateStatCount; + private boolean reportStatByDelta; + private final BaseMetric baseMetric = new BaseMetric(); + private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true); + + private class DiskLocation { + + private final String diskName; + private final File location; + DiskLocation(File location) throws IOException { + this.location = location; + FileStore fs = Files.getFileStore(location.toPath()); + this.diskName = Paths.get(fs.name()).getFileName().toString(); + } + + public String getDiskName() { + return diskName; + } + + @Override + public String toString() { + return location.toString() + " disk: " + diskName; + } + @Override + public int hashCode() { + return location.hashCode(); + } + } + + private class DiskIOStat { + private long lastTotalTicks; + private long lastStatTime; + private int util; + + public int getUtil() { + return util; + } + + public void setUtil(int util) { + if (util == INVALID) { + this.util = util; + return; + } + if (util <= 100 && util >= 0) { + this.util = util; + } else if (util < 0) { + this.util = 0; + } else { + this.util = 100; + } + } + + public long getLastTotalTicks() { + return lastTotalTicks; + } + + public void setLastTotalTicks(long lastTotalTicks) { + this.lastTotalTicks = lastTotalTicks; + } + + public long getLastStatTime() { + return lastStatTime; + } + + public void setLastStatTime(long lastStatTime) { + this.lastStatTime = lastStatTime; + } + } + + private class CPUStat { + private long lastFreeTime; + private long lastTotalTime; + private int cpuUsedRate; + + public long getLastFreeTime() { + return lastFreeTime; + } + + public void setLastFreeTime(long lastFreeTime) { + this.lastFreeTime = lastFreeTime; + } + + public long getLastTotalTime() { + return lastTotalTime; + } + + public void setLastTotalTime(long lastTotalTime) { + this.lastTotalTime = lastTotalTime; + } + + public int getCpuUsedRate() { + return cpuUsedRate; + } + + public void setCpuUsedRate(int cpuUsedRate) { + if (cpuUsedRate == INVALID) { + this.cpuUsedRate = INVALID; + return; + } + if (cpuUsedRate <= 100 && cpuUsedRate >= 0) { + this.cpuUsedRate = cpuUsedRate; + } else if (cpuUsedRate < 0) { + this.cpuUsedRate = 0; + } else { + this.cpuUsedRate = 100; + } + } + } + + private class WriteByteStat { + private long lastWriteByte; + private long writeBytePerSecond; + private long lastStatTime; + + public long getLastWriteByte() { + return lastWriteByte; + } + + public void setLastWriteByte(long lastWriteByte) { + this.lastWriteByte = lastWriteByte; + } + + public long getWriteBytePerSecond() { + return writeBytePerSecond; + } + + public void setWriteBytePerSecond(long writeBytePerSecond) { + this.writeBytePerSecond = writeBytePerSecond; + } + + public long getLastStatTime() { + return lastStatTime; + } + + public void setLastStatTime(long lastStatTime) { + this.lastStatTime = lastStatTime; + } + } + + BaseMetricMonitor(ServerConfiguration conf, List journalDirectories, + List ledgerDirectories, StatsLogger statsLogger, BookieStats bookieStats) throws IOException { + if (!SystemUtils.IS_OS_LINUX) { + String msg = "BaseMetricMonitor does not start, only Linux OS release support!"; + LOG.info(msg); + throw new IOException(msg); + } + this.intervalMs = conf.getBaseMetricMonitorStatIntervalMs(); + this.metricSlideWindowSize = conf.getBaseMetricMonitorMetricSlideWindowSize(); + this.updateBaseMetricAfterUpdateStatCount = metricSlideWindowSize; + this.bookieStats = bookieStats; + + initIoStat(journalIoStats, journalDirectories); + LOG.info("BaseMetricMonitor adds ioStat for journal disk {}", + journalIoStats.keySet().stream().map(DiskLocation::getDiskName).collect(Collectors.toList())); + + initIoStat(ledgerIoStats, ledgerDirectories); + LOG.info("BaseMetricMonitor adds ioStat for ledger disk {}", + ledgerIoStats.keySet().stream().map(DiskLocation::getDiskName).collect(Collectors.toList())); + + try { + getUsedCPU(cpuStats); + LOG.info("BaseMetricMonitor adds cpuStat for cpu {}", new ArrayList<>(cpuStats.keySet())); + } catch (IOException e) { + LOG.warn("BaseMetricMonitor getUsedCPU failed", e); + } + + writeByteStat = new WriteByteStat(); + // expose stats + this.baseMetricMonitorStats = new BaseMetricMonitorStats(statsLogger); + } + + private void initIoStat(Map> ioStats, List directories) { + if (directories != null) { + for (File dir : directories) { + DiskLocation diskLocation = null; + try { + diskLocation = new DiskLocation(dir); + } catch (IOException e) { + LOG.warn("BaseMetricMonitor initIoStat can't find corresponding diskLocation for dir {}", dir); + } + List diskIOStats = new ArrayList<>(); + for (int i = 0; i < metricSlideWindowSize; i++) { + diskIOStats.add(new DiskIOStat()); + } + ioStats.put(diskLocation, diskIOStats); + } + } + } + + // start the daemon for disk monitoring + public void start() { + LOG.info("Starting BaseMetricMonitor, schedule at fixed rate {}ms", intervalMs); + this.executor = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("BaseMetricMonitorThread") + .setDaemon(true) + .build()); + this.checkTask = this.executor.scheduleAtFixedRate(this::updateStat, intervalMs, intervalMs, TimeUnit.MILLISECONDS); + } + + // shutdown disk monitoring daemon + public void shutdown() { + LOG.info("Shutting down BaseMetricMonitor"); + if (null != checkTask) { + if (!checkTask.cancel(true)) { + LOG.debug("Failed to cancel check task in BaseMetricMonitor"); + } + } + if (null != executor) { + executor.shutdown(); + } + } + + private void updateStat() { + Map allIoStats = getDiskIoStat(); + Map allCpuStats = getCPUStat(); + DiskIOStat journalIoStat = updateDiskIoStat(allIoStats, journalIoStats, curSliceIdx); + DiskIOStat ledgerIoStat = updateDiskIoStat(allIoStats, ledgerIoStats, curSliceIdx); + CPUStat cpuStat = updateCpuStat(allCpuStats); + long writeBytePerSecond = updateWriteByteStat(); + curSliceIdx = (curSliceIdx + 1) % metricSlideWindowSize; + + // because some base metric need at least one slide round, update base metric after first slide round + if (updateBaseMetricAfterUpdateStatCount > 0) { + updateBaseMetricAfterUpdateStatCount -= 1; + return; + } + + String oldBaseMetric = baseMetric.toString(); + rwLock.writeLock().lock(); + baseMetric.setJournalIoUtil(journalIoStat.getUtil()); + baseMetric.setLedgerIoUtil(ledgerIoStat.getUtil()); + baseMetric.setCpuUsedRate(cpuStat.getCpuUsedRate()); + baseMetric.setWriteBytePerSecond(writeBytePerSecond); + rwLock.writeLock().unlock(); + reportStat(baseMetric); + LOG.debug("BaseMetricMonitor updates base metric from {} to {}", oldBaseMetric, baseMetric); + } + + private void reportStat(BaseMetric curBaseMetric) { + // because counter doesn't have set function, use counter add function instead + int deltaJournalIoUtil, deltaLedgerIoUtil, deltaCpuUsedRate; + long deltaWriteBytePerSecond; + if (!reportStatByDelta) { + deltaJournalIoUtil = curBaseMetric.getJournalIoUtil(); + deltaLedgerIoUtil = curBaseMetric.getLedgerIoUtil(); + deltaCpuUsedRate = curBaseMetric.getCpuUsedRate(); + deltaWriteBytePerSecond = curBaseMetric.getWriteBytePerSecond(); + reportStatByDelta = true; + } else { + deltaJournalIoUtil = curBaseMetric.getJournalIoUtil() - baseMetric.getJournalIoUtil(); + deltaLedgerIoUtil = curBaseMetric.getLedgerIoUtil() - baseMetric.getLedgerIoUtil(); + deltaCpuUsedRate = curBaseMetric.getCpuUsedRate() - baseMetric.getCpuUsedRate(); + deltaWriteBytePerSecond = curBaseMetric.getWriteBytePerSecond() - baseMetric.getWriteBytePerSecond(); + } + baseMetricMonitorStats.getJournalIoUtil().addCount(deltaJournalIoUtil); + baseMetricMonitorStats.getLedgerIoUtil().addCount(deltaLedgerIoUtil); + baseMetricMonitorStats.getCpuUsedRate().addCount(deltaCpuUsedRate); + baseMetricMonitorStats.getWriteBytePerSecond().addCount(deltaWriteBytePerSecond); + } + + public BaseMetric getBaseMetric() { + BaseMetric ret = new BaseMetric(); + rwLock.readLock().lock(); + ret.setJournalIoUtil(baseMetric.getJournalIoUtil()); + ret.setLedgerIoUtil(baseMetric.getLedgerIoUtil()); + ret.setCpuUsedRate(baseMetric.getCpuUsedRate()); + ret.setWriteBytePerSecond(baseMetric.getWriteBytePerSecond()); + rwLock.readLock().unlock(); + return ret; + } + + private DiskIOStat updateDiskIoStat(Map allIoStats, + Map> ioStats, int curIdx) { + int totalIoUtil = 0; + List existDisks = new LinkedList<>(); + for (Map.Entry> entry : ioStats.entrySet()) { + String diskName = entry.getKey().diskName; + DiskIOStat oldStat = entry.getValue().get(curIdx); + if (allIoStats.containsKey(diskName)) { + long oldTotalTicks = oldStat.getLastTotalTicks(); + long oldStatTime = oldStat.getLastStatTime(); + DiskIOStat newStat = allIoStats.get(diskName); + long newTotalTicks = newStat.getLastTotalTicks(); + long newStatTime = newStat.getLastStatTime(); + oldStat.setLastTotalTicks(newTotalTicks); + oldStat.setLastStatTime(newStatTime); + if (oldTotalTicks != 0 && newStatTime > oldStatTime) { + int util = (int) ((double) (newTotalTicks - oldTotalTicks) * 100 / (newStatTime - oldStatTime)); + oldStat.setUtil(util); + totalIoUtil += util; + existDisks.add(diskName); + LOG.debug("{} disk io util:{}", diskName, util); + continue; + } + } + // maybe this disk has been umounted. + // may consider again which value is fair, 100 or -1 + oldStat.setUtil(INVALID); + } + DiskIOStat ret = new DiskIOStat(); + if (!existDisks.isEmpty()) { + // may consider again which stat is fair, average or maximum + ret.setUtil(totalIoUtil / existDisks.size()); + } else { + ret.setUtil(INVALID); + } + return ret; + } + + private static final String PROC_DISKSSTATS = "/proc/diskstats"; + private static final Pattern DISK_STAT_FORMAT = + Pattern.compile("[ \t]*[0-9]*[ \t]*[0-9]*[ \t]*(\\S*)" + + "[ \t]*[0-9]*[ \t]*[0-9]*[ \t]*[0-9]*" + + "[ \t]*[0-9]*[ \t]*[0-9]*[ \t]*[0-9]*" + + "[ \t]*[0-9]*[ \t]*[0-9]*[ \t]*[0-9]*" + + "[ \t]*([0-9]*)[ \t].*"); + + private Map getDiskIoStat() { + Map rets = new HashMap<>(); + InputStreamReader fReader = null; + BufferedReader in = null; + try { + fReader = new InputStreamReader( + new FileInputStream(PROC_DISKSSTATS), Charset.forName("UTF-8")); + in = new BufferedReader(fReader); + } catch (FileNotFoundException f) { + // shouldn't happen.... + LOG.warn("BaseMetricMonitor gets FileNotFoundException while getDiskIoStat", f); + return rets; + } + try { + Matcher mat = null; + String str = in.readLine(); + long statTime = System.currentTimeMillis(); + while (str != null) { + mat = DISK_STAT_FORMAT.matcher(str); + if (mat.find()) { + String diskName = mat.group(1); + long totalTicks = Long.parseLong(mat.group(2)); + LOG.debug("{} totalTicks:{}", str, totalTicks); + DiskIOStat stat = new DiskIOStat(); + stat.setLastTotalTicks(totalTicks); + stat.setLastStatTime(statTime); + rets.put(diskName, stat); + } + str = in.readLine(); + statTime = System.currentTimeMillis(); + } + } catch (IOException e) { + LOG.warn("BaseMetricMonitor gets exception while getDiskIoStat", e); + } + return rets; + } + + private static final String PROC = "/proc/%s/status"; + private static final Pattern PID_STATUS_CPU_FORMAT = + Pattern.compile("Cpus_allowed_list:[ \t]*(\\S+)"); + private static final String CPU = "cpu"; + + private String getPid() { + String name = ManagementFactory.getRuntimeMXBean().getName(); + String[] split = name.split("@"); + return split[0]; + } + + private void getUsedCPU(Map cpuStats) throws IOException { + InputStreamReader fReader = null; + BufferedReader in = null; + try { + fReader = new InputStreamReader( + new FileInputStream(String.format(PROC, getPid())), Charset.forName("UTF-8")); + in = new BufferedReader(fReader); + } catch (FileNotFoundException f) { + // shouldn't happen.... + LOG.warn("BaseMetricMonitor gets FileNotFoundException while getUsedCPU", f); + throw f; + } + try { + Matcher mat = null; + String str = in.readLine(); + while (str != null) { + mat = PID_STATUS_CPU_FORMAT.matcher(str); + if (mat.find()) { + String cpu = mat.group(1); + LOG.debug("{} cpuName:{}", str, cpu); + String[] splits = cpu.split(","); + for (int i = 0; i < splits.length; i++) { + String subSplits = splits[i]; + String[] subSplit = subSplits.split("-"); + if (subSplit.length == 1) { + String cpuName = CPU + subSplit[0]; + cpuStats.put(cpuName, new CPUStat()); + } else if (subSplit.length == 2) { + int startIdx = Integer.parseInt(subSplit[0]); + int endIdx = Integer.parseInt(subSplit[1]); + for (int j = startIdx; j <= endIdx; j++) { + String cpuName = CPU + j; + cpuStats.put(cpuName, new CPUStat()); + } + } + } + break; + } + str = in.readLine(); + } + } catch (IOException e) { + LOG.warn("BaseMetricMonitor gets exception while getUsedCPU", e); + throw e; + } + if (cpuStats.isEmpty()) { + cpuStats.put(CPU, new CPUStat()); + } + } + private static final String PROC_STAT = "/proc/stat"; + private static final Pattern PROC_STAT_CPU_FORMAT = + Pattern.compile("(cpu[0-9]*)[ \t]+([0-9]+)[ \t]+([0-9]+)[ \t]+([0-9]+)" + + "[ \t]+([0-9]+)[ \t]+([0-9]+)"); + + private Map getCPUStat() { + Map rets = new HashMap<>(); + InputStreamReader fReader = null; + BufferedReader in = null; + try { + fReader = new InputStreamReader( + new FileInputStream(PROC_STAT), Charset.forName("UTF-8")); + in = new BufferedReader(fReader); + } catch (FileNotFoundException f) { + // shouldn't happen.... + LOG.warn("BaseMetricMonitor gets FileNotFoundException while getCPUStat", f); + return rets; + } + try { + Matcher mat = null; + String str = in.readLine(); + while (str != null) { + mat = PROC_STAT_CPU_FORMAT.matcher(str); + if (mat.find()) { + long totalTime = 0; + String cpuName = mat.group(1); + for (int i = 2; i <= mat.groupCount(); i++) { + totalTime += Long.parseLong(mat.group(i)); + } + long freeTime = Long.parseLong(mat.group(5)); + LOG.debug("{} cpuName:{} totalTime:{} freeTime:{}", str, cpuName, totalTime, freeTime); + CPUStat stat = new CPUStat(); + stat.setLastTotalTime(totalTime); + stat.setLastFreeTime(freeTime); + rets.put(cpuName, stat); + } + str = in.readLine(); + } + } catch (IOException e) { + LOG.warn("BaseMetricMonitor gets exception while getCPUStat", e); + } + return rets; + } + + private CPUStat updateCpuStat(Map allCpuStats) { + int totalCpuUsedRate = 0; + int existCpuNum = 0; + for (Map.Entry entry : cpuStats.entrySet()) { + String cpuName = entry.getKey(); + CPUStat oldStat = entry.getValue(); + if (allCpuStats.containsKey(cpuName)) { + long oldTotalTime = oldStat.getLastTotalTime(); + long oldFreeTime = oldStat.getLastFreeTime(); + CPUStat newStat = allCpuStats.get(cpuName); + long newTotalTime = newStat.getLastTotalTime(); + long newFreeTime = newStat.getLastFreeTime(); + oldStat.setLastFreeTime(newFreeTime); + oldStat.setLastTotalTime(newTotalTime); + if (oldTotalTime != 0 && newTotalTime > oldTotalTime) { + int cpuUsedRate = (int) (100 - ((newFreeTime - oldFreeTime) * 100 / (newTotalTime - oldTotalTime))); + oldStat.setCpuUsedRate(cpuUsedRate); + totalCpuUsedRate += cpuUsedRate; + ++existCpuNum; + LOG.debug("{} cpu used rate:{}", cpuName, cpuUsedRate); + continue; + } + } + // may consider again which value is fair, 100 or -1 + oldStat.setCpuUsedRate(INVALID); + } + CPUStat ret = new CPUStat(); + if (existCpuNum > 0) { + // may consider again which stat is fair, average or maximum + ret.setCpuUsedRate(totalCpuUsedRate / existCpuNum); + } else { + ret.setCpuUsedRate(INVALID); + } + return ret; + } + + private long updateWriteByteStat() { + long writeBytePerSecond = 0; + long oldWriteByte = writeByteStat.getLastWriteByte(); + long oldStatTime = writeByteStat.getLastStatTime(); + long newWriteByte = bookieStats.getWriteBytes().get(); + long newStatTime = System.currentTimeMillis(); + if (oldWriteByte != 0 && newStatTime > oldStatTime) { + writeBytePerSecond = (newWriteByte - oldWriteByte) * TimeUnit.SECONDS.toMillis(1) / (newStatTime - oldStatTime); + } + writeByteStat.setLastWriteByte(newWriteByte); + writeByteStat.setLastStatTime(newStatTime); + writeByteStat.setWriteBytePerSecond(writeBytePerSecond); + LOG.debug("write byte per second:{}", writeBytePerSecond); + return writeBytePerSecond; + } +} \ No newline at end of file diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java index 59ffd99e0de..f60428c155b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java @@ -190,4 +190,11 @@ public interface BookKeeperServerStats { String NUM_OF_WRITE_LEDGERS_REMOVED_CACHE_MAXSIZE = "NUM_OF_WRITE_LEDGERS_REMOVED_CACHE_MAXSIZE"; String NUM_LEDGERS_HAVING_MULTIPLE_ENTRYLOGS = "NUM_LEDGERS_HAVING_MULTIPLE_ENTRYLOGS"; String ENTRYLOGS_PER_LEDGER = "ENTRYLOGS_PER_LEDGER"; + + // BaseMetricMonitor Stats + String BASE_METRIC_MONITOR_SCOPE = "base_metric_monitor"; + String JOURNAL_IO_UTIL = "JOURNAL_IO_UTIL"; + String LEDGER_IO_UTIL = "LEDGER_IO_UTIL"; + String CPU_USED_RATE = "CPU_USED_RATE"; + String WRITE_BYTES_PER_SECOND = "WRITE_BYTES_PER_SECOND"; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java index 90c8acf5af4..f1f50671000 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java @@ -54,6 +54,8 @@ void setExplicitLac(ByteBuf entry, WriteCallback writeCallback, Object ctx, byte long getTotalDiskSpace() throws IOException; long getTotalFreeSpace() throws IOException; + BaseMetric getBaseMetric(); + // TODO: Shouldn't this be async? ByteBuf readEntry(long ledgerId, long entryId) throws IOException, NoLedgerException, BookieException; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index a660a13ce84..8df906f6471 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -21,6 +21,7 @@ package org.apache.bookkeeper.bookie; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BASE_METRIC_MONITOR_SCOPE; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_SCOPE; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_INDEX_SCOPE; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_LEDGER_SCOPE; @@ -129,6 +130,12 @@ public class BookieImpl implements Bookie { private final boolean writeDataToJournal; + private boolean baseMetricMonitorEnabled; + + BaseMetricMonitor baseMetricMonitor; + + private static final BaseMetric DEFAULT_BASE_METRIC = new BaseMetric(); + // Write Callback do nothing static class NopWriteCallback implements WriteCallback { @Override @@ -318,6 +325,13 @@ public long getTotalFreeSpace() throws IOException { return getLedgerDirsManager().getTotalFreeSpace(ledgerDirsManager.getAllLedgerDirs()); } + public BaseMetric getBaseMetric() { + if (!baseMetricMonitorEnabled || baseMetricMonitor == null) { + return DEFAULT_BASE_METRIC; + } + return baseMetricMonitor.getBaseMetric(); + } + public static File getCurrentDirectory(File dir) { return new File(dir, BookKeeperConstants.CURRENT_DIR); } @@ -495,6 +509,10 @@ public void ledgerDeleted(long ledgerId) { // Expose Stats this.bookieStats = new BookieStats(statsLogger, journalDirectories.size(), conf.getJournalQueueSize()); + this.baseMetricMonitorEnabled = conf.isBaseMetricMonitorEnabled(); + if (this.baseMetricMonitorEnabled) { + initBaseMetricMonitor(statsLogger.scope(BASE_METRIC_MONITOR_SCOPE)); + } } @VisibleForTesting @@ -668,6 +686,10 @@ public synchronized void start() { //Start DiskChecker thread dirsMonitor.start(); + if (baseMetricMonitorEnabled && baseMetricMonitor != null) { + baseMetricMonitor.start(); + } + // replay journals try { readJournal(); @@ -905,6 +927,10 @@ int shutdown(int exitCode) { //Shutdown disk checker dirsMonitor.shutdown(); + + if (baseMetricMonitorEnabled && baseMetricMonitor != null) { + baseMetricMonitor.shutdown(); + } } } catch (InterruptedException ie) { Thread.currentThread().interrupt(); @@ -1321,4 +1347,14 @@ public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException, NoLedg public List getJournals() { return this.journals; } + + + private void initBaseMetricMonitor(StatsLogger statsLogger) { + try { + this.baseMetricMonitor = new BaseMetricMonitor(conf, journalDirectories, + ledgerDirsManager.getAllLedgerDirs(), statsLogger, bookieStats); + } catch (IOException e) { + LOG.warn("Bookie gets exception while initBaseMetricMonitor", e); + } + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/BaseMetricMonitorStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/BaseMetricMonitorStats.java new file mode 100644 index 00000000000..5848645cc46 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/BaseMetricMonitorStats.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.bookkeeper.bookie.stats; + +import lombok.Getter; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.stats.annotations.StatsDoc; + +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BASE_METRIC_MONITOR_SCOPE; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CPU_USED_RATE; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_IO_UTIL; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LEDGER_IO_UTIL; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_BYTES_PER_SECOND; + + +@StatsDoc( + name = BASE_METRIC_MONITOR_SCOPE, + category = CATEGORY_SERVER, + help = "Base metric monitor stats" +) +@Getter +public class BaseMetricMonitorStats { + + @StatsDoc( + name = JOURNAL_IO_UTIL, + help = "Journal io util" + ) + private final Counter journalIoUtil; + + @StatsDoc( + name = LEDGER_IO_UTIL, + help = "ledger io util" + ) + private final Counter ledgerIoUtil; + + @StatsDoc( + name = CPU_USED_RATE, + help = "cpu used rate" + ) + private final Counter cpuUsedRate; + + @StatsDoc( + name = WRITE_BYTES_PER_SECOND, + help = "write byte per second" + ) + private final Counter writeBytePerSecond; + + public BaseMetricMonitorStats(StatsLogger statsLogger) { + journalIoUtil = statsLogger.getCounter(JOURNAL_IO_UTIL); + ledgerIoUtil = statsLogger.getCounter(LEDGER_IO_UTIL); + cpuUsedRate = statsLogger.getCounter(CPU_USED_RATE); + writeBytePerSecond = statsLogger.getCounter(WRITE_BYTES_PER_SECOND); + } +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index 5b12a8f9e43..e4da3651bc0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -347,6 +347,18 @@ public class ServerConfiguration extends AbstractConfiguration Date: Wed, 27 Mar 2024 10:50:45 +0800 Subject: [PATCH 2/4] [Feature] update GetBookieInfo for bookie client --- .../src/main/proto/BookkeeperProtocol.proto | 4 ++ .../bookkeeper/client/BookieInfoReader.java | 39 ++++++++++++++++++- .../proto/GetBookieInfoProcessorV3.java | 17 ++++++-- .../proto/PerChannelBookieClient.java | 24 ++++++++++-- 4 files changed, 77 insertions(+), 7 deletions(-) diff --git a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto index 72df7d5e1d4..e8e3dc2024e 100644 --- a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto +++ b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto @@ -220,6 +220,10 @@ message GetBookieInfoResponse { required StatusCode status = 1; optional int64 totalDiskCapacity = 2; optional int64 freeDiskSpace = 3; + optional int32 journalIoUtil = 4; + optional int32 ledgerIoUtil = 5; + optional int32 cpuUsedRate = 6; + optional int64 writeBytePerSecond = 7; } message GetListOfEntriesOfLedgerResponse { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java index ea6dea0904e..12e4f90db30 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java @@ -62,6 +62,13 @@ public class BookieInfoReader { public static class BookieInfo implements WeightedObject { private final long freeDiskSpace; private final long totalDiskSpace; + private int journalIoUtil = -1; + private int ledgerIoUtil = -1; + private int cpuUsedRate = -1; + private long writeBytePerSecond = -1L; + private boolean hasLoadInfo = false; + private long loadWeight = -1; + public BookieInfo() { this(0L, 0L); } @@ -69,6 +76,26 @@ public BookieInfo(long totalDiskSpace, long freeDiskSpace) { this.totalDiskSpace = totalDiskSpace; this.freeDiskSpace = freeDiskSpace; } + + public BookieInfo(long totalDiskSpace, long freeDiskSpace, int journalIoUtil, + int ledgerIoUtil, int cpuUsedRate, long writeBytePerSecond) { + this.totalDiskSpace = totalDiskSpace; + this.freeDiskSpace = freeDiskSpace; + this.journalIoUtil = journalIoUtil; + this.ledgerIoUtil = ledgerIoUtil; + this.cpuUsedRate = cpuUsedRate; + this.writeBytePerSecond = writeBytePerSecond; + this.hasLoadInfo = true; + + // calculate the load weight in BookieInfo constructor. + // Thus, we do not need to calculate in getWeight() each time. + + // currently, consider writeBytePerSecond as bookie's weight. + if (writeBytePerSecond >= 0) { + long writeByte1MB = 1024 * 1024; + loadWeight = writeBytePerSecond / writeByte1MB; + } + } public long getFreeDiskSpace() { return freeDiskSpace; } @@ -76,12 +103,22 @@ public long getTotalDiskSpace() { return totalDiskSpace; } @Override + public int getLoad() { + // currently, consider journal IOUtil as bookie's load + return journalIoUtil; + } + @Override public long getWeight() { + if (hasLoadInfo) { + return loadWeight; + } return freeDiskSpace; } @Override public String toString() { - return "FreeDiskSpace: " + this.freeDiskSpace + " TotalDiskCapacity: " + this.totalDiskSpace; + return "FreeDiskSpace: " + this.freeDiskSpace + " TotalDiskCapacity: " + this.totalDiskSpace + + " WriteBytePerSecond: " + this.writeBytePerSecond + " JournalIoUtil: " + this.journalIoUtil + + " LedgerIoUtil: " + this.ledgerIoUtil + " CpuUsedRate: " + this.cpuUsedRate; } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java index 8795263a5b5..31f45fafd34 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.bookie.BaseMetric; import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoRequest; import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoResponse; import org.apache.bookkeeper.proto.BookkeeperProtocol.Request; @@ -70,9 +71,19 @@ private GetBookieInfoResponse getGetBookieInfoResponse() { totalDiskSpace = requestProcessor.getBookie().getTotalDiskSpace(); getBookieInfoResponse.setTotalDiskCapacity(totalDiskSpace); } - if (LOG.isDebugEnabled()) { - LOG.debug("FreeDiskSpace info is " + freeDiskSpace + " totalDiskSpace is: " + totalDiskSpace); - } + BaseMetric baseMetric = requestProcessor.getBookie().getBaseMetric(); + int journalIoUtil = baseMetric.getJournalIoUtil(); + getBookieInfoResponse.setJournalIoUtil(journalIoUtil); + int ledgerIoUtil = baseMetric.getLedgerIoUtil(); + getBookieInfoResponse.setLedgerIoUtil(ledgerIoUtil); + int cpuUsedRate = baseMetric.getCpuUsedRate(); + getBookieInfoResponse.setCpuUsedRate(cpuUsedRate); + long writeBytePerSecond = baseMetric.getWriteBytePerSecond(); + getBookieInfoResponse.setWriteBytePerSecond(writeBytePerSecond); + LOG.debug("FreeDiskSpace info {} totalDiskSpace {} journalIoUtil {} ledgerIoUtil {}" + + " cpuUsedRate {} writeBytePerSecond {}", + freeDiskSpace, totalDiskSpace, journalIoUtil, ledgerIoUtil, + cpuUsedRate, writeBytePerSecond); requestProcessor.getRequestStats().getGetBookieInfoStats() .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); } catch (IOException e) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 5ebafe8ecaf..13b538d64d2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -2202,9 +2202,27 @@ public void handleV3Response(BookkeeperProtocol.Response response) { } int rc = convertStatus(status, BKException.Code.ReadException); - cb.getBookieInfoComplete(rc, - new BookieInfo(totalDiskSpace, - freeDiskSpace), ctx); + BookieInfo bInfo; + if (conf.getLoadWeightBasedPlacementEnabled()) { + int journalIoUtil = getBookieInfoResponse.hasJournalIoUtil() ? + getBookieInfoResponse.getJournalIoUtil() : -1; + int ledgerIoUtil = getBookieInfoResponse.hasLedgerIoUtil() ? + getBookieInfoResponse.getLedgerIoUtil() : -1; + int cpuUsedRate = getBookieInfoResponse.hasCpuUsedRate() ? + getBookieInfoResponse.getCpuUsedRate() : -1; + long writeBytePerSecond = getBookieInfoResponse.hasWriteBytePerSecond() ? + getBookieInfoResponse.getWriteBytePerSecond() : -1L; + + if (LOG.isDebugEnabled()) { + logResponse(status, "journalIoUtil", journalIoUtil, "ledgerIoUtil", ledgerIoUtil, + "cpuUsedRate", cpuUsedRate, "writeBytePerSecond", writeBytePerSecond); + } + bInfo = new BookieInfo(totalDiskSpace, freeDiskSpace, + journalIoUtil, ledgerIoUtil, cpuUsedRate, writeBytePerSecond); + } else { + bInfo = new BookieInfo(totalDiskSpace, freeDiskSpace); + } + cb.getBookieInfoComplete(rc, bInfo, ctx); } } From be25428ac0aa2ce7eadbd881549e4f7f7b60d9fd Mon Sep 17 00:00:00 2001 From: fanjianye Date: Wed, 27 Mar 2024 10:54:50 +0800 Subject: [PATCH 3/4] [Feature] implement of LoadWeightBasedPlacement --- .../apache/bookkeeper/client/BookKeeper.java | 5 +- .../DynamicWeightedRandomSelectionImpl.java | 5 + .../client/LoadWeightedSelectionImpl.java | 173 ++++++++++++++++++ .../RackawareEnsemblePlacementPolicy.java | 16 +- .../RackawareEnsemblePlacementPolicyImpl.java | 144 ++++++++++++--- .../RegionAwareEnsemblePlacementPolicy.java | 18 +- .../TopologyAwareEnsemblePlacementPolicy.java | 11 +- .../client/WeightedRandomSelection.java | 3 + .../client/WeightedRandomSelectionImpl.java | 4 + .../bookkeeper/conf/ClientConfiguration.java | 70 +++++++ .../client/TestWeightedRandomSelection.java | 5 + 11 files changed, 414 insertions(+), 40 deletions(-) create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LoadWeightedSelectionImpl.java diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java index d7043dc8c9a..7233076b1f7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java @@ -511,8 +511,9 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo this.bookieClient = new BookieClientImpl(conf, this.eventLoopGroup, this.allocator, this.mainWorkerPool, scheduler, rootStatsLogger, this.bookieWatcher.getBookieAddressResolver()); - if (conf.getDiskWeightBasedPlacementEnabled()) { - LOG.info("Weighted ledger placement enabled"); + if (conf.getDiskWeightBasedPlacementEnabled() || conf.getLoadWeightBasedPlacementEnabled()) { + LOG.info("Weighted ledger placement enabled. DiskWeight: {}, LoadWeight: {}", + conf.getDiskWeightBasedPlacementEnabled(), conf.getLoadWeightBasedPlacementEnabled()); ThreadFactoryBuilder tFBuilder = new ThreadFactoryBuilder() .setNameFormat("BKClientMetaDataPollScheduler-%d"); this.bookieInfoScheduler = Executors.newSingleThreadScheduledExecutor(tFBuilder.build()); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DynamicWeightedRandomSelectionImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DynamicWeightedRandomSelectionImpl.java index 7da6b4c20d6..22033d7a649 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DynamicWeightedRandomSelectionImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DynamicWeightedRandomSelectionImpl.java @@ -162,4 +162,9 @@ public T getNextRandom(Collection selectedNodes) { public void setMaxProbabilityMultiplier(int max) { this.maxProbabilityMultiplier = max; } + + @Override + public int getSize() { + return weightMap.size(); + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LoadWeightedSelectionImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LoadWeightedSelectionImpl.java new file mode 100644 index 00000000000..78b4c70ddb5 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LoadWeightedSelectionImpl.java @@ -0,0 +1,173 @@ +package org.apache.bookkeeper.client; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class LoadWeightedSelectionImpl implements WeightedRandomSelection { + static final Logger LOG = LoggerFactory.getLogger(LoadWeightedSelectionImpl.class); + + Double randomMax; + long minWeight = 10; + long maxWeight = 100; + long overLoadWeight = 5; + Map map; + TreeMap cummulativeMap = new TreeMap(); + ReadWriteLock rwLock = new ReentrantReadWriteLock(true); + int loadThreshold; + double lowLoadBookieRatio; + + LoadWeightedSelectionImpl(int loadThreshold, double lowLoadBookieRatio) { + this.loadThreshold = loadThreshold; + this.lowLoadBookieRatio = lowLoadBookieRatio; + } + + @Override + public void updateMap(Map map) { + // step 1: exclude the high load bookie, whose load is higher than threshold, default 70% + Map lowLoadRackMap = new HashMap<>(); + for (Map.Entry e : map.entrySet()) { + if (e.getValue().getLoad() < 0) { + // if getLoad() is -1, it means the bookie maybe disable baseMetricMonitorEnabled + // or broker can not get load information from bookie because of restApi timeout or exception. + // current method: we only skip these -1 bookie + // Alternative method: we can give a default load for these -1 bookie + } else if (e.getValue().getLoad() <= loadThreshold) { + lowLoadRackMap.put(e.getKey(), e.getValue()); + } else { + // just exclude the high load bookie + } + } + + // The probability of picking a bookie randomly is defaultPickProbability + // but we may change that priority by looking at the weight that each bookie + // carries. + TreeMap tmpCummulativeMap = new TreeMap<>(); + Double key = 0.0; + + if (lowLoadRackMap.size() <= map.size() * lowLoadBookieRatio) { + // corner case: if most bookie not have bookieInfo, or load is more than threshold, + // the bookie's number which can be selected would be a little. + // And this would cause ledger only can select from a little bookies, + // maybe cause write throughput incline problem. + // ----- + // so if low-load-bookie's number <= all-selected-bookie's number * ratio, + // fallback to random select of all bookies + Double defaultPickProbability = 1d / map.size(); + for (Map.Entry e : map.entrySet()) { + tmpCummulativeMap.put(key, e.getKey()); + key += defaultPickProbability; + } + } else { + // step2: roulette wheel selection + // default bookies's weight is writeBytePerSecond, aiming to balance bookie throughput + map = lowLoadRackMap; + + // get the sum total of all the values; this will be used to + // calculate the weighted probability later on + Long totalWeight = 0L; + List values = new ArrayList<>(map.values()); + for (int i = 0; i < values.size(); i++) { + // case1: if getLoad() == -1, the bookie should be excluded. + // but if getLoad() != -1, getWeight() == -1, it still have problem. + // so deal with this case, although it should not happen. + if (values.get(i).getWeight() < 0) { + continue; + } + // bookie's weight should be 0-100 + // case 2: if getWeight() > 100, this bookie is overload and + // should be regard as overLoadWeight, default is 5 + if (values.get(i).getWeight() > maxWeight) { + totalWeight += overLoadWeight; + continue; + } + // case3: (100 - bookie's weight) < 10, smooth the probability. + // because 0-10's probability differ so much from 10-100's probability. + // we don't want this big difference cause write incline problem. + totalWeight += Math.max((maxWeight - values.get(i).getWeight()), minWeight); + } + + Map weightMap = new HashMap<>(); + for (Map.Entry e : map.entrySet()) { + double weightedProbability; + if (e.getValue().getWeight() < 0) { + LOG.error("should not occur this case in LoadWeightedSelectionImpl." + + " getLoad() != -1, but getWeight() is {}. bookie is {}", + e.getValue().getWeight(), e.getKey()); + continue; + } + if (e.getValue().getWeight() > maxWeight) { + weightedProbability = (double) overLoadWeight / (double) totalWeight; + weightMap.put(e.getKey(), weightedProbability); + continue; + } + // the higher load weight, the less probability + weightedProbability = (double) Math.max((maxWeight - e.getValue().getWeight()), minWeight) / + (double) totalWeight; + weightMap.put(e.getKey(), weightedProbability); + } + + + for (Map.Entry e : weightMap.entrySet()) { + tmpCummulativeMap.put(key, e.getKey()); + if (LOG.isDebugEnabled()) { + LOG.debug("Key: {} Value: {} AssignedKey: {} AssignedWeight: {}", + e.getKey(), e.getValue(), key, e.getValue()); + } + key += e.getValue(); + } + } + + rwLock.writeLock().lock(); + try { + this.map = map; + cummulativeMap = tmpCummulativeMap; + randomMax = key; + } finally { + rwLock.writeLock().unlock(); + } + } + + @Override + public T getNextRandom() { + rwLock.readLock().lock(); + try { + // pick a random number between 0 and randMax + Double randomNum = randomMax * Math.random(); + // find the nearest key in the map corresponding to the randomNum + Double key = cummulativeMap.floorKey(randomNum); + return cummulativeMap.get(key); + } finally { + rwLock.readLock().unlock(); + } + } + + @Override + public void setMaxProbabilityMultiplier(int max) { + + } + + @Override + public int getSize() { + rwLock.readLock().lock(); + try { + return cummulativeMap.size(); + } finally { + rwLock.readLock().unlock(); + } + } + + @Override + public T getNextRandom(Collection selectedNodes) { + throw new UnsupportedOperationException("getNextRandom is not implemented for WeightedRandomSelectionImpl"); + } + +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java index 1fb17ca3ef1..14fcef5a76b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Set; import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; +import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.net.BookieNode; import org.apache.bookkeeper.net.DNSToSwitchMapping; @@ -57,11 +58,12 @@ protected RackawareEnsemblePlacementPolicy initialize(DNSToSwitchMapping dnsReso int minNumRacksPerWriteQuorum, boolean enforceMinNumRacksPerWriteQuorum, boolean ignoreLocalNodeInPlacementPolicy, - StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) { + StatsLogger statsLogger, + BookieAddressResolver bookieAddressResolver) { return initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds, reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum, enforceMinNumRacksPerWriteQuorum, ignoreLocalNodeInPlacementPolicy, false, - statsLogger, bookieAddressResolver); + statsLogger, bookieAddressResolver, new ClientConfiguration()); } @Override @@ -76,22 +78,24 @@ protected RackawareEnsemblePlacementPolicy initialize(DNSToSwitchMapping dnsReso boolean enforceMinNumRacksPerWriteQuorum, boolean ignoreLocalNodeInPlacementPolicy, boolean useHostnameResolveLocalNodePlacementPolicy, - StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) { + StatsLogger statsLogger, + BookieAddressResolver bookieAddressResolver, + ClientConfiguration conf) { if (stabilizePeriodSeconds > 0) { super.initialize(dnsResolver, timer, reorderReadsRandom, 0, reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum, enforceMinNumRacksPerWriteQuorum, ignoreLocalNodeInPlacementPolicy, useHostnameResolveLocalNodePlacementPolicy, - statsLogger, bookieAddressResolver); + statsLogger, bookieAddressResolver, conf); slave = new RackawareEnsemblePlacementPolicyImpl(enforceDurability); slave.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds, reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum, enforceMinNumRacksPerWriteQuorum, ignoreLocalNodeInPlacementPolicy, - useHostnameResolveLocalNodePlacementPolicy, statsLogger, bookieAddressResolver); + useHostnameResolveLocalNodePlacementPolicy, statsLogger, bookieAddressResolver, conf); } else { super.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds, reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum, enforceMinNumRacksPerWriteQuorum, ignoreLocalNodeInPlacementPolicy, - useHostnameResolveLocalNodePlacementPolicy, statsLogger, bookieAddressResolver); + useHostnameResolveLocalNodePlacementPolicy, statsLogger, bookieAddressResolver, conf); slave = null; } return this; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java index 3863a26a245..ad3d7ba1b11 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java @@ -91,6 +91,8 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP protected boolean enforceMinNumRacksPerWriteQuorum; protected boolean ignoreLocalNodeInPlacementPolicy; protected boolean useHostnameResolveLocalNodePlacementPolicy; + protected int loadThreshold = 70; + protected double lowLoadBookieRatio = 0.5; public static final String REPP_RANDOM_READ_REORDERING = "ensembleRandomReadReordering"; @@ -177,7 +179,7 @@ protected RackawareEnsemblePlacementPolicyImpl initialize(DNSToSwitchMapping dns return initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds, reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum, enforceMinNumRacksPerWriteQuorum, ignoreLocalNodeInPlacementPolicy, - false, statsLogger, bookieAddressResolver); + false, statsLogger, bookieAddressResolver, new ClientConfiguration()); } /** @@ -198,7 +200,8 @@ protected RackawareEnsemblePlacementPolicyImpl initialize(DNSToSwitchMapping dns boolean ignoreLocalNodeInPlacementPolicy, boolean useHostnameResolveLocalNodePlacementPolicy, StatsLogger statsLogger, - BookieAddressResolver bookieAddressResolver) { + BookieAddressResolver bookieAddressResolver, + ClientConfiguration conf) { checkNotNull(statsLogger, "statsLogger should not be null, use NullStatsLogger instead."); this.statsLogger = statsLogger; this.bookieAddressResolver = bookieAddressResolver; @@ -233,6 +236,8 @@ public Integer getSample() { this.enforceMinNumRacksPerWriteQuorum = enforceMinNumRacksPerWriteQuorum; this.ignoreLocalNodeInPlacementPolicy = ignoreLocalNodeInPlacementPolicy; this.useHostnameResolveLocalNodePlacementPolicy = useHostnameResolveLocalNodePlacementPolicy; + this.loadThreshold = conf.getLoadThreshold(); + this.lowLoadBookieRatio = conf.getLowLoadBookieRatio(); // create the network topology if (stabilizePeriodSeconds > 0) { @@ -259,7 +264,12 @@ public Integer getSample() { dnsResolver.getClass().getName()); this.isWeighted = isWeighted; - if (this.isWeighted) { + this.isLoadWeighted = conf.getLoadWeightBasedPlacementEnabled(); + if (this.isLoadWeighted) { + this.weightedSelection = new LoadWeightedSelectionImpl<>(this.loadThreshold, this.lowLoadBookieRatio); + LOG.info("Load Weight based placement with loadThreshold:{} lowLoadBookieRatio:{}", + this.loadThreshold, this.lowLoadBookieRatio); + } else if (this.isWeighted) { this.maxWeightMultiple = maxWeightMultiple; this.weightedSelection = new WeightedRandomSelectionImpl(this.maxWeightMultiple); LOG.info("Weight based placement with max multiple of " + this.maxWeightMultiple); @@ -345,7 +355,8 @@ public Long load(BookieId key) throws Exception { conf.getIgnoreLocalNodeInPlacementPolicy(), conf.getUseHostnameResolveLocalNodePlacementPolicy(), statsLogger, - bookieAddressResolver); + bookieAddressResolver, + conf); } @Override @@ -643,7 +654,7 @@ public BookieNode selectFromNetworkLocation(Set excludeRacks, } try { - return selectRandomInternal(knownNodes, 1, fullExclusionBookiesList, predicate, ensemble).get(0); + return selectRandomInternal(knownNodes, 1, fullExclusionBookiesList, predicate, ensemble, false).get(0); } catch (BKNotEnoughBookiesException e) { if (!fallbackToRandom) { LOG.error( @@ -660,8 +671,9 @@ public BookieNode selectFromNetworkLocation(Set excludeRacks, } } - private WeightedRandomSelection prepareForWeightedSelection(List leaves) { - // create a map of bookieNode->freeDiskSpace for this rack. The assumption is that + private WeightedRandomSelection prepareForWeightedSelection(List leaves, + boolean isLoadWeight) { + // create a map of bookieNode->weight for this rack. The assumption is that // the number of nodes in a rack is of the order of 40, so it shouldn't be too bad // to build it every time during a ledger creation Map rackMap = new HashMap(); @@ -680,8 +692,39 @@ private WeightedRandomSelection prepareForWeightedSelection(List wRSelection = new WeightedRandomSelectionImpl( - maxWeightMultiple); + WeightedRandomSelection wRSelection; + if (isLoadWeight) { + wRSelection = new LoadWeightedSelectionImpl<>(this.loadThreshold, this.lowLoadBookieRatio); + } else { + wRSelection = new WeightedRandomSelectionImpl<>(maxWeightMultiple); + } + wRSelection.updateMap(rackMap); + return wRSelection; + } + + private WeightedRandomSelection prepareForBookieWeightedSelection(List leaves, + boolean isLoadWeight) { + // create a map of bookieNode->weight for this rack. The assumption is that + // the number of nodes in a rack is of the order of 40, so it shouldn't be too bad + // to build it every time during a ledger creation + Map rackMap = new HashMap(); + for (BookieNode bookie : leaves) { + if (this.bookieInfoMap.containsKey(bookie)) { + rackMap.put(bookie, this.bookieInfoMap.get(bookie)); + } else { + rackMap.put(bookie, new BookieInfo()); + } + } + if (rackMap.size() == 0) { + return null; + } + + WeightedRandomSelection wRSelection; + if (isLoadWeight) { + wRSelection = new LoadWeightedSelectionImpl<>(this.loadThreshold, this.lowLoadBookieRatio); + } else { + wRSelection = new WeightedRandomSelectionImpl<>(maxWeightMultiple); + } wRSelection.updateMap(rackMap); return wRSelection; } @@ -703,23 +746,39 @@ protected BookieNode selectRandomFromRack(String netPath, Set excludeBooki Ensemble ensemble) throws BKNotEnoughBookiesException { WeightedRandomSelection wRSelection = null; List leaves = new ArrayList(topology.getLeaves(netPath)); - if (!this.isWeighted) { - Collections.shuffle(leaves); - } else { + if (this.isLoadWeighted) { if (CollectionUtils.subtract(leaves, excludeBookies).size() < 1) { throw new BKNotEnoughBookiesException(); } - wRSelection = prepareForWeightedSelection(leaves); + wRSelection = prepareForWeightedSelection(leaves, true); if (wRSelection == null) { throw new BKNotEnoughBookiesException(); } + } else if (this.isWeighted) { + if (CollectionUtils.subtract(leaves, excludeBookies).size() < 1) { + throw new BKNotEnoughBookiesException(); + } + wRSelection = prepareForWeightedSelection(leaves, false); + if (wRSelection == null) { + throw new BKNotEnoughBookiesException(); + } + } else { + Collections.shuffle(leaves); } Iterator it = leaves.iterator(); Set bookiesSeenSoFar = new HashSet(); while (true) { Node n; - if (isWeighted) { + if (isLoadWeighted) { + // TODO: enhance loop maybe executed too long + if (bookiesSeenSoFar.size() == wRSelection.getSize()) { + // Don't loop infinitely. + break; + } + n = wRSelection.getNextRandom(); + bookiesSeenSoFar.add(n); + } else if (isWeighted) { if (bookiesSeenSoFar.size() == leaves.size()) { // Don't loop infinitely. break; @@ -767,22 +826,40 @@ protected List selectRandom(int numBookies, Predicate predicate, Ensemble ensemble) throws BKNotEnoughBookiesException { - return selectRandomInternal(null, numBookies, excludeBookies, predicate, ensemble); + // can be regard as the last random select. + // so isLastRandomSelect is always true. + return selectRandomInternal(null, numBookies, excludeBookies, predicate, ensemble, true); } + // 1. last random select for RegionAware + // 2. last random select for RackAware + // 3. normal random select for RackAware protected List selectRandomInternal(List bookiesToSelectFrom, int numBookies, Set excludeBookies, Predicate predicate, - Ensemble ensemble) + Ensemble ensemble, + boolean isLastRandomSelect) throws BKNotEnoughBookiesException { + // If this is the last random select in placementPolicy + // when previous select throws BKNotEnoughBookiesException, + // we'd better not add LoadWeightSelect strategy, just let it randomly select in whole cluster. + // Therefore, add 'isLastRandomSelect' to indicate whether is the last random select. WeightedRandomSelection wRSelection = null; if (bookiesToSelectFrom == null) { // If the list is null, we need to select from the entire knownBookies set wRSelection = this.weightedSelection; bookiesToSelectFrom = new ArrayList(knownBookies.values()); } - if (isWeighted) { + if (isLoadWeighted && !isLastRandomSelect) { + if (CollectionUtils.subtract(bookiesToSelectFrom, excludeBookies).size() < numBookies) { + throw new BKNotEnoughBookiesException(); + } + wRSelection = prepareForBookieWeightedSelection(bookiesToSelectFrom, true); + if (wRSelection == null) { + throw new BKNotEnoughBookiesException(); + } + } else if (isWeighted) { if (CollectionUtils.subtract(bookiesToSelectFrom, excludeBookies).size() < numBookies) { throw new BKNotEnoughBookiesException(); } @@ -811,7 +888,13 @@ protected List selectRandomInternal(List bookiesToSelect Iterator it = bookiesToSelectFrom.iterator(); Set bookiesSeenSoFar = new HashSet(); while (numBookies > 0) { - if (isWeighted) { + if (isLoadWeighted && !isLastRandomSelect) { + if (bookiesSeenSoFar.size() == wRSelection.getSize()) { + break; + } + bookie = wRSelection.getNextRandom(); + bookiesSeenSoFar.add(bookie); + } else if (isWeighted) { if (bookiesSeenSoFar.size() == bookiesToSelectFrom.size()) { // If we have gone through the whole available list of bookies, // and yet haven't been able to satisfy the ensemble request, bail out. @@ -1326,23 +1409,38 @@ private BookieNode replaceToAdherePlacementPolicyInternal( WeightedRandomSelection wRSelection = null; final List leaves = new ArrayList<>(topology.getLeaves(condition.getLeft())); - if (!isWeighted) { - Collections.shuffle(leaves); - } else { + if (isLoadWeighted) { + if (CollectionUtils.subtract(leaves, excludeBookies).size() < 1) { + throw new BKNotEnoughBookiesException(); + } + wRSelection = prepareForWeightedSelection(leaves, true); + if (wRSelection == null) { + throw new BKNotEnoughBookiesException(); + } + } else if (isWeighted) { if (CollectionUtils.subtract(leaves, excludeBookies).size() < 1) { throw new BKNotEnoughBookiesException(); } - wRSelection = prepareForWeightedSelection(leaves); + wRSelection = prepareForWeightedSelection(leaves, false); if (wRSelection == null) { throw new BKNotEnoughBookiesException(); } + } else { + Collections.shuffle(leaves); } final Iterator it = leaves.iterator(); final Set bookiesSeenSoFar = new HashSet<>(); while (true) { Node n; - if (isWeighted) { + if (isLoadWeighted) { + if (bookiesSeenSoFar.size() == wRSelection.getSize()) { + // Don't loop infinitely. + break; + } + n = wRSelection.getNextRandom(); + bookiesSeenSoFar.add(n); + } else if (isWeighted) { if (bookiesSeenSoFar.size() == leaves.size()) { // Don't loop infinitely. break; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java index c742e62c04d..6735726d499 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java @@ -78,6 +78,7 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme protected boolean enforceDurabilityInReplace = false; protected Feature disableDurabilityFeature; private int lastRegionIndex = 0; + protected ClientConfiguration conf; RegionAwareEnsemblePlacementPolicy() { super(); @@ -141,7 +142,7 @@ public void handleBookiesThatJoined(Set joinedBookies) { this.reorderThresholdPendingRequests, this.isWeighted, this.maxWeightMultiple, this.minNumRacksPerWriteQuorum, this.enforceMinNumRacksPerWriteQuorum, this.ignoreLocalNodeInPlacementPolicy, - this.useHostnameResolveLocalNodePlacementPolicy, statsLogger, bookieAddressResolver) + this.useHostnameResolveLocalNodePlacementPolicy, statsLogger, bookieAddressResolver, conf) .withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK)); } @@ -204,7 +205,7 @@ public void onBookieRackChange(List bookieAddressList) { this.minNumRacksPerWriteQuorum, this.enforceMinNumRacksPerWriteQuorum, this.ignoreLocalNodeInPlacementPolicy, this.useHostnameResolveLocalNodePlacementPolicy, statsLogger, - bookieAddressResolver) + bookieAddressResolver, conf) .withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); perRegionPlacement.put(newRegion, newRegionPlacement); } @@ -220,6 +221,14 @@ public void onBookieRackChange(List bookieAddressList) { } } + @Override + public void updateBookieInfo(Map bookieInfoMap) { + super.updateBookieInfo(bookieInfoMap); + for (TopologyAwareEnsemblePlacementPolicy policy: perRegionPlacement.values()) { + policy.updateBookieInfo(bookieInfoMap); + } + } + @Override public RegionAwareEnsemblePlacementPolicy initialize(ClientConfiguration conf, Optional optionalDnsResolver, @@ -245,7 +254,7 @@ public RegionAwareEnsemblePlacementPolicy initialize(ClientConfiguration conf, this.reorderThresholdPendingRequests, this.isWeighted, this.maxWeightMultiple, this.minNumRacksPerWriteQuorum, this.enforceMinNumRacksPerWriteQuorum, this.ignoreLocalNodeInPlacementPolicy, this.useHostnameResolveLocalNodePlacementPolicy, - statsLogger, bookieAddressResolver) + statsLogger, bookieAddressResolver, conf) .withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK)); } minRegionsForDurability = conf.getInt(REPP_MINIMUM_REGIONS_FOR_DURABILITY, @@ -269,6 +278,7 @@ public RegionAwareEnsemblePlacementPolicy initialize(ClientConfiguration conf, conf.getString(REPP_DISABLE_DURABILITY_FEATURE_NAME, BookKeeperConstants.FEATURE_REPP_DISABLE_DURABILITY_ENFORCEMENT)); } + this.conf = conf; return this; } @@ -285,7 +295,7 @@ protected List selectRandomFromRegions(Set availableRegions, } } - return selectRandomInternal(availableBookies, numBookies, excludeBookies, predicate, ensemble); + return selectRandomInternal(availableBookies, numBookies, excludeBookies, predicate, ensemble, true); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java index 4976f96e8c2..8df4d8b3020 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java @@ -67,6 +67,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy implements // Initialize to empty set protected ImmutableSet readOnlyBookies = ImmutableSet.of(); boolean isWeighted; + boolean isLoadWeighted; protected WeightedRandomSelection weightedSelection; // for now, we just maintain the writable bookies' topology protected NetworkTopology topology; @@ -664,7 +665,7 @@ public Set onClusterChanged(Set writableBookies, } handleBookiesThatLeft(leftBookies); handleBookiesThatJoined(joinedBookies); - if (this.isWeighted && (leftBookies.size() > 0 || joinedBookies.size() > 0)) { + if ((this.isWeighted || this.isLoadWeighted) && (leftBookies.size() > 0 || joinedBookies.size() > 0)) { this.weightedSelection.updateMap(this.bookieInfoMap); } if (!readOnlyBookies.isEmpty()) { @@ -687,7 +688,7 @@ public void handleBookiesThatLeft(Set leftBookies) { BookieNode node = knownBookies.remove(addr); if (null != node) { topology.remove(node); - if (this.isWeighted) { + if (this.isWeighted || this.isLoadWeighted) { this.bookieInfoMap.remove(node); } @@ -720,7 +721,7 @@ public void handleBookiesThatJoined(Set joinedBookies) { topology.add(node); knownBookies.put(addr, node); historyBookies.put(addr, node); - if (this.isWeighted) { + if (this.isWeighted || this.isLoadWeighted) { this.bookieInfoMap.putIfAbsent(node, new BookieInfo()); } @@ -784,8 +785,8 @@ public static int differBetweenBookies(List bookiesA, List b @Override public void updateBookieInfo(Map bookieInfoMap) { - if (!isWeighted) { - LOG.info("bookieFreeDiskInfo callback called even without weighted placement policy being used."); + if (!isWeighted && !isLoadWeighted) { + LOG.info("bookieInfo callback called even without weighted placement policy being used."); return; } rwLock.writeLock().lock(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelection.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelection.java index 8a44174a45d..2f37dce834b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelection.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelection.java @@ -24,6 +24,7 @@ interface WeightedRandomSelection { interface WeightedObject { long getWeight(); + int getLoad(); } void updateMap(Map map); @@ -33,4 +34,6 @@ interface WeightedObject { T getNextRandom(Collection selectedNodes); void setMaxProbabilityMultiplier(int max); + + int getSize(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelectionImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelectionImpl.java index 16443c63d2c..c449bc200f7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelectionImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelectionImpl.java @@ -165,4 +165,8 @@ public T getNextRandom(Collection selectedNodes) { throw new UnsupportedOperationException("getNextRandom is not implemented for WeightedRandomSelectionImpl"); } + @Override + public int getSize() { + return cummulativeMap.size(); + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java index feae6924655..361b849acfa 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java @@ -143,6 +143,7 @@ public class ClientConfiguration extends AbstractConfiguration weightedRandomSelectionClass; From 28056fbcaf3c0da12f5d45848484af1047470ad4 Mon Sep 17 00:00:00 2001 From: fanjianye Date: Wed, 27 Mar 2024 10:55:37 +0800 Subject: [PATCH 4/4] add unittest for LoadWeightBasedPlacement --- .../TestRackawareEnsemblePlacementPolicy.java | 164 ++++++++++++++++++ 1 file changed, 164 insertions(+) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java index ed37159ee19..290e22d115d 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java @@ -1932,6 +1932,170 @@ public void testRemoveBookieFromCluster() { repp.onClusterChanged(addrs, new HashSet()); } + @Test + public void testLoadWeightedPlacementAndNewEnsemble() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(), + NetworkTopology.DEFAULT_REGION + "/r1"); + StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(), + NetworkTopology.DEFAULT_REGION + "/r2"); + StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(), + NetworkTopology.DEFAULT_REGION + "/r3"); + StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(), + NetworkTopology.DEFAULT_REGION + "/r3"); + // Update cluster + Set addrs = new HashSet<>(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + addrs.add(addr4.toBookieId()); + + + conf.setLoadWeightBasedPlacementEnabled(true); + conf.setLowLoadBookieRatio(0.5); + conf.setLoadThreshold(70); + repp.initialize(conf, Optional.empty(), timer, + DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); + + repp.onClusterChanged(addrs, new HashSet<>()); + Map bookieInfoMap = new HashMap<>(); + bookieInfoMap.put(addr1.toBookieId(), new BookieInfo(100L, 100L, 56, 60, 60, 1024 * 1024)); + bookieInfoMap.put(addr2.toBookieId(), new BookieInfo(100L, 100L, 56, 60, 60, 1024 * 1024)); + bookieInfoMap.put(addr3.toBookieId(), new BookieInfo(100L, 100L, 56, 60, 60, 1024 * 1024)); + bookieInfoMap.put(addr4.toBookieId(), new BookieInfo(100L, 100L, 75, 60, 60, 1024 * 1024)); + repp.updateBookieInfo(bookieInfoMap); + + + // low-load-bookie's number <= all-selected-bookie's number * ratio + // fallback to random select in LoadWeightedSelectionImpl + // so addr4 still can be selected + Map selectionCounts = new HashMap<>(); + for (BookieId b : addrs) { + selectionCounts.put(b, 0L); + } + int numTries = 50000; + Set excludeList = new HashSet<>(); + EnsemblePlacementPolicy.PlacementResult> ensembleResponse; + List ensemble; + int ensembleSize = 3; + int writeQuorumSize = 2; + int acqQuorumSize = 2; + for (int i = 0; i < numTries; i++) { + ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize, null, excludeList); + ensemble = ensembleResponse.getResult(); + assertTrue( + "Rackaware selection not happening " + + getNumCoveredWriteQuorums(ensemble, writeQuorumSize, + conf.getMinNumRacksPerWriteQuorum(), repp.bookieAddressResolver), + getNumCoveredWriteQuorums(ensemble, writeQuorumSize, + conf.getMinNumRacksPerWriteQuorum(), repp.bookieAddressResolver) >= 2); + for (BookieId b : ensemble) { + selectionCounts.put(b, selectionCounts.get(b) + 1); + } + } + assertTrue(selectionCounts.get(addr3.toBookieId()) != 0); + assertTrue(selectionCounts.get(addr4.toBookieId()) != 0); + + + // add addr5, then enough bookie in the same rack + // so LoadWeightedSelectionImpl would exclude addr4 which is high load + BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181); + StaticDNSResolver.addNodeToRack(addr5.getSocketAddress().getAddress().getHostAddress(), + NetworkTopology.DEFAULT_REGION + "/r3"); + repp.handleBookiesThatJoined(Collections.singleton(addr5.toBookieId())); + bookieInfoMap.put(addr5.toBookieId(), new BookieInfo(100L, 100L, 56, 60, 60, 1024 * 1024)); + repp.updateBookieInfo(bookieInfoMap); + + addrs.add(addr5.toBookieId()); + selectionCounts = new HashMap<>(); + for (BookieId b : addrs) { + selectionCounts.put(b, 0L); + } + for (int i = 0; i < numTries; i++) { + ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize, null, excludeList); + ensemble = ensembleResponse.getResult(); + assertTrue( + "Rackaware selection not happening " + + getNumCoveredWriteQuorums(ensemble, writeQuorumSize, + conf.getMinNumRacksPerWriteQuorum(), repp.bookieAddressResolver), + getNumCoveredWriteQuorums(ensemble, writeQuorumSize, + conf.getMinNumRacksPerWriteQuorum(), repp.bookieAddressResolver) >= 2); + for (BookieId b : ensemble) { + selectionCounts.put(b, selectionCounts.get(b) + 1); + } + } + assertTrue(selectionCounts.get(addr3.toBookieId()) != 0); + assertTrue(selectionCounts.get(addr4.toBookieId()) == 0); + assertTrue(selectionCounts.get(addr5.toBookieId()) != 0); + } + + @Test + public void testLoadWeightedPlacementAndReplaceBookie() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(), + NetworkTopology.DEFAULT_REGION + "/r1"); + StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(), + NetworkTopology.DEFAULT_REGION + "/r3"); + StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(), + NetworkTopology.DEFAULT_REGION + "/r3"); + StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(), + NetworkTopology.DEFAULT_REGION + "/r3"); + // Update cluster + Set addrs = new HashSet<>(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + addrs.add(addr4.toBookieId()); + + conf.setLoadWeightBasedPlacementEnabled(true); + conf.setLowLoadBookieRatio(0.5); + conf.setLoadThreshold(70); + repp.initialize(conf, Optional.empty(), timer, + DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); + + repp.onClusterChanged(addrs, new HashSet<>()); + Map bookieInfoMap = new HashMap<>(); + bookieInfoMap.put(addr1.toBookieId(), new BookieInfo(100L, 100L, 56, 60, 60, 1024 * 1024)); + bookieInfoMap.put(addr2.toBookieId(), new BookieInfo(100L, 100L, 56, 60, 60, 1024 * 1024)); + bookieInfoMap.put(addr3.toBookieId(), new BookieInfo(100L, 100L, 56, 60, 60, 1024 * 1024)); + bookieInfoMap.put(addr4.toBookieId(), new BookieInfo(100L, 100L, 75, 60, 60, 1024 * 1024)); + repp.updateBookieInfo(bookieInfoMap); + + // although addr2 is in excludedBookies + // since it can be added to LoadWeightedSelectionImpl + // low load bookie's number is enough and addr4 would be excluded + Map selectionCounts = new HashMap<>(); + for (BookieId b : addrs) { + selectionCounts.put(b, 0L); + } + int numTries = 50000; + EnsemblePlacementPolicy.PlacementResult replaceBookieResponse; + PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy; + BookieId replacedBookie; + for (int i = 0; i < numTries; i++) { + // replace node under r3 + replaceBookieResponse = repp.replaceBookie(1, 1, 1, null, new ArrayList<>(), + addr2.toBookieId(), new HashSet<>()); + replacedBookie = replaceBookieResponse.getResult(); + isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy(); + assertTrue("replaced : " + replacedBookie, addr3.toBookieId().equals(replacedBookie)); + assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy); + selectionCounts.put(replacedBookie, selectionCounts.get(replacedBookie) + 1); + } + assertTrue(selectionCounts.get(addr3.toBookieId()) != 0); + assertTrue(selectionCounts.get(addr4.toBookieId()) == 0); + } + @Test public void testWeightedPlacementAndReplaceBookieWithEnoughBookiesInSameRack() throws Exception { BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);