-
Notifications
You must be signed in to change notification settings - Fork 20
Possible memory leak #23
Description
Good day
If I run the attached script (see below), tested on windows, unix and stm32f769 builds, it runs fine until the free memory drops below 5%, and then crashes with the error:
Traceback (most recent call last):
File "E:\scratch\micropython\mp_usqlite\samples\producer_consumer_sd.py", line 167, in <module>
File "asyncio/core.py", line 1, in run
File "asyncio/core.py", line 1, in run_until_complete
File "E:\scratch\micropython\mp_usqlite\samples\producer_consumer_sd.py", line 91, in consume
usqlite_Error: error (26): file is not a database sql: 'SELECT COUNT(ID) from items;'
PS E:\scratch\micropython\mp_usqlite\micropython\ports\windows\build-dev>
If I run the script with useSqlite = False, the memory usage drops until about 1%, but then the gc kicks in and the script continues as expected.
However, if useSqlite = True, the above error is displayed when the memory usage drops to around 5%.
Might it be an issue in the interop between the gc and usqlite?
Regards
Script:
import uasyncio as asyncio
from queue import Queue
import gc
import os
import usqlite
# Exception raised by get_nowait().
class QueueEmpty(Exception):
    """Signals that a non-blocking get found no item in the queue."""
# Exception raised by put_nowait().
class QueueFull(Exception):
    """Signals that a non-blocking put found the queue already full."""
class Queue:
    """Simple FIFO queue for asyncio tasks with an optional size limit.

    Items are kept in a plain list.  Two events coordinate producers and
    consumers: ``_evput`` is pulsed on every put (waking tasks blocked in
    ``get()``), and ``_evget`` is pulsed on every get (waking tasks blocked
    in ``put()``).
    """

    def __init__(self, maxsize=0):
        """Create an empty queue.

        Args:
            maxsize: Maximum number of items held; zero or a negative value
                means the queue is never considered full.
        """
        self.maxsize = maxsize
        self._queue = []
        self._evput = asyncio.Event()  # Triggered by put, tested by get
        self._evget = asyncio.Event()  # Triggered by get, tested by put

    def _get(self):
        """Pop the oldest item and wake every task waiting in put()."""
        self._evget.set()  # Wake all tasks blocked in put()
        self._evget.clear()
        return self._queue.pop(0)

    async def get(self):  # Usage: item = await queue.get()
        """Remove and return the oldest item, waiting while the queue is empty."""
        while self.empty():  # May be multiple tasks waiting on get()
            # Queue is empty, suspend task until a put occurs
            # 1st of N tasks gets, the rest loop again
            await self._evput.wait()
        return self._get()

    def get_nowait(self):
        """Remove and return the oldest item without waiting.

        Raises:
            QueueEmpty: If no item is immediately available.
        """
        if self.empty():
            raise QueueEmpty()
        return self._get()

    def _put(self, val):
        """Append ``val`` and wake every task waiting in get()."""
        self._evput.set()  # Wake all tasks blocked in get()
        self._evput.clear()
        self._queue.append(val)

    async def put(self, val):  # Usage: await queue.put(item)
        """Append ``val``, waiting while the queue is full."""
        while self.full():
            # Queue full: suspend until some task has taken an item
            await self._evget.wait()
        self._put(val)

    def put_nowait(self, val):
        """Append ``val`` without waiting.

        Raises:
            QueueFull: If the queue already holds ``maxsize`` items.
        """
        if self.full():
            raise QueueFull()
        self._put(val)

    def qsize(self):
        """Return the number of items in the queue."""
        return len(self._queue)

    def empty(self):
        """Return True if the queue holds no items, False otherwise."""
        return len(self._queue) == 0

    def full(self):
        """Return True if the queue holds at least ``maxsize`` items.

        If the queue was created with ``maxsize=0`` (the default) or any
        negative number, ``full()`` is never True.
        """
        return self.maxsize > 0 and self.qsize() >= self.maxsize
# Set by watchQueue() once the queue holds more than 10 items; consume()
# waits on it before draining and clears it after each drain.
event_test = asyncio.Event()
# When True, consume() also writes every pulled item to the SQLite database.
useSqlite = False
async def produce(queue):
    """Endlessly enqueue an increasing counter as strings.

    Puts "0", "1", "2", ... on ``queue``, pausing 0.2 seconds after each
    put.  Never returns.

    Args:
        queue: Object with an awaitable ``put(value)`` method.
    """
    counter = 0
    while True:
        payload = str(counter)
        await queue.put(payload)  # Waits while the queue is full
        counter += 1
        await asyncio.sleep(.2)
