Skip to content

Commit 1bb3bb0

Browse files
committed
memory leak
1 parent 13b4b1e commit 1bb3bb0

File tree

4 files changed

+50
-12
lines changed

4 files changed

+50
-12
lines changed

pkg/objectio/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ type BlockReadFilter struct {
4949
Valid bool
5050
SortedSearchFunc ReadFilterSearchFuncType
5151
UnSortedSearchFunc ReadFilterSearchFuncType
52+
Cleanup func() // Cleanup function to release resources (e.g., reusableTempVec)
5253
}
5354

5455
func (f BlockReadFilter) DecideSearchFunc(isSortedBlk bool) ReadFilterSearchFuncType {

pkg/sql/colexec/table_function/ivf_search.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ func (u *ivfSearchState) reset(tf *TableFunction, proc *process.Process) {
7676
if u.batch != nil {
7777
u.batch.CleanOnlyData()
7878
}
79+
// Note: bloomFilter is kept across resets as it's only set once during initialization
80+
// It will be cleared in free() method
7981
}
8082

8183
func (u *ivfSearchState) call(tf *TableFunction, proc *process.Process) (vm.CallResult, error) {
@@ -112,6 +114,8 @@ func (u *ivfSearchState) free(tf *TableFunction, proc *process.Process, pipeline
112114
if u.batch != nil {
113115
u.batch.Clean(proc.Mp())
114116
}
117+
// Clear bloomFilter bytes to release memory
118+
u.bloomFilter = nil
115119
}
116120

117121
// waitBloomFilterForTableFunction blocks until it receives a bloomfilter runtime filter

pkg/vm/engine/readutil/pk_filter.go

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -651,7 +651,8 @@ func ConstructBlockPKFilter(
651651
sels = make([]int64, 0, len(validIndices))
652652
}
653653
bf.Test(tempVec, func(exist bool, idx int) {
654-
if exist && idx < len(validIndices) {
654+
// Add strict boundary check to prevent index out of range
655+
if exist && idx >= 0 && idx < len(validIndices) {
655656
sels = append(sels, int64(validIndices[idx]))
656657
}
657658
})
@@ -696,7 +697,8 @@ func ConstructBlockPKFilter(
696697
sels = make([]int64, 0, len(validIndices))
697698
}
698699
bf.Test(tempVec, func(exist bool, idx int) {
699-
if exist && idx < len(validIndices) {
700+
// Add strict boundary check to prevent index out of range
701+
if exist && idx >= 0 && idx < len(validIndices) {
700702
sels = append(sels, int64(validIndices[idx]))
701703
}
702704
})
@@ -751,14 +753,20 @@ func ConstructBlockPKFilter(
751753
rowIndices[i] = int(off)
752754
}
753755
bfMatched := applyBFToCompositePK(vec, rowIndices)
754-
// Reuse exists map
756+
// Reuse exists map - use more efficient reallocation for large maps
755757
if reusableExists == nil || len(bfMatched) > len(reusableExists)*2 {
756758
// If map is too small, reallocate
757759
reusableExists = make(map[int]bool, len(bfMatched))
758-
} else {
759-
// Clear map
760-
for k := range reusableExists {
761-
delete(reusableExists, k)
760+
} else if len(reusableExists) > 0 {
761+
// For small maps, clear by reallocating (more efficient than delete loop)
762+
if len(reusableExists) < 100 {
763+
// For small maps, use delete loop
764+
for k := range reusableExists {
765+
delete(reusableExists, k)
766+
}
767+
} else {
768+
// For larger maps, reallocate is more efficient
769+
reusableExists = make(map[int]bool, len(bfMatched))
762770
}
763771
}
764772
exists = reusableExists
@@ -768,17 +776,24 @@ func ConstructBlockPKFilter(
768776
}
769777
} else {
770778
// Non-composite primary key: test directly
771-
// Reuse exists map
779+
// Reuse exists map - use more efficient reallocation for large maps
772780
if reusableExists == nil || rowCount > len(reusableExists)*2 {
773781
reusableExists = make(map[int]bool, rowCount)
774-
} else {
775-
// Clear map
776-
for k := range reusableExists {
777-
delete(reusableExists, k)
782+
} else if len(reusableExists) > 0 {
783+
// For small maps, clear by reallocating (more efficient than delete loop)
784+
if len(reusableExists) < 100 {
785+
// For small maps, use delete loop
786+
for k := range reusableExists {
787+
delete(reusableExists, k)
788+
}
789+
} else {
790+
// For larger maps, reallocate is more efficient
791+
reusableExists = make(map[int]bool, rowCount)
778792
}
779793
}
780794
exists = reusableExists
781795
bf.Test(vec, func(exist bool, row int) {
796+
// Add strict boundary check to prevent index out of range
782797
if row >= 0 && row < rowCount {
783798
exists[row] = exist
784799
if exist {
@@ -801,6 +816,20 @@ func ConstructBlockPKFilter(
801816
readFilter.SortedSearchFunc = wrap(sortedSearchFunc)
802817
readFilter.UnSortedSearchFunc = wrap(unSortedSearchFunc)
803818
readFilter.Valid = true
819+
// Set cleanup function to release reusableTempVec and BloomFilter when filter is no longer needed
820+
readFilter.Cleanup = func() {
821+
// Release reusableTempVec
822+
if reusableTempVec != nil {
823+
reusableTempVec.Free(mp)
824+
reusableTempVec = nil
825+
}
826+
// Release BloomFilter memory
827+
bf.Clean()
828+
// Clear reusableExists map by reallocating for better memory efficiency
829+
if reusableExists != nil {
830+
reusableExists = nil
831+
}
832+
}
804833
return readFilter, nil
805834
}
806835

pkg/vm/engine/readutil/reader.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ import (
5050
// -----------------------------------------------------------------
5151

5252
func (mixin *withFilterMixin) reset() {
53+
// Cleanup reusableTempVec and other resources before resetting filter
54+
if mixin.filterState.filter.Cleanup != nil {
55+
mixin.filterState.filter.Cleanup()
56+
}
5357
mixin.filterState.filter = objectio.BlockReadFilter{}
5458
mixin.filterState.memFilter = MemPKFilter{}
5559
mixin.columns.indexOfFirstSortedColumn = -1

0 commit comments

Comments
 (0)