Skip to content

Python SDK Para o WAHA? #1

@WittmannF

Description

@WittmannF

Olá, boa tarde!

Obrigado pelo seu projeto! Te encontrei no Github pesquisando por projetos recentes utilizando o Waha. Percebi que há vários projetos, no entanto cada um criando um cliente para o Waha em Python de sua maneira. Gostaria de criar um grupo para criarmos um projeto/código aberto para todos utilizarmos como SDK para fácil acesso, atualização e que possamos instalar via pip. Se possuir interesse, peço que entre nesse grupo para discutirmos:

Algumas das dores que venho percebendo e que teria interesse no SDK:

  • Parsing seguro do payload de entrada
    • Talvez criar um modelo do pydantic. Em geral é tranquilo, mas com o tempo há atualizações constantes
  • Um cliente que seja estável, seguro e atualizado
    • Também, como há atualizações e depreciações constantes na API, gostaria que fossem automaticamente propagadas em um SDK.

Obrigado desde já! Aproveito e já compartilho aqui como o meu cliente ficou (no qual pretendo atualizar para async):

from urllib.parse import quote

import requests

from app.config.settings import get_settings

settings = get_settings()


class BaseClient:
    def __init__(self, base_url):
        # Normalize to avoid double slashes when concatenating endpoints.
        self.base_url = (base_url or "").rstrip("/")
        # Default headers for all WAHA API requests
        self.headers = {
            "Accept": "application/json",
            "Content-Type": "application/json",
            "X-Api-Key": settings.WAHA_API_KEY,
        }

    def _request(self, method, endpoint, **kwargs):
        url = f"{self.base_url}{endpoint}"
        # Merge default headers with any headers passed explicitly
        extra_headers = kwargs.pop("headers", {}) or {}
        merged_headers = {**self.headers, **extra_headers}
        response = requests.request(method, url, headers=merged_headers, **kwargs)

        # Let raise_for_status() handle HTTP errors
        response.raise_for_status()

        # Handle different response scenarios
        if not response.content:
            # Empty response (e.g., 204 No Content)
            return {}

        # Try to parse as JSON first
        try:
            return response.json()
        except ValueError:
            # Not JSON - return text content if possible
            try:
                return response.text
            except UnicodeDecodeError:
                # Fallback to raw bytes if text decoding fails
                return response.content


class SessionAPI(BaseClient):
    def list_sessions(self):
        """List all WhatsApp sessions"""
        return self._request("GET", "/api/sessions")

    def create_session(self, session_name):
        """Create a new WhatsApp session"""
        data = {"session": session_name}
        return self._request("POST", "/api/sessions", json=data)

    def get_session(self, session_name):
        """Get information about a specific WhatsApp session"""
        return self._request("GET", f"/api/sessions/{session_name}")

    def update_session(self, session_name, data):
        """Update a specific WhatsApp session"""
        return self._request("PUT", f"/api/sessions/{session_name}", json=data)

    def delete_session(self, session_name):
        """Delete a WhatsApp session"""
        return self._request("DELETE", f"/api/sessions/{session_name}")

    def get_authenticated_account(self, session_name):
        """Get information about the authenticated WhatsApp account"""
        return self._request("GET", f"/api/sessions/{session_name}/me")

    def start_session(self, session_name):
        """Start a WhatsApp session"""
        return self._request("POST", f"/api/sessions/{session_name}/start")

    def stop_session(self, session_name):
        """Stop a WhatsApp session"""
        return self._request("POST", f"/api/sessions/{session_name}/stop")

    def logout_session(self, session_name):
        """Logout from a WhatsApp session"""
        return self._request("POST", f"/api/sessions/{session_name}/logout")

    def restart_session(self, session_name):
        """Restart a WhatsApp session"""
        return self._request("POST", f"/api/sessions/{session_name}/restart")

    def upsert_and_start_session(self, data):
        """Upsert and start a WhatsApp session"""
        return self._request("POST", "/api/sessions/start", json=data)

    def stop_all_sessions(self, data=None):
        """Stop all WhatsApp sessions, logout if requested"""
        return self._request("POST", "/api/sessions/stop", json=data)

    def logout_and_delete_all_sessions(self, data=None):
        """Logout and delete all WhatsApp sessions"""
        return self._request("POST", "/api/sessions/logout", json=data)