async def consume(queue, dbConn):
    """Drain the queue in bursts each time ``event_test`` is set.

    Waits for ``event_test``, then pulls items while more than 20 remain
    queued.  Every pulled item is printed and written to ``sd/hello.txt``.
    When ``useSqlite`` is true the item is also inserted into the ``items``
    table, the row count is printed, and the table is emptied once it holds
    more than 100 rows.  ``event_test`` is cleared after each burst.
    Never returns.

    Args:
        queue: Object providing ``qsize()`` and an awaitable ``get()``.
        dbConn: Database connection providing ``execute()`` and
            ``executemany()``.
    """
    while True:
        await event_test.wait()
        while queue.qsize() > 20:
            item = await queue.get()  # Blocks until data is ready
            print("Pulled item..." + str(item))
            if useSqlite:
                # NOTE(review): the SQL text is assembled with an f-string.
                # That is only safe while ``item`` is a stringified integer,
                # which is what produce() in this script enqueues.
                dbConn.executemany(
                    "BEGIN TRANSACTION;"
                    f"INSERT INTO items VALUES ({item}, 0, '{item}', '{item}', False);"
                    "COMMIT;")
                with dbConn.execute("SELECT COUNT(ID) from items;") as cur:
                    for row in cur:
                        print("items count:", row[0])
                        if row[0] > 100:
                            # Keep the table small: wipe it past 100 rows.
                            dbConn.executemany(
                                "BEGIN TRANSACTION;"
                                "DELETE FROM items;"
                                "COMMIT;")
            # 'w+' truncates, so the file only ever holds the latest item.
            with open('sd/hello.txt', 'w+') as f:
                f.write(item)
            await asyncio.sleep(.1)
        event_test.clear()
async def watchQueue(queue):
    """Poll the queue once per second and set ``event_test`` on backlog.

    Sets ``event_test`` whenever more than 10 items are queued.
    Never returns.

    Args:
        queue: Object providing ``qsize()``.
    """
    while True:
        backlog = queue.qsize()
        if backlog > 10:
            event_test.set()
        await asyncio.sleep(1)
async def showQueueLength(queue):
    """Print the current queue length every two seconds.  Never returns.

    Args:
        queue: Object providing ``qsize()``.
    """
    while True:
        length_text = str(queue.qsize())
        print("Queue length..." + length_text)
        await asyncio.sleep(2)
async def showMemUsage():
    """Print the full memory report from ``free(True)`` every five seconds.

    Never returns.
    """
    while True:
        report = free(True)
        print(report)
        await asyncio.sleep(5)
def free(full=False):
    """Report how much memory is free according to the ``gc`` module.

    Args:
        full: When false (the default) return only the free percentage;
            when true return a line with total, free and percentage.

    Returns:
        A string such as ``'25.00%'`` or
        ``'Total:100 Free:25 (25.00%)'``.
    """
    free_amount = gc.mem_free()
    used_amount = gc.mem_alloc()
    total_amount = free_amount + used_amount
    percent_free = '{0:.2f}%'.format(free_amount / total_amount * 100)
    if full:
        return 'Total:{0} Free:{1} ({2})'.format(
            total_amount, free_amount, percent_free)
    return percent_free
def InitDb(clearDb = True):
    """Open ``data.db`` and make sure the ``items`` table and index exist.

    Args:
        clearDb: When true (the default) drop any existing ``items`` table
            first, so the run starts with an empty table.

    Returns:
        The open usqlite connection.
    """
    dbconn = usqlite.connect("data.db")
    if clearDb:
        dbconn.execute("DROP TABLE IF EXISTS items;")
    dbconn.execute('''CREATE TABLE IF NOT EXISTS items
(ID INTEGER,
VERSION INTEGER NOT NULL,
NAME TEXT NOT NULL,
DESCRIPTION TEXT NOT NULL,
IS_COMPLETE BOOL NOT NULL);''')
    # IF NOT EXISTS keeps InitDb(clearDb=False) usable on a database that
    # already has the index; without it the statement fails on reopen.
    dbconn.execute("CREATE UNIQUE INDEX IF NOT EXISTS index_item_name ON items(name, description);")
    return dbconn
#os.rmdir('sd')
#os.mkdir('sd')
# Start every run from a fresh database file.  On the very first run the
# file does not exist yet, which is not an error; anything else is re-raised.
try:
    os.remove("data.db")
except OSError as exc:
    if exc.errno != 2:  # 2 == ENOENT (no such file or directory)
        raise
async def main():
    """Create the queue and database, start all worker tasks, then idle.

    Starts produce, consume, watchQueue, showQueueLength and showMemUsage
    as tasks (in that order) and then sleeps in five-second steps forever
    so the tasks keep running.
    """
    queue = Queue()
    dbConn = InitDb()
    workers = (
        produce(queue),
        consume(queue, dbConn),
        watchQueue(queue),
        showQueueLength(queue),
        showMemUsage(),
    )
    for worker in workers:
        asyncio.create_task(worker)
    print('Tasks are running...')
    while True:
        await asyncio.sleep(5)
# Start the event loop and run main(); main() loops forever, so this call
# does not return normally.
asyncio.run(main())