Skip to content
24 changes: 0 additions & 24 deletions .github/workflows/pypi.yml

This file was deleted.

28 changes: 12 additions & 16 deletions asynch/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ def __init__(
cursor_cls=Cursor,
echo: bool = False,
stack_track: bool = False,
pre_ping: bool = True,
**kwargs,
):
if dsn:
config = parse_dsn(dsn)
self._connection = ProtoConnection(**config, stack_track=stack_track, **kwargs)
self._connection = ProtoConnection(
**config, stack_track=stack_track, pre_ping=pre_ping, **kwargs
)
user = config.get("user", None) or user
password = config.get("password", None) or password
host = config.get("host", None) or host
Expand Down Expand Up @@ -178,32 +181,25 @@ async def ping(self) -> None:
msg = f"Ping has failed for {self}"
raise ConnectionError(msg)

async def _refresh(self) -> None:
"""Refresh the connection.
async def is_live(self) -> bool:
"""Checks if the connection is live.

Attempting to ping and if failed,
then trying to connect again.
If the reconnection does not work,
an Exception is propagated.
Attempts to ping and returns True if successful.

:raises ConnectionError:
1. refreshing created, i.e., not opened connection
2. refreshing already closed connection

:return: None
:return: True if the connection is alive, otherwise False.
"""

if self.status == ConnectionStatus.created:
msg = f"the {self} is not opened to be refreshed"
raise ConnectionError(msg)
if self.status == ConnectionStatus.closed:
msg = f"the {self} is already closed"
raise ConnectionError(msg)
if self.status == ConnectionStatus.created or self.status == ConnectionStatus.closed:
return False

try:
await self.ping()
return True
except ConnectionError:
await self.connect()
return False

async def rollback(self):
raise NotSupportedError
3 changes: 3 additions & 0 deletions asynch/cursors.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,9 @@ def _prepare(self, context=None):
"query_id": self._query_id,
}

if "columnar" in execution_options:
execute_kwargs["columnar"] = execution_options.get("columnar", False)

return execute, execute_kwargs

def __aiter__(self):
Expand Down
38 changes: 18 additions & 20 deletions asynch/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
from collections import deque
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager, suppress
from contextlib import asynccontextmanager
from typing import Optional

from asynch.connection import Connection
Expand Down Expand Up @@ -132,21 +132,19 @@ def maxsize(self) -> int:
def minsize(self) -> int:
return self._minsize

async def _create_connection(self) -> None:
async def _create_connection(self) -> Connection:
if self._pool_size == self._maxsize:
raise AsynchPoolError(f"{self} is already full")
if self._pool_size > self._maxsize:
raise AsynchPoolError(f"{self} is overburden")

conn = Connection(**self._connection_kwargs)
conn = Connection(pre_ping=False, **self._connection_kwargs)
await conn.connect()
return conn

try:
await conn.ping()
self._free_connections.append(conn)
except ConnectionError as e:
msg = f"failed to create a {conn} for {self}"
raise AsynchPoolError(msg) from e
async def _create_and_release_connection(self) -> None:
conn = await self._create_connection()
self._free_connections.append(conn)

def _pop_connection(self) -> Connection:
if not self._free_connections:
Expand All @@ -156,8 +154,8 @@ def _pop_connection(self) -> Connection:
async def _get_fresh_connection(self) -> Optional[Connection]:
while self._free_connections:
conn = self._pop_connection()
with suppress(ConnectionError):
await conn._refresh()
logger.debug(f"Testing connection {conn}")
if await conn.is_live():
return conn
return None

Expand All @@ -166,22 +164,17 @@ async def _acquire_connection(self) -> Connection:
self._acquired_connections.append(conn)
return conn

await self._create_connection()
conn = self._pop_connection()
logger.debug("No free connection in pool. Creating new connection.")
conn = await self._create_connection()
self._acquired_connections.append(conn)
return conn

async def _release_connection(self, conn: Connection) -> None:
if conn not in self._acquired_connections:
raise AsynchPoolError(f"the connection {conn} does not belong to {self}")

logger.debug(f"Releasing connection {conn}")
self._acquired_connections.remove(conn)
try:
await conn._refresh()
except ConnectionError as e:
msg = f"the {conn} is invalidated"
raise AsynchPoolError(msg) from e

self._free_connections.append(conn)

async def _init_connections(self, n: int, *, strict: bool = False) -> None:
Expand All @@ -199,7 +192,7 @@ async def _init_connections(self, n: int, *, strict: bool = False) -> None:

# it is possible that the `_create_connection` may not create `n` connections
tasks: list[asyncio.Task] = [
asyncio.create_task(self._create_connection()) for _ in range(n)
asyncio.create_task(self._create_and_release_connection()) for _ in range(n)
]
# that is why possible exceptions from the `_create_connection` are also gathered
if strict and any(
Expand All @@ -226,10 +219,15 @@ async def connection(self) -> AsyncIterator[Connection]:
:return: a free connection from the pool
:rtype: Connection
"""
logger.debug(
f"Acquiring connection from Pool ({len(self._free_connections)} free connections, {len(self._acquired_connections)} acquired connections)"
)

