From a599b339c77d5d9084c34c50703c53b7569c640f Mon Sep 17 00:00:00 2001 From: vizvasrj Date: Sat, 28 Dec 2024 20:57:42 +0530 Subject: [PATCH] feat: Implement core real-time interaction capabilities for Gemini API This commit introduces the fundamental features for real-time communication with the Gemini API, encompassing: - Real-time audio streaming: Enables bidirectional audio interaction using microphone input and API responses. - Real-time video streaming (camera and screen capture): Allows streaming video from either a webcam or the screen, configurable via the `MODE` environment variable. Includes image resizing and encoding for efficient transmission. - Text-based prompt functionality: Implements the ability to send text prompts to the Gemini API via the command line. - Graceful shutdown mechanism: Ensures proper cleanup of audio and video resources, including closing streams and websocket connections, upon application exit. - Initial README documentation: Provides a comprehensive guide on project setup, configuration, and usage examples for various modes. --- samples/stream_realtime/README.md | 155 +++++++ samples/stream_realtime/go.mod | 23 + samples/stream_realtime/go.sum | 44 ++ samples/stream_realtime/main.go | 723 ++++++++++++++++++++++++++++++ 4 files changed, 945 insertions(+) create mode 100644 samples/stream_realtime/README.md create mode 100644 samples/stream_realtime/go.mod create mode 100644 samples/stream_realtime/go.sum create mode 100644 samples/stream_realtime/main.go diff --git a/samples/stream_realtime/README.md b/samples/stream_realtime/README.md new file mode 100644 index 00000000..616c9cd4 --- /dev/null +++ b/samples/stream_realtime/README.md @@ -0,0 +1,155 @@ +# Gemini Client in Go + +This Go package provides a client for interacting with the Gemini **Live API**, enabling real-time multimodal interactions including text, audio, and video streaming. 
This client is inspired by the functionalities demonstrated in the "Gemini 2.0 - Multimodal live API: Streaming" notebook, showcasing the ability to stream bidirectional audio and handle text-based conversations. + +This client allows you to build applications that can: + +* Send and receive text messages. +* Stream audio input to the Gemini API and receive audio responses. +* Stream video input (from camera or screen capture) to the Gemini API. + +## Features + +* **Real-time Communication:** Leverages WebSockets for low-latency, bidirectional communication with the Gemini Live API. +* **Text Input:** Send textual prompts to the Gemini API for conversational interactions. +* **Audio Input & Output:** + * Record audio from your microphone and stream it to the API. + * Receive and play back audio responses from the API in real-time. +* **Video Input (Camera & Screen Capture):** + * Capture frames from your camera or screen and stream them to the API for multimodal prompts. + * Supports configuration to switch between camera and screen capture modes. +* **Automatic Reconnection:** Attempts to reconnect to the API if the WebSocket connection is interrupted. +* **Configurable:** Allows for customization of the Gemini model to be used. +* **Error Handling & Logging:** Provides robust error handling and logging using `logrus`. + +## Prerequisites + +* **Go:** Version 1.23 or higher (the version declared in `go.mod`). +* **PortAudio:** Required for audio input/output. Installation instructions vary by operating system. + * **Linux (Debian/Ubuntu):** `sudo apt-get install libportaudio2 libportaudiocpp0 portaudio19-dev` +* **FFmpeg:** Required for camera capture and image conversion. + * You can usually install this using your system's package manager (e.g., `sudo apt-get install ffmpeg` on Debian/Ubuntu, `brew install ffmpeg` on macOS). +* **GEMINI_API_KEY:** You need a valid API key from Google AI Studio, which you can obtain by signing up for the Gemini API. + +## Installation + +1.
**Clone the repository:** + ```bash + git clone REPOSITORY_URL + cd REPOSITORY_DIRECTORY/samples/stream_realtime + ``` + + Replace `REPOSITORY_URL` and `REPOSITORY_DIRECTORY` with the location of your clone. + +2. **Build the application:** + ```bash + go build -o gemini-client . + ``` + +## Configuration + +1. **Set the API Key:** + You need to set the `GEMINI_API_KEY` environment variable. You can do this by running the following command: + + ```bash + export GEMINI_API_KEY="YOUR_API_KEY" + ``` + + Replace `YOUR_API_KEY` with your actual Gemini API key. + +## Usage + +The client can be run in different modes by setting the `MODE` environment variable. This mirrors the different interaction patterns showcased in the Gemini cookbook notebooks. + +### Output Mode + +The response modality is chosen in the setup message that `Setup` (in `main.go`) sends to the API: + +```go +setupMsg := map[string]interface{}{ + "setup": map[string]interface{}{ + "model": "models/gemini-2.0-flash-exp", + "generationConfig": map[string]interface{}{ + "responseModalities": []string{"TEXT"}, // "TEXT", "AUDIO" + }, + }, + } +``` +The client defaults to text-based interaction, similar to the basic chat demonstrated in the "Live API - Websockets Quickstart" notebook. You can type messages in the terminal and send them to the Gemini API. + +```bash +./gemini-client +``` + +You will be prompted with `message>: ` to enter your text. + +### Audio Mode + +To enable real-time bidirectional audio input and output, similar to the "Gemini 2.0 - Multimodal live API: Streaming" notebook, change `responseModalities` in the setup message to `AUDIO` and rebuild the client: +```go + "responseModalities": []string{"AUDIO"}, // "TEXT", "AUDIO" +``` +The client will automatically start recording audio from your microphone and playing back audio responses. + +```bash +./gemini-client +``` + +### Camera Mode + +To send camera frames to the API, enabling multimodal prompts as explored in the examples, set the `MODE` environment variable to `camera`. + +```bash +export MODE="camera" +./gemini-client +``` + +**Note:** Ensure your camera is properly configured and accessible by the system. The code currently targets `/dev/video1`. You might need to adjust this based on your camera setup.
+```go +func (c *Client) openVideoCapture() (*exec.Cmd, error) { + cap := exec.Command("ffmpeg", "-f", "v4l2", "-i", "/dev/video1", "-f", "rawvideo", "-pix_fmt", "rgb24", "-vframes", "1", "-") + return cap, nil +} +``` + +### Screen Capture Mode + +To send screen captures to the API, providing visual context to your prompts, set the `MODE` environment variable to `screen`. + +```bash +export MODE="screen" +./gemini-client +``` + +The client will periodically capture your screen and send it to the API. + +## Example Interaction + +1. **Run the client (e.g., in text mode):** + ```bash + ./gemini-client + ``` +2. **You will see the prompt:** + ``` + message>: + ``` +3. **Type your message and press Enter:** + ``` + message>: What is the weather like today? + ``` +4. **The response from the Gemini API will be printed in the terminal.** + +If running in audio or video modes, the client will continuously stream audio or video data to the API, enabling more dynamic and interactive conversations. + +Refer to the Gemini API documentation for the available tools and their configurations. + +## Future Improvements + +1. **Windows Support:** Add support for Windows, including audio input/output and camera/screen capture. +2. **MacOS Support:** Add support for camera and screen capture on macOS. +3. **Tools Integrations:** Add support for Google Search, etc +4. **Function Calling**: Add support for calling functions in the Gemini API + +## Contributing + +Contributions are welcome! Please feel free to submit pull requests with improvements or bug fixes. + +## Ported from + +A Go-based implementation inspired by the functionality of [live_api_starter.ipynb](https://github.com/google-gemini/cookbook/blob/main/gemini-2/websockets/live_api_starter.ipynb) from [https://github.com/google-gemini/cookbook](https://github.com/google-gemini/cookbook). 
\ No newline at end of file diff --git a/samples/stream_realtime/go.mod b/samples/stream_realtime/go.mod new file mode 100644 index 00000000..ff849c25 --- /dev/null +++ b/samples/stream_realtime/go.mod @@ -0,0 +1,23 @@ +module google.golang.org/genai/samples/stream_realtime + +go 1.23.0 + +require ( + github.com/gordonklaus/portaudio v0.0.0-20230709114228-aafa478834f5 + github.com/gorilla/websocket v1.5.3 + github.com/hajimehoshi/oto v1.0.1 + github.com/kbinani/screenshot v0.0.0-20240820160931-a8a2c5d0e191 + github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 + github.com/sirupsen/logrus v1.9.3 +) + +require ( + github.com/gen2brain/shm v0.1.0 // indirect + github.com/godbus/dbus/v5 v5.1.0 // indirect + github.com/jezek/xgb v1.1.1 // indirect + github.com/lxn/win v0.0.0-20210218163916-a377121e959e // indirect + golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8 // indirect + golang.org/x/image v0.0.0-20190227222117-0694c2d4d067 // indirect + golang.org/x/mobile v0.0.0-20190415191353-3e0bab5405d6 // indirect + golang.org/x/sys v0.24.0 // indirect +) diff --git a/samples/stream_realtime/go.sum b/samples/stream_realtime/go.sum new file mode 100644 index 00000000..793d6583 --- /dev/null +++ b/samples/stream_realtime/go.sum @@ -0,0 +1,44 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gen2brain/shm v0.1.0 h1:MwPeg+zJQXN0RM9o+HqaSFypNoNEcNpeoGp0BTSx2YY= +github.com/gen2brain/shm v0.1.0/go.mod h1:UgIcVtvmOu+aCJpqJX7GOtiN7X2ct+TKLg4RTxwPIUA= +github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= +github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gordonklaus/portaudio v0.0.0-20230709114228-aafa478834f5 h1:5AlozfqaVjGYGhms2OsdUyfdJME76E6rx5MdGpjzZpc= 
+github.com/gordonklaus/portaudio v0.0.0-20230709114228-aafa478834f5/go.mod h1:WY8R6YKlI2ZI3UyzFk7P6yGSuS+hFwNtEzrexRyD7Es= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/hajimehoshi/oto v1.0.1 h1:8AMnq0Yr2YmzaiqTg/k1Yzd6IygUGk2we9nmjgbgPn4= +github.com/hajimehoshi/oto v1.0.1/go.mod h1:wovJ8WWMfFKvP587mhHgot/MBr4DnNy9m6EepeVGnos= +github.com/jezek/xgb v1.1.1 h1:bE/r8ZZtSv7l9gk6nU0mYx51aXrvnyb44892TwSaqS4= +github.com/jezek/xgb v1.1.1/go.mod h1:nrhwO0FX/enq75I7Y7G8iN1ubpSGZEiA3v9e9GyRFlk= +github.com/kbinani/screenshot v0.0.0-20240820160931-a8a2c5d0e191 h1:5UHVWNX1qrIbNw7OpKbxe5bHkhHRk3xRKztMjERuCsU= +github.com/kbinani/screenshot v0.0.0-20240820160931-a8a2c5d0e191/go.mod h1:Pmpz2BLf55auQZ67u3rvyI2vAQvNetkK/4zYUmpauZQ= +github.com/lxn/win v0.0.0-20210218163916-a377121e959e h1:H+t6A/QJMbhCSEH5rAuRxh+CtW96g0Or0Fxa9IKr4uc= +github.com/lxn/win v0.0.0-20210218163916-a377121e959e/go.mod h1:KxxjdtRkfNoYDCUP5ryK7XJJNTnpC8atvtmTheChOtk= +github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6Oo2LfFZAehjjQMERAvZLEDnQ= +github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8 
h1:idBdZTd9UioThJp8KpM/rTSinK/ChZFBE43/WtIy8zg= +golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/image v0.0.0-20190227222117-0694c2d4d067 h1:KYGJGHOQy8oSi1fDlSpcZF0+juKwk/hEMv5SiwHogR0= +golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= +golang.org/x/mobile v0.0.0-20190415191353-3e0bab5405d6 h1:vyLBGJPIl9ZYbcQFM2USFmJBK6KI+t+z6jL0lbwjrnc= +golang.org/x/mobile v0.0.0-20190415191353-3e0bab5405d6/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= +golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201018230417-eeed37f84f13/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/samples/stream_realtime/main.go b/samples/stream_realtime/main.go new file mode 100644 index 00000000..ccb91b51 --- /dev/null +++ b/samples/stream_realtime/main.go @@ -0,0 +1,723 @@ +package main + +import ( + "bufio" + "bytes" + "context" + "encoding/base64" + "encoding/binary" + "encoding/json" + "fmt" + "image" + imageColor "image/color" + "image/jpeg" + "io" + "os" + "os/exec" + "os/signal" + "strings" + 
"sync" + "sync/atomic" + "syscall" + "time" + + "github.com/gordonklaus/portaudio" + "github.com/gorilla/websocket" + "github.com/hajimehoshi/oto" + "github.com/kbinani/screenshot" + "github.com/nfnt/resize" + "github.com/sirupsen/logrus" +) + +const ( + receiveSampleRate = 16000 // Output sample rate + sendSampleRate = 24000 // Input sample rate + chunkSize = 1024 // Input chunk size (number of int16 samples) + channel = 1 // Number of channels + turnCompleteDelay = 10 * time.Second // Delay before sending turn_complete + imageMaxWidth = 1024 + imageMaxHeight = 1024 + reconnectDelay = 2 * time.Second // Delay between reconnect attempts +) + +var APIKey string + +func getAPIKey() (string, error) { + apiKey := os.Getenv("GEMINI_API_KEY") + if apiKey == "" { + return "", fmt.Errorf("GEMINI_API_KEY environment variable not set") + } + return apiKey, nil +} + +type Client struct { + conn *websocket.Conn + model string + tools []map[string]interface{} + audioPlayer *oto.Player + ctx context.Context + cancel context.CancelFunc + stream *portaudio.Stream + turnCompleteSent bool + intBuf []int16 + videoOutQueue chan map[string]interface{} + mu sync.Mutex + audioInQueue chan []byte + reconnectMutex sync.Mutex // Mutex for reconnect logic + reconnecting bool // Flag to indicate if a reconnect is in progress + closeOnce sync.Once + closed int32 + doneChan chan struct{} + wg sync.WaitGroup + paMutex sync.Mutex + voqMutex sync.Mutex +} + +func NewClient(model string, tools []map[string]interface{}) *Client { + ctx, cancel := context.WithCancel(context.Background()) + doneChan := make(chan struct{}) + intBuf := make([]int16, chunkSize) + return &Client{ + model: model, + tools: tools, + ctx: ctx, + cancel: cancel, + turnCompleteSent: false, + intBuf: intBuf, + videoOutQueue: make(chan map[string]interface{}), + audioInQueue: make(chan []byte, 1000), + doneChan: doneChan, + wg: sync.WaitGroup{}, + } +} + +func (c *Client) Connect(apiKey string) error { + logrus.Infoln("Connecting 
to Gemini API...") + c.reconnectMutex.Lock() + if c.reconnecting { + c.reconnectMutex.Unlock() + return fmt.Errorf("reconnect already in progress") + } + c.reconnecting = true + c.reconnectMutex.Unlock() + + defer func() { + c.reconnectMutex.Lock() + c.reconnecting = false + c.reconnectMutex.Unlock() + }() + + url := fmt.Sprintf("wss://generativelanguage.googleapis.com/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContent?key=%s", apiKey) + dialer := websocket.DefaultDialer + conn, _, err := dialer.DialContext(c.ctx, url, nil) + if err != nil { + return fmt.Errorf("failed to connect: %w", err) + } + c.conn = conn + return nil +} + +func (c *Client) Setup() error { + setupMsg := map[string]interface{}{ + "setup": map[string]interface{}{ + "model": "models/gemini-2.0-flash-exp", + "generationConfig": map[string]interface{}{ + "responseModalities": []string{"TEXT"}, + }, + }, + } + err := c.SendMessage(setupMsg) + if err != nil { + return fmt.Errorf("write setup: %w", err) + } + + _, _, err = c.conn.ReadMessage() + if err != nil { + return fmt.Errorf("read after setup: %w", err) + } + return nil +} + +func (c *Client) SendMessage(msg interface{}) error { + data, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("failed to marshal message: %w", err) + } + + for retries := 0; retries < 3; retries++ { + c.mu.Lock() + err = c.conn.WriteMessage(websocket.TextMessage, data) + c.mu.Unlock() + if err == nil { + return nil + } + + if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + logrus.Infoln("Connection closed, attempting to reconnect...") + if reconnectErr := c.reconnect(); reconnectErr != nil { + return fmt.Errorf("failed to reconnect: %w", reconnectErr) + } + + continue + } + return err + } + return fmt.Errorf("failed to send message after retries: %w", err) +} + +func (c *Client) reconnect() error { + time.Sleep(reconnectDelay) + err := c.Connect(APIKey) + if err 
!= nil { + return err + } + if err := c.Setup(); err != nil { + return err + } + return nil + +} + +// this is used to send text input +func (c *Client) SendTextInput(text string) error { + message := map[string]interface{}{ + "client_content": map[string]interface{}{ + "turns": []map[string]interface{}{ + { + "role": "user", + "parts": []map[string]interface{}{ + {"text": text}, + }, + }, + }, + "turn_complete": true, + }, + } + return c.SendMessage(message) +} + +func (c *Client) StartAudio() error { + player, err := oto.NewContext(int(sendSampleRate), channel, 2, 8192) + if err != nil { + return fmt.Errorf("failed to create audio player: %w", err) + } + c.audioPlayer = player.NewPlayer() + return nil +} + +func (c *Client) SendAudio(audioData []byte) error { + inputMsg := map[string]interface{}{ + "realtime_input": map[string]interface{}{ + "media_chunks": []map[string]interface{}{ + { + "data": base64.StdEncoding.EncodeToString(audioData), + "mime_type": "audio/pcm", + }, + }, + }, + } + + return c.SendMessage(inputMsg) +} + +func (c *Client) SendVideo(videoData map[string]interface{}) error { + inputMsg := map[string]interface{}{ + "realtime_input": map[string]interface{}{ + "media_chunks": []map[string]interface{}{videoData}, + }, + } + return c.SendMessage(inputMsg) +} + +func (c *Client) resizeImage(img *image.RGBA) *image.RGBA { + imgResized := resize.Thumbnail(imageMaxWidth, imageMaxHeight, img, resize.Lanczos3).(*image.RGBA) + return imgResized +} + +func (c *Client) encodeImageToJPEG(img *image.RGBA) ([]byte, error) { + buf := new(bytes.Buffer) + err := jpeg.Encode(buf, img, nil) + if err != nil { + return nil, fmt.Errorf("encodeImageToJPEG failed %w", err) + } + return buf.Bytes(), nil +} + +// ProcessScreenFrames reads frames from the screen and sends them to the server +// until doneChan is closed. 
+// and in the end subtracts the wait group +// TODO use atomic.LoadInt32(&c.closed) instead of defalt select +func (c *Client) processScreenFrames() error { + for { + select { + case <-c.doneChan: + logrus.Debugln("doneChan closed processScreenFrames") + return nil + default: + frame, err := c._getScreen() + if err != nil { + logrus.Errorln("Failed to process screen frame:", err) + continue + } + select { + case <-c.doneChan: + logrus.Debugln("doneChan closed processScreenFrames inside select") + return nil + case c.videoOutQueue <- frame: + time.Sleep(1 * time.Second) + default: + time.Sleep(10 * time.Millisecond) + } + } + } +} + +// _getScreen captures the screen and returns the image as a map +func (c *Client) _getScreen() (map[string]interface{}, error) { + n := screenshot.NumActiveDisplays() + if n <= 0 { + return nil, fmt.Errorf("no monitors detected") + } + bounds := screenshot.GetDisplayBounds(0) + img, err := screenshot.CaptureRect(bounds) + if err != nil { + return nil, fmt.Errorf("failed to capture screen: %w", err) + } + imgResized := c.resizeImage(img) + imageBytes, err := c.encodeImageToJPEG(imgResized) + if err != nil { + return nil, fmt.Errorf("failed to encode image to JPEG: %w", err) + } + + return map[string]interface{}{ + "mime_type": "image/jpeg", + "data": base64.StdEncoding.EncodeToString(imageBytes), + }, nil +} + +// _getFrame captures a frame from the camera and returns the image as a map +func (c *Client) _getFrame() (map[string]interface{}, error) { + cap, stderr, err := c.openVideoCapture() + if err != nil { + return nil, fmt.Errorf("failed to open video capture %w", err) + } + defer func() { + if cap.Process != nil { + cap.Process.Release() + } + }() + data, err := cap.Output() + if err != nil { + return nil, fmt.Errorf("failed to read frame %w, %s", err, stderr.String()) + } + frame_rgb := c.convertToRGB(data) + img, err := c.createImageFromBytes(frame_rgb) + if err != nil { + return nil, fmt.Errorf("failed to create image from 
bytes %w", err) + } + img = c.resizeImage(img) + imageBytes, err := c.encodeImageToJPEG(img) + if err != nil { + return nil, fmt.Errorf("failed to encode image to JPEG %w", err) + } + return map[string]interface{}{ + "mime_type": "image/jpeg", + "data": base64.StdEncoding.EncodeToString(imageBytes), + }, nil +} + +func (c *Client) openVideoCapture() (*exec.Cmd, *bytes.Buffer, error) { + cap := exec.Command("ffmpeg", "-f", "v4l2", "-i", "/dev/video1", "-f", "rawvideo", "-pix_fmt", "rgb24", "-vframes", "1", "-") + var stderr bytes.Buffer + cap.Stderr = &stderr + return cap, &stderr, nil +} + +func (c *Client) convertToRGB(frame []byte) []byte { + var out bytes.Buffer + cmd := exec.Command("ffmpeg", "-f", "rawvideo", "-pix_fmt", "bgr24", "-video_size", "640x480", "-i", "-", "-f", "rawvideo", "-pix_fmt", "rgb24", "-") + cmd.Stdin = bytes.NewReader(frame) + cmd.Stdout = &out + err := cmd.Run() + if err != nil { + logrus.Errorln("Error converting image to rgb:", err) + } + return out.Bytes() +} + +func (c *Client) createImageFromBytes(frame_rgb []byte) (*image.RGBA, error) { + img := image.NewRGBA(image.Rect(0, 0, 640, 480)) + for i := 0; i < len(frame_rgb); i += 3 { + x := (i / 3) % 640 + y := (i / 3) / 640 + img.Set(x, y, imageColor.RGBA{R: frame_rgb[i], G: frame_rgb[i+1], B: frame_rgb[i+2], A: 0xff}) + } + return img, nil +} + +func (c *Client) StartRecording() error { + portaudio.Initialize() + stream, err := portaudio.OpenDefaultStream(channel, 0, float64(sendSampleRate), len(c.intBuf), c.intBuf) + if err != nil { + return fmt.Errorf("failed to open default stream: %w", err) + } + c.stream = stream + if err := c.stream.Start(); err != nil { + return fmt.Errorf("failed to start recording: %w", err) + } + return nil +} + +// ProcessCameraFrames reads frames from the camera and sends them to the server +// until doneChan is closed. 
+// and in the end subtracts the wait group +// TODO use atomic.LoadInt32(&c.closed) instead of defalt select +func (c *Client) processCameraFrames() error { + for { + select { + case <-c.doneChan: + logrus.Debugln("doneChan closed processCameraFrames") + return nil + default: + frame, err := c._getFrame() + if err != nil { + logrus.Errorln("Failed to process camera frame:", err) + return err + } + select { + case <-c.doneChan: + logrus.Debugln("doneChan closed processCameraFrames inside select") + return nil + case c.videoOutQueue <- frame: + time.Sleep(1 * time.Second) + default: + time.Sleep(10 * time.Millisecond) + } + } + } +} + +// ProcessAudioInput reads audio from the microphone and sends it to the server +// until doneChan is closed. +// and in the end subtracts the wait group +func (c *Client) ProcessAudioInput() error { + for { + select { + case <-c.doneChan: + logrus.Debugln("doneChan closed ProcessAudioInput") + return nil + default: + if atomic.LoadInt32(&c.closed) == 1 { + logrus.Debugln("Client is closing, exiting ProcessAudioInput") + return nil + } + c.paMutex.Lock() + err := c.stream.Read() + c.paMutex.Unlock() + + if err != nil { + if err == io.EOF { + return nil + } else if portaudioErr, ok := err.(portaudio.UnanticipatedHostError); ok { + if portaudioErr.Code == 0 { + logrus.Debugln("channel closed") + return nil + } + } + return fmt.Errorf("failed to read audio: %w", err) + } + if err := c.SendAudio(c.int16ToLittleEndianByte(c.intBuf)); err != nil { + // return fmt.Errorf("failed to send audio: %w", err) + // I am not returning here because if if there is error to send to websocket + // I will return and stop the input microphone processing + // so i will just log the error and continue + logrus.Infof("failed to send audio: %v", err) + continue + } + } + } +} + +func (c *Client) int16ToLittleEndianByte(f []int16) []byte { + var buf bytes.Buffer + err := binary.Write(&buf, binary.LittleEndian, f) + if err != nil { + logrus.Errorf("binary.Write 
failed. Err %v\n", err) + } + return buf.Bytes() +} + +// ProcessAudio reads audio from the server and plays it until doneChan is closed. +// and in the end subtracts the wait group +func (c *Client) ProcessAudio() error { + time.Sleep(1 * time.Second) + + for { + _, message, err := c.conn.ReadMessage() + if err != nil { + if websocket.IsCloseError(err, websocket.CloseInternalServerErr) { + if reconnectErr := c.reconnect(); reconnectErr != nil { + return fmt.Errorf("failed to reconnect: %w", reconnectErr) + } + continue + } + return fmt.Errorf("failed to read message: %w", err) + } + + var response map[string]interface{} + if err := json.Unmarshal(message, &response); err != nil { + logrus.Errorln("Error unmarshaling:", err) + continue + } + + serverContent, ok := response["serverContent"].(map[string]interface{}) + if !ok { + logrus.Debugln("No serverContent found in response:", response) + continue + } + + modelTurn, ok := serverContent["modelTurn"].(map[string]interface{}) + if !ok { + turnComplete, ok := serverContent["turnComplete"].(bool) + if ok && turnComplete { + for len(c.audioInQueue) > 0 { + <-c.audioInQueue + } + continue + } + logrus.Debugln("No modelTurn found in response:", serverContent) + continue + } + parts, ok := modelTurn["parts"].([]interface{}) + if !ok || len(parts) == 0 { + logrus.Debugln("No parts found in modelTurn:", modelTurn) + continue + } + + for _, part := range parts { + partMap, ok := part.(map[string]interface{}) + if !ok { + continue + } + if text, ok := partMap["text"].(string); ok { + fmt.Print(text) + } else if inlineData, ok := partMap["inlineData"].(map[string]interface{}); ok { + if audioData, ok := inlineData["data"].(string); ok { + decoded, err := base64.StdEncoding.DecodeString(audioData) + if err != nil { + logrus.Errorln("Error decoding audio:", err) + continue + } + select { + case <-c.doneChan: + logrus.Debugln("doneChan closed ProcessAudio") + return nil + case c.audioInQueue <- decoded: + default: + } + } + } + 
} + fmt.Println() + } +} + +// ProcessAudioOutput reads audio from the `audioInQueue` and plays it until doneChan is closed. +// and in the end subtracts the wait group +func (c *Client) ProcessAudioOutput() { + for { + select { + case <-c.doneChan: + logrus.Debugln("doneChan closed ProcessAudioOutput") + return + case audio := <-c.audioInQueue: + // time.Sleep(100 * time.Millisecond) + if _, err := c.audioPlayer.Write(audio); err != nil { + logrus.Errorln("Error playing audio:", err) + } + } + } +} + +// ProcessVideoSend reads video from the `videoOutQueue` and sends it to the server until doneChan is closed. +// and in the end subtracts the wait group +func (c *Client) ProcessVideoSend() { + for { + select { + case <-c.doneChan: + logrus.Debugln("doneChan closed ProcessVideoSend") + return + case video := <-c.videoOutQueue: + if err := c.SendVideo(video); err != nil { + logrus.Errorln("Error sending video:", err) + } + } + + } +} + +func (c *Client) Close() { + if atomic.CompareAndSwapInt32(&c.closed, 0, 1) { + c.closeOnce.Do(func() { + + close(c.doneChan) + + close(c.audioInQueue) + c.cancel() + if c.conn != nil { + err := c.conn.Close() + if err != nil { + logrus.Errorln("Error closing connection:", err) + } + } + close(c.videoOutQueue) + c.paMutex.Lock() + if c.stream != nil { + err := c.stream.Stop() + if err != nil { + logrus.Errorln("Error stopping stream:", err) + } + err = c.stream.Close() + if err != nil { + logrus.Errorln("Error closing stream:", err) + } + } + err := portaudio.Terminate() + c.paMutex.Unlock() + if err != nil { + logrus.Errorln("Error terminating portaudio:", err) + } + // c.audioPlayer.Close() + }) + } + +} + +func main() { + logrus.SetLevel(logrus.DebugLevel) + logrus.SetFormatter(&logrus.TextFormatter{ + FullTimestamp: true, + }) + logrus.SetReportCaller(true) + logrus.Infoln("Starting Gemini client") + + var err error + APIKey, err = getAPIKey() + if err != nil { + logrus.Fatalln("Error retrieving API key:", err) + } + tools := 
[]map[string]interface{}{ + // {"google_search": map[string]interface{}{}}, + } + client := NewClient("models/gemini-2.0-flash-exp", tools) + + if err := client.Connect(APIKey); err != nil { + logrus.Fatalln("Connection error:", err) + } + if err := client.Setup(); err != nil { + logrus.Fatalln("Setup error:", err) + } + if err := client.StartAudio(); err != nil { + logrus.Fatalln("Error starting audio:", err) + } + if err := client.StartRecording(); err != nil { + logrus.Fatalln("Error starting microphone:", err) + } + client.wg.Add(1) + go func() { + defer func() { + client.wg.Done() + logrus.Debugln("done with ProcessAudioInput") + }() + if err := client.ProcessAudioInput(); err != nil { + logrus.Errorln("Error processing audio input:", err) + } + }() + + client.wg.Add(1) + go func() { + defer func() { + client.wg.Done() + logrus.Debugln("done with ProcessAudio") + }() + if err := client.ProcessAudio(); err != nil { + logrus.Errorln("Audio processing error:", err) + client.Close() + } + }() + + client.wg.Add(1) + go func() { + defer func() { + client.wg.Done() + logrus.Debugln("done with ProcessAudioOutput") + }() + + client.ProcessAudioOutput() + }() + + client.wg.Add(1) + go func() { + defer func() { + client.wg.Done() + logrus.Debugln("done with ProcessVideoSend") + }() + client.ProcessVideoSend() + }() + + client.wg.Add(1) + go func() { + defer func() { + client.wg.Done() + logrus.Debugln("done with Process Camera | Frames") + }() + if os.Getenv("MODE") == "" { + logrus.Infoln("No mode specified") + } else if os.Getenv("MODE") == "camera" { + logrus.Infoln("camera is selected") + if err := client.processCameraFrames(); err != nil { + logrus.Errorln("Camera processing error:", err) + client.Close() + } + } else if os.Getenv("MODE") == "screen" { + logrus.Infoln("screen is selected") + time.Sleep(1 * time.Second) + if err := client.processScreenFrames(); err != nil { + logrus.Errorln("Screen processing error:", err) + } + } + }() + + go func() { + reader := 
bufio.NewReader(os.Stdin) + for { + fmt.Printf("message>: ") + input, err := reader.ReadString('\n') + if err != nil { + logrus.Errorln("Error reading input:", err) + return + } + + input = strings.TrimSpace(input) + if err := client.SendTextInput(input); err != nil { + logrus.Errorln("Error sending input:", err) + return + } + } + }() + + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM) + go func() { + <-interrupt + fmt.Println("Intentent Exiting...") + client.Close() + }() + client.wg.Wait() + err = client.audioPlayer.Close() + if err != nil { + logrus.Errorln("Error closing audio player:", err) + } + + time.Sleep(2 * time.Second) +}