diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc
index b3711683acdd..6af9e436109b 100644
--- a/storage/rocksdb/ha_rocksdb.cc
+++ b/storage/rocksdb/ha_rocksdb.cc
@@ -4061,6 +4061,32 @@ class Rdb_transaction {
     return get_iterator(options, column_family, table_type);
   }
 
+  rocksdb::Iterator *refresh_iterator(
+      const rocksdb::Snapshot* snapshot,
+      rocksdb::ColumnFamilyHandle *const column_family, bool skip_bloom_filter,
+      const rocksdb::Slice &eq_cond_lower_bound,
+      const rocksdb::Slice &eq_cond_upper_bound, TABLE_TYPE table_type) {
+    rocksdb::ReadOptions options = m_read_opts;  // work on a copy
+    const bool fill_cache = !THDVAR(get_thd(), skip_fill_cache);
+    if (skip_bloom_filter) {
+      const bool enable_iterate_bounds =
+          THDVAR(get_thd(), enable_iterate_bounds);
+      options.total_order_seek = true;  // bounds below store the callers' Slice addresses
+      options.iterate_lower_bound =
+          enable_iterate_bounds ? &eq_cond_lower_bound : nullptr;
+      options.iterate_upper_bound =
+          enable_iterate_bounds ? &eq_cond_upper_bound : nullptr;
+    } else {
+      // With this option, Iterator::Valid() returns false once the key
+      // is outside of the prefix bloom filter range set at Seek().
+      // It must not be set to true when the bloom filter is not used.
+      options.prefix_same_as_start = true;
+    }
+    options.fill_cache = fill_cache;
+    options.snapshot = snapshot;  // NOTE(review): replaces the snapshot copied from m_read_opts, nullptr included - confirm
+    return get_iterator(options, column_family, table_type);
+  }
+
   virtual bool is_tx_started(TABLE_TYPE table_type) const = 0;
   virtual void start_tx(TABLE_TYPE table_type) = 0;
   virtual void start_stmt() = 0;
@@ -17485,6 +17511,16 @@ rocksdb::Iterator *rdb_tx_get_iterator(
   }
 }
 
+rocksdb::Iterator *rdb_tx_refresh_iterator(
+    THD *thd, rocksdb::ColumnFamilyHandle *const cf, bool skip_bloom_filter,
+    const rocksdb::Slice &eq_cond_lower_bound,
+    const rocksdb::Slice &eq_cond_upper_bound,
+    const rocksdb::Snapshot *snapshot, TABLE_TYPE table_type) {
+  Rdb_transaction *tx = get_tx_from_thd(thd);  // used below without a null check
+  return tx->refresh_iterator(snapshot, cf, skip_bloom_filter,  // snapshot goes first here
+      eq_cond_lower_bound, eq_cond_upper_bound, table_type);
+}
+
 bool rdb_tx_started(Rdb_transaction *tx, TABLE_TYPE table_type) {
   return tx->is_tx_started(table_type);
 }
diff --git a/storage/rocksdb/ha_rocksdb.h b/storage/rocksdb/ha_rocksdb.h
index 070fe089d4b0..d1b798f227ec 100644
--- a/storage/rocksdb/ha_rocksdb.h
+++ b/storage/rocksdb/ha_rocksdb.h
@@ -1148,6 +1148,12 @@ rocksdb::Iterator *rdb_tx_get_iterator(
     const rocksdb::Snapshot **snapshot, TABLE_TYPE table_type,
     bool read_current = false, bool create_snapshot = true);
 
+rocksdb::Iterator *rdb_tx_refresh_iterator(
+    THD *thd, rocksdb::ColumnFamilyHandle *const cf, bool skip_bloom_filter,
+    const rocksdb::Slice &eq_cond_lower_bound,
+    const rocksdb::Slice &eq_cond_upper_bound,
+    const rocksdb::Snapshot *snapshot, TABLE_TYPE table_type);
+
 rocksdb::Status rdb_tx_get(Rdb_transaction *tx,
                            rocksdb::ColumnFamilyHandle *const column_family,
                            const rocksdb::Slice &key,
diff --git a/storage/rocksdb/rdb_iterator.cc b/storage/rocksdb/rdb_iterator.cc
index 5adf534abe14..cd387affe642 100644
--- a/storage/rocksdb/rdb_iterator.cc
+++ b/storage/rocksdb/rdb_iterator.cc
@@ -172,6 +172,30 @@ void Rdb_iterator_base::setup_scan_iterator(const rocksdb::Slice *const slice,
   }
 }
 
+// Replaces m_scan_it with a new iterator from rdb_tx_refresh_iterator() so the
+// MemTable and SST objects held by the rocksdb::Version the old iterator
+// referenced can be released (the new one may reference a newer Version).
+// Position is restored via Seek(); the two data views should be identical.
+void Rdb_iterator_base::refresh_iter() {
+  std::string curr_key;
+  bool valid = m_scan_it->Valid();  // m_scan_it is dereferenced unchecked
+  if (valid) {
+    curr_key = m_scan_it->key().ToString();  // own copy; outlives the old iterator
+  }
+  delete m_scan_it;  // old iterator is destroyed before its replacement is built
+  bool skip_bloom = m_scan_it_skips_bloom;
+  m_scan_it = rdb_tx_refresh_iterator(
+      m_thd, m_kd->get_cf(), skip_bloom, m_scan_it_lower_bound_slice,
+      m_scan_it_upper_bound_slice, m_scan_it_snapshot, m_table_type);
+  if (valid) {
+    m_scan_it->Seek(curr_key);  // reposition on the remembered key
+    SHIP_ASSERT(m_scan_it->Valid());
+    SHIP_ASSERT(m_scan_it->key() == curr_key);  // the same key must still be visible
+  } else {
+    SHIP_ASSERT(!m_scan_it->Valid());  // the new iterator is left unseeked here
+  }
+}
+
 int Rdb_iterator_base::calc_eq_cond_len(enum ha_rkey_function find_flag,
                                         const rocksdb::Slice &start_key,
                                         const int bytes_changed_by_succ,
@@ -214,6 +238,12 @@ int Rdb_iterator_base::next_with_direction(bool move_forward, bool skip_next) {
   const auto &kd = *m_kd;
   Rdb_transaction *const tx = get_tx_from_thd(m_thd);
 
+  const uint32_t refresh_interval = 100000;  // calls between two refresh_iter() runs
+  if (++m_call_cnt >= refresh_interval) {
+    refresh_iter();
+    m_call_cnt = 0;
+  }
+
   for (;;) {
     DEBUG_SYNC(m_thd, "rocksdb.check_flags_nwd");
     if (thd_killed(m_thd)) {
diff --git a/storage/rocksdb/rdb_iterator.h b/storage/rocksdb/rdb_iterator.h
index 7e58d6adcd92..86839e19fa69 100644
--- a/storage/rocksdb/rdb_iterator.h
+++ b/storage/rocksdb/rdb_iterator.h
@@ -122,6 +122,8 @@ class Rdb_iterator_base : public Rdb_iterator {
 
   /* Iterator used for range scans and for full table/index scans */
   rocksdb::Iterator *m_scan_it;
+  uint32_t m_call_cnt = 0;  // calls counted toward the next refresh_iter()
+  void refresh_iter();  // swaps m_scan_it for a newly created iterator
 
   /* Whether m_scan_it was created with skip_bloom=true */
   bool m_scan_it_skips_bloom;