async with self._sem:
async with self._lock:
conn = await self._acquire_connection()
logger.debug(f"Acquired connection {conn}")

try:
yield conn
finally:
Expand Down
20 changes: 9 additions & 11 deletions asynch/proto/columns/arraycolumn.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from collections import deque
from itertools import chain
from queue import Queue
from struct import Struct

from .base import Column
Expand Down Expand Up @@ -82,15 +82,14 @@ async def _write_sizes(
self,
value,
):
q = Queue()
q.put((self, value, 0))
q = deque([(self, value, 0)])

cur_depth = 0
offset = 0
nulls_map = []

while not q.empty():
column, value, depth = q.get_nowait()
while q:
column, value, depth = q.popleft()

if cur_depth != depth:
cur_depth = depth
Expand All @@ -112,7 +111,7 @@ async def _write_sizes(
nested_column = column.nested_column
if isinstance(nested_column, ArrayColumn):
for x in value:
q.put((nested_column, x, cur_depth + 1))
q.append((nested_column, x, cur_depth + 1))
nulls_map.append(None if x is None else False)

async def _write_data(
Expand Down Expand Up @@ -176,8 +175,7 @@ async def _read(
self,
size,
):
q = Queue()
q.put((self, size, 0))
q = deque([(self, size, 0)])

slices_series = []

Expand All @@ -196,8 +194,8 @@ async def _read(
nested_column = self.nested_column

# Read and store info about slices.
while not q.empty():
column, size, depth = q.get_nowait()
while q:
column, size, depth = q.popleft()

nested_column = column.nested_column

Expand All @@ -220,7 +218,7 @@ async def _read(
for _i in range(size):
offset = await self.size_unpack()
nested_column_size = offset
q.put((nested_column, offset - prev_offset, cur_depth + 1))
q.append((nested_column, offset - prev_offset, cur_depth + 1))
slices.append((prev_offset, offset))
prev_offset = offset

Expand Down
59 changes: 43 additions & 16 deletions asynch/proto/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from urllib.parse import urlparse

from asynch.errors import (
OperationalError,
PartiallyConsumedQueryError,
ServerException,
UnexpectedPacketFromServerError,
Expand Down Expand Up @@ -94,6 +95,7 @@ def __init__( # nosec:B107
alt_hosts: str = None,
stack_track=False,
settings_is_important=False,
pre_ping: bool = True,
**kwargs,
):
self.stack_track = stack_track
Expand All @@ -119,6 +121,7 @@ def __init__( # nosec:B107
self._lock = asyncio.Lock()
self.secure_socket = secure
self.verify = verify
self.pre_ping = pre_ping

ssl_options = {}
if ssl_version is not None:
Expand Down Expand Up @@ -320,10 +323,12 @@ async def ping(self) -> bool:
msg = self.unexpected_packet_message("Pong", packet_type)
raise UnexpectedPacketFromServerError(msg)
return True
except AttributeError:
logger.debug("The connection %s is not open", self)
except OperationalError as e:
logger.info("The connection %s is not open", self, exc_info=e)
except AttributeError as e:
logger.info("The connection %s is not open", self, exc_info=e)
except IndexError as e:
logger.debug(
logger.info(
"Ping package smaller than expected or empty. "
"There may be connection or network problems - "
"we believe that the connection is incorrect.",
Expand All @@ -334,7 +339,7 @@ async def ping(self) -> bool:
# because this is a connection loss case
if isinstance(e, RuntimeError) and "TCPTransport closed=True" not in str(e):
raise e
logger.debug("Socket closed", exc_info=e)
logger.info("Socket closed", exc_info=e)
return False

async def receive_data(self, raw=False):
Expand Down Expand Up @@ -442,6 +447,7 @@ async def _receive_packet(self):

elif packet_type == ServerPacket.EXCEPTION:
packet.exception = await self.receive_exception()
self.is_query_executing = False

elif packet.type == ServerPacket.PROGRESS:
packet.progress = await self.receive_progress()
Expand Down Expand Up @@ -577,9 +583,10 @@ async def disconnect(self):
async def connect(self):
if self.connected:
await self.disconnect()
logger.debug("Connecting. Database: %s. User: %s", self.database, self.user)
for host, port in self.hosts:
logger.debug("Connecting to %s:%s", host, port)
logger.debug(
"Connecting to %s:%s Database: %s. User: %s", host, port, self.database, self.user
)
return await self._init_connection(host, port)

async def execute(
Expand Down Expand Up @@ -758,9 +765,10 @@ async def force_connect(self):
if not self.connected:
await self.connect()

elif not await self.ping():
logger.info("Connection was closed, reconnecting.")
await self.connect()
elif self.pre_ping:
if not await self.ping():
logger.info("Connection was closed, reconnecting.")
await self.connect()

async def process_ordinary_query(
self,
Expand Down Expand Up @@ -820,30 +828,49 @@ async def process_insert_query(
await self.send_query(query_without_data, query_id=query_id)
await self.send_external_tables(external_tables, types_check=types_check)

sample_block = await self.receive_sample_block()
sample_block = await self._receive_sample_block()
if sample_block:
rv = await self.send_data(
sample_block, data, types_check=types_check, columnar=columnar
)
packet = await self._receive_packet()
if packet.exception:
raise packet.exception
await self._receive_end_of_stream()

return rv

async def receive_sample_block(self):
async def _receive_sample_block(self):
while True:
packet = await self._receive_packet()

if packet.type == ServerPacket.DATA:
return packet.block

elif packet.type == ServerPacket.EXCEPTION:
raise packet.exception
elif packet.type == ServerPacket.LOG:
self.log_block(packet.block)
pass
elif packet.type == ServerPacket.TABLE_COLUMNS:
pass
else:
message = self.unexpected_packet_message(
"Data, Exception or TableColumns", packet.type
)
raise UnexpectedPacketFromServerError(message)

async def _receive_end_of_stream(self):
while True:
packet = await self._receive_packet()

if packet.type == ServerPacket.END_OF_STREAM:
return
elif packet.type == ServerPacket.EXCEPTION:
raise packet.exception
elif packet.type == ServerPacket.LOG:
pass
elif packet.type == ServerPacket.PROFILE_INFO:
pass
elif packet.type == ServerPacket.PROFILE_EVENTS:
pass
elif packet.type == ServerPacket.PROGRESS:
pass
else:
message = self.unexpected_packet_message(
"Data, Exception or TableColumns", packet.type
Expand Down
Loading