class ChattingAPI(BaseClient):
    def send_text(self, data):
        """Send a text message"""
        return self._request("POST", "/api/sendText", json=data)

    def send_image(self, data):
        """Send an image"""
        return self._request("POST", "/api/sendImage", json=data)

    def send_file(self, data):
        """Send a file"""
        return self._request("POST", "/api/sendFile", json=data)

    def send_voice(self, data):
        """Send a voice message"""
        return self._request("POST", "/api/sendVoice", json=data)

    def send_video(self, data):
        """Send a video"""
        return self._request("POST", "/api/sendVideo", json=data)

    def send_buttons(self, data):
        """Send buttons (interactive message)"""
        return self._request("POST", "/api/sendButtons", json=data)

    def forward_message(self, data):
        """Forward a message"""
        return self._request("POST", "/api/forwardMessage", json=data)

    def send_seen(self, data):
        """Send seen status"""
        return self._request("POST", "/api/sendSeen", json=data)

    def start_typing(self, data):
        """Start typing indicator"""
        return self._request("POST", "/api/startTyping", json=data)

    def stop_typing(self, data):
        """Stop typing indicator"""
        return self._request("POST", "/api/stopTyping", json=data)

    def react_to_message(self, data):
        """React to a message with an emoji"""
        return self._request("PUT", "/api/reaction", json=data)

    def star_message(self, data):
        """Star or unstar a message"""
        return self._request("PUT", "/api/star", json=data)

    def send_poll(self, data):
        """Send a poll with options"""
        return self._request("POST", "/api/sendPoll", json=data)

    def send_location(self, data):
        """Send a location"""
        return self._request("POST", "/api/sendLocation", json=data)

    def send_link_preview(self, data):
        """Send a link preview"""
        return self._request("POST", "/api/sendLinkPreview", json=data)

    def get_messages(self, params=None):
        """
        Get messages in a chat.

        WAHA deprecated `GET /api/messages`. Use:
          GET /api/{session}/chats/{chatId}/messages
        """
        params = params or {}
        chat_id = params.get("chatId")
        session = params.get("session", "default")

        # If caller didn't provide a chatId, fall back to the old endpoint.
        # (Kept for backward compatibility with any external callers.)
        if not chat_id:
            return self._request("GET", "/api/messages", params=params)

        # chatId is part of the path and must be URL-encoded (e.g. "@g.us").
        encoded_chat_id = quote(str(chat_id), safe="")
        endpoint = f"/api/{session}/chats/{encoded_chat_id}/messages"

        # Translate old query params to the new endpoint's query.
        query = {
            "limit": params.get("limit", 10),
            "downloadMedia": params.get("downloadMedia", False),
        }
        if params.get("sortBy") is not None:
            query["sortBy"] = params["sortBy"]
        if params.get("sortOrder") is not None:
            query["sortOrder"] = params["sortOrder"]
        if params.get("offset") is not None:
            query["offset"] = params["offset"]
        if params.get("filter.timestamp.lte") is not None:
            query["filter.timestamp.lte"] = params["filter.timestamp.lte"]
        if params.get("filter.timestamp.gte") is not None:
            query["filter.timestamp.gte"] = params["filter.timestamp.gte"]
        if params.get("filter.fromMe") is not None:
            query["filter.fromMe"] = params["filter.fromMe"]
        if params.get("filter.ack") is not None:
            query["filter.ack"] = params["filter.ack"]

        return self._request("GET", endpoint, params=query)

    def send_contact_vcard(self, data):
        """Send a contact vCard"""
        return self._request("POST", "/api/sendContactVcard", json=data)

    def check_number_status(self, params=None):
        """Check number status"""
        return self._request("GET", "/api/checkNumberStatus", params=params)

    def reply(self, data):
        """DEPRECATED - you can set "reply_to" field when sending text, image, etc"""
        return self._request("POST", "/api/reply", json=data)


