Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions lib/pynput/keyboard/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
# pylint: disable=C0103
# KeyCode, Key, Controller and Listener are not constants

import asyncio
import itertools
import sys
from functools import cached_property, wraps

from pynput._util import backend, Events

Expand Down Expand Up @@ -247,3 +250,61 @@ def _on_release(self, key, injected):
if not injected:
for hotkey in self._hotkeys:
hotkey.release(self.canonical(key))


# Prefer the encoraged usage of running_loop, but be compatible with older python.
# In the latest python this cannot be instantiated until the loop is surely running.
if sys.version_info >= (3, 7):
_get_loop = asyncio.get_running_loop
else:
_get_loop = asyncio.get_event_loop

class AsyncListener:
"""Run keyboard listener from an async loop.

After catching the keys pressed in the thread, it transmits the tasks to be
processed in the event loop.
"""
def __init__(self, on_press=None, on_release=None, suppress=False, **kwargs):
self._loop = None
self.running = False

self._listener = Listener(
on_press=self._transmit_to_event_loop(on_press),
on_release=self._transmit_to_event_loop(on_release),
suppress=suppress,
**kwargs
)

def __enter__(self):
self.start()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()

def _transmit_to_event_loop(self, func):
@wraps(func)
def wrapper(key):
return self._loop.call_soon_threadsafe(func, key)

@wraps(func)
def async_wrapper(key):
return asyncio.run_coroutine_threadsafe(func(key), self._loop)

if func is None:
return None
elif asyncio.iscoroutinefunction(func):
return async_wrapper
else:
return wrapper

def start(self):
self._loop = _get_loop()
self._listener.start()
self.running = True

def stop(self):
self._listener.stop()
self._listener.join()
self.running = False