class WAHAClient:
    def __init__(self, base_url=None, default_number=settings.DEBUG_NUMBER):
        base_url = (
            base_url or settings.DEBUG_SERVER_IP
            if settings.DEBUG
            else settings.DEFAULT_SERVER_IP
        )
        self.session = SessionAPI(base_url)
        self.chatting = ChattingAPI(base_url)
        self.default_number = default_number

    def _get_chat_id(self, number=None, chat_id=None):
        """Private method to get the chat ID."""
        number = number or self.default_number
        return chat_id or f"{number}@c.us"

    def send_text(self, message, number=None, chat_id=None, max_characters=65000):
        """Send a text message in chunks if it exceeds max_characters."""
        chat_id = self._get_chat_id(number, chat_id)

        # Split the message into chunks if it exceeds max_characters
        if len(message) > max_characters:
            message_chunks = message.split("\n")
            current_chunk = ""
            for chunk in message_chunks:
                if len(current_chunk) + len(chunk) + 1 > max_characters:
                    # Send the current chunk
                    data = {
                        "chatId": chat_id,
                        "text": current_chunk.strip(),
                        "session": "default",
                    }
                    self.chatting.send_text(data)
                    current_chunk = chunk
                else:
                    current_chunk += "\n" + chunk
            # Send any remaining text
            if current_chunk:
                data = {
                    "chatId": chat_id,
                    "text": current_chunk.strip(),
                    "session": "default",
                }
                self.chatting.send_text(data)
        else:
            # Send the message as a single chunk
            data = {"chatId": chat_id, "text": message, "session": "default"}
            self.chatting.send_text(data)

    def get_previous_messages(self, number=None, chat_id=None, n=10) -> list:
        """Retrieve the previous n messages from a chat"""
        chat_id = self._get_chat_id(number, chat_id)
        # Use the new WAHA endpoint via ChattingAPI.get_messages.
        params = {
            "chatId": chat_id,
            "limit": n,
            "session": "default",
            "sortBy": "timestamp",
            "downloadMedia": False,
        }
        return self.chatting.get_messages(params=params)

    def download_media(self, media_url: str, headers: dict | None = None) -> bytes:
        """
        Download media (audio, images, files, etc.) from a WAHA media URL.

        This centralizes media retrieval so that WAHA authentication (X-Api-Key)
        is always applied consistently.
        """
        # Start with the same default WAHA headers used for API calls
        # but allow callers to override/extend them when needed.
        base_headers = dict(self.session.headers)

        # For binary media we don't really need Content-Type on the request side.
        # Keep it if present, but callers can override via `headers`.
        merged_headers = {**base_headers, **(headers or {})}

        response = requests.get(media_url, headers=merged_headers)
        response.raise_for_status()
        return response.content


# ---- Singleton helper (share WAHA client instance across the app) ----
_WAHA_CLIENTS: dict[str, "WAHAClient"] = {}


def get_waha_client(base_url: str | None = None) -> WAHAClient:
    """
    Return a shared WAHAClient instance for the given base_url.

    This avoids creating new clients repeatedly (useful for tool calls vs. webhook path)
    and ensures consistent configuration (headers/base URL).
    """
    base_url = (base_url or settings.SERVER_IP or "").rstrip("/")
    if base_url not in _WAHA_CLIENTS:
        _WAHA_CLIENTS[base_url] = WAHAClient(base_url)
    return _WAHA_CLIENTS[base_url]

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions