diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..250c385
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+.idea
+.vendor
\ No newline at end of file
diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..13566b8
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,8 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Editor-based HTTP Client requests
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
diff --git a/.idea/Go.iml b/.idea/Go.iml
new file mode 100644
index 0000000..5e764c4
--- /dev/null
+++ b/.idea/Go.iml
@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="WEB_MODULE" version="4">
+  <component name="Go" enabled="true" />
+  <component name="NewModuleRootManager">
+    <content url="file://$MODULE_DIR$" />
+    <orderEntry type="inheritedJdk" />
+    <orderEntry type="sourceFolder" forTests="false" />
+  </component>
+</module>
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..39d35e2
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="ProjectModuleManager">
+    <modules>
+      <module fileurl="file://$PROJECT_DIR$/.idea/Go.iml" filepath="$PROJECT_DIR$/.idea/Go.iml" />
+    </modules>
+  </component>
+</project>
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..94a25f7
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="VcsDirectoryMappings">
+    <mapping directory="" vcs="Git" />
+  </component>
+</project>
\ No newline at end of file
diff --git a/0-limit-crawler/check_test.go b/0-limit-crawler/check_test.go
index 8451bc0..cc1f178 100644
--- a/0-limit-crawler/check_test.go
+++ b/0-limit-crawler/check_test.go
@@ -1,7 +1,7 @@
//////////////////////////////////////////////////////////////////////
//
// DO NOT EDIT THIS PART
-// Your task is to edit `main.go`
+// Your task is to edit `base.go`
//
package main
diff --git a/0-limit-crawler/main.go b/0-limit-crawler/main.go
index ddadd14..0b0832a 100644
--- a/0-limit-crawler/main.go
+++ b/0-limit-crawler/main.go
@@ -1,7 +1,7 @@
//////////////////////////////////////////////////////////////////////
//
// Your task is to change the code to limit the crawler to at most one
-// page per second, while maintaining concurrency (in other words,
+// page per second, while maintaining concurrency (in other words,
// Crawl() must be called concurrently)
//
// @hint: you can achieve this by adding 3 lines
diff --git a/0-limit-crawler/mockfetcher.go b/0-limit-crawler/mockfetcher.go
index c94e1dd..becc3a6 100644
--- a/0-limit-crawler/mockfetcher.go
+++ b/0-limit-crawler/mockfetcher.go
@@ -1,7 +1,7 @@
//////////////////////////////////////////////////////////////////////
//
// DO NOT EDIT THIS PART
-// Your task is to edit `main.go`
+// Your task is to edit `base.go`
//
package main
diff --git a/1-producer-consumer/mockstream.go b/1-producer-consumer/mockstream.go
index 93b9da6..58c6791 100644
--- a/1-producer-consumer/mockstream.go
+++ b/1-producer-consumer/mockstream.go
@@ -1,7 +1,7 @@
//////////////////////////////////////////////////////////////////////
//
// DO NOT EDIT THIS PART
-// Your task is to edit `main.go`
+// Your task is to edit `base.go`
//
package main
diff --git a/2-race-in-cache/check_test.go b/2-race-in-cache/check_test.go
index 45a756a..7f21666 100644
--- a/2-race-in-cache/check_test.go
+++ b/2-race-in-cache/check_test.go
@@ -1,7 +1,7 @@
//////////////////////////////////////////////////////////////////////
//
// DO NOT EDIT THIS PART
-// Your task is to edit `main.go`
+// Your task is to edit `base.go`
//
package main
@@ -39,7 +39,7 @@ func TestLRU(t *testing.T) {
wg.Add(1)
go func(i int) {
value := cache.Get("Test" + strconv.Itoa(i))
- if value != "Test" + strconv.Itoa(i) {
+ if value != "Test"+strconv.Itoa(i) {
t.Errorf("Incorrect db response %v", value)
}
wg.Done()
diff --git a/2-race-in-cache/main.go b/2-race-in-cache/main.go
index 7618dd1..01fdbd0 100644
--- a/2-race-in-cache/main.go
+++ b/2-race-in-cache/main.go
@@ -1,65 +1,167 @@
-//////////////////////////////////////////////////////////////////////
-//
-// Given is some code to cache key-value pairs from a database into
-// the main memory (to reduce access time). Note that golang's map are
-// not entirely thread safe. Multiple readers are fine, but multiple
-// writers are not. Change the code to make this thread safe.
-//
-
package main
import (
"container/list"
+ "fmt"
+ "sync"
"testing"
)
-// CacheSize determines how big the cache can grow
const CacheSize = 100
-// KeyStoreCacheLoader is an interface for the KeyStoreCache
type KeyStoreCacheLoader interface {
// Load implements a function where the cache should gets it's content from
Load(string) string
}
+// page represents an item in our cache.
type page struct {
Key string
Value string
}
-// KeyStoreCache is a LRU cache for string key-value pairs
+// Future represents a pending or completed result for a key.
+// It allows multiple goroutines to wait for the result of a single load operation.
+type Future struct {
+ wg sync.WaitGroup // Used to wait for the load to complete
+ result *list.Element // Pointer to the list element when done
+ err error // Any error during loading
+ once sync.Once // Ensures load is called only once
+ loader func() (string, error) // The function to perform the actual load
+}
+
+func newFuture(loader func() (string, error)) *Future {
+ f := &Future{
+ loader: loader,
+ }
+ f.wg.Add(1) // Initialize wait group for 1 completion
+ return f
+}
+
+// Do performs the actual loading operation exactly once.
+func (f *Future) Do() {
+ f.once.Do(func() {
+		// Run the load operation (potentially slow, e.g. a database call)
+ val, err := f.loader()
+ if err != nil {
+ f.err = err
+ } else {
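+			// Placeholder element that only carries the loaded value; the real
+			// list element is attached later via SetResult.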
+ f.result = &list.Element{Value: &page{"", val}}
+ }
+ f.wg.Done() // Signal that loading is complete
+ })
+}
+
+// Wait blocks until the future's operation is complete and returns the result.
+func (f *Future) Wait() (*list.Element, error) {
+ f.wg.Wait()
+ return f.result, f.err
+}
+
+// SetResult sets the list.Element once the loading is done and added to the list.
+func (f *Future) SetResult(e *list.Element) {
+ f.result = e
+}
+
+// KeyStoreCache implements a concurrent LRU cache.
type KeyStoreCache struct {
- cache map[string]*list.Element
- pages list.List
- load func(string) string
+ mu sync.RWMutex // Guards access to cache and pages
+ cache map[string]*Future // Maps key to its Future (pending or completed)
+ pages *list.List // Doubly linked list for LRU eviction
+ load func(key string) string // The actual resource loading function
}
-// New creates a new KeyStoreCache
+// New creates a new concurrent LRU cache.
func New(load KeyStoreCacheLoader) *KeyStoreCache {
return &KeyStoreCache{
+ cache: make(map[string]*Future),
+ pages: list.New(),
load: load.Load,
- cache: make(map[string]*list.Element),
}
}
-// Get gets the key from cache, loads it from the source if needed
+// Get retrieves a value from the cache, loading it if necessary.
func (k *KeyStoreCache) Get(key string) string {
- if e, ok := k.cache[key]; ok {
- k.pages.MoveToFront(e)
- return e.Value.(page).Value
+ // --- Phase 1: Check for existing entry (read-locked) ---
+ k.mu.RLock() // Acquire a read lock
+ f, ok := k.cache[key]
+ k.mu.RUnlock() // Release read lock quickly
+
+ if ok {
+ elem, err := f.Wait() // This blocks if the future is not yet done
+ if err != nil {
+ // Handle load error here if you want to propagate it
+ fmt.Printf("Error loading key '%s': %v\n", key, err)
+ return "" // Or re-attempt load, or return a specific error
+ }
+
+ k.mu.Lock()
+ k.pages.MoveToFront(elem)
+ k.mu.Unlock()
+
+ return elem.Value.(*page).Value
}
- // Miss - load from database and save it in cache
- p := page{key, k.load(key)}
- // if cache is full remove the least used item
- if len(k.cache) >= CacheSize {
- end := k.pages.Back()
- // remove from map
- delete(k.cache, end.Value.(page).Key)
- // remove from list
- k.pages.Remove(end)
+
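+	// --- Phase 2: double-check under the write lock (another goroutine may
+	// have created the Future while we were unlocked) ---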
+ k.mu.Lock()
+ f, ok = k.cache[key]
+ if ok {
+ // Another goroutine beat us to it. Release lock and wait for its result.
+ k.mu.Unlock()
+ elem, err := f.Wait()
+ if err != nil {
+ fmt.Printf("Error loading key '%s': %v\n", key, err)
+ return ""
+ }
+ k.mu.Lock() // Re-acquire lock to move to front
+ k.pages.MoveToFront(elem)
+ k.mu.Unlock()
+ return elem.Value.(*page).Value
}
- k.pages.PushFront(p)
- k.cache[key] = k.pages.Front()
+
+ // It's genuinely not in the cache. Create a new future.
+ newF := newFuture(func() (string, error) {
+ // The actual load operation that will be called by Do()
+ val := k.load(key)
+ return val, nil // Assuming k.load doesn't return an error, adjust if it does
+ })
+ k.cache[key] = newF
+ k.mu.Unlock() // Release the write lock *before* calling Do()
+
+ newF.Do() // This will call the loader function for this key exactly once.
+
+ // Now that loading is complete, acquire write lock again to update LRU and set result.
+ k.mu.Lock()
+ defer k.mu.Unlock() // Ensure lock is released
+
+ // Check for eviction before adding the new item
+ if k.pages.Len() >= CacheSize {
+ oldest := k.pages.Back()
+ if oldest != nil {
+ pToDelete := oldest.Value.(*page)
+ delete(k.cache, pToDelete.Key) // Remove from map
+ k.pages.Remove(oldest) // Remove from list
+ fmt.Printf("Evicting key: %s\n", pToDelete.Key)
+ }
+ }
+
+ // Get the loaded result from the future
+ loadedElem, err := newF.Wait() // This should return immediately now as Do() just completed.
+ if err != nil {
+ // Handle the error (e.g., remove from cache if load failed permanently)
+ delete(k.cache, key)
+ fmt.Printf("Final error after load for key '%s': %v\n", key, err)
+ return ""
+ }
+
+ // Add the new page to the front of the list and set its result in the future.
+ p := &page{key, loadedElem.Value.(*page).Value} // Re-create page to get its value
+ elem := k.pages.PushFront(p)
+ newF.SetResult(elem) // Set the actual list.Element in the future for future lookups
+
return p.Value
}
diff --git a/2-race-in-cache/mockdb.go b/2-race-in-cache/mockdb.go
index 8bd1a4c..38f724c 100644
--- a/2-race-in-cache/mockdb.go
+++ b/2-race-in-cache/mockdb.go
@@ -1,7 +1,7 @@
//////////////////////////////////////////////////////////////////////
//
// DO NOT EDIT THIS PART
-// Your task is to edit `main.go`
+// Your task is to edit `base.go`
//
package main
@@ -12,7 +12,7 @@ import (
)
// MockDB used to simulate a database model
-type MockDB struct{
+type MockDB struct {
Calls int32
}
diff --git a/2-race-in-cache/mockserver.go b/2-race-in-cache/mockserver.go
index a60fab2..1a432c2 100644
--- a/2-race-in-cache/mockserver.go
+++ b/2-race-in-cache/mockserver.go
@@ -1,7 +1,7 @@
//////////////////////////////////////////////////////////////////////
//
// DO NOT EDIT THIS PART
-// Your task is to edit `main.go`
+// Your task is to edit `base.go`
//
package main
@@ -31,7 +31,7 @@ func RunMockServer(cache *KeyStoreCache, t *testing.T) {
go func(i int) {
value := cache.Get("Test" + strconv.Itoa(i))
if t != nil {
- if value != "Test" + strconv.Itoa(i) {
+ if value != "Test"+strconv.Itoa(i) {
t.Errorf("Incorrect db response %v", value)
}
}
diff --git a/3-limit-service-time/mockserver.go b/3-limit-service-time/mockserver.go
index f435c9d..211a6da 100644
--- a/3-limit-service-time/mockserver.go
+++ b/3-limit-service-time/mockserver.go
@@ -1,7 +1,7 @@
//////////////////////////////////////////////////////////////////////
//
// DO NOT EDIT THIS PART
-// Your task is to edit `main.go`
+// Your task is to edit `base.go`
//
package main
diff --git a/4-graceful-sigint/mockprocess.go b/4-graceful-sigint/mockprocess.go
index fdf5ecd..c040cf9 100644
--- a/4-graceful-sigint/mockprocess.go
+++ b/4-graceful-sigint/mockprocess.go
@@ -1,7 +1,7 @@
//////////////////////////////////////////////////////////////////////
//
// DO NOT EDIT THIS PART
-// Your task is to edit `main.go`
+// Your task is to edit `base.go`
//
package main
@@ -15,7 +15,7 @@ import (
// MockProcess for example
type MockProcess struct {
- mu sync.Mutex
+ mu sync.Mutex
isRunning bool
}
diff --git a/5-session-cleaner/helper.go b/5-session-cleaner/helper.go
index 74f93f3..6e8cc5f 100644
--- a/5-session-cleaner/helper.go
+++ b/5-session-cleaner/helper.go
@@ -1,7 +1,7 @@
//////////////////////////////////////////////////////////////////////
//
// DO NOT EDIT THIS PART
-// Your task is to edit `main.go`
+// Your task is to edit `base.go`
//
package main
diff --git a/6-cancellation/cancellation/README.md b/6-cancellation/cancellation/README.md
new file mode 100644
index 0000000..c1469a4
--- /dev/null
+++ b/6-cancellation/cancellation/README.md
@@ -0,0 +1,23 @@
+# Exercise requirements
+
+You are implementing a DB instance that queries data from an external DB through a driver called `EmulatedDriver`.
+Your task is to implement `QueryContext`, which must ensure the following (a usage sketch follows the list):
+1. When the context times out or is cancelled, `QueryContext` returns as soon as possible.
+2. Before returning, all resources of the operation are cleaned up.
+3. The operation returns an error if a failure happens.
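+
+A minimal usage sketch (hypothetical; it assumes the mock driver from `imp`, mirroring `main.go`):
+
+```go
+ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+defer cancel()
+
+db := imp.NewYourDB(imp.NewMockEmulatedDriver())
+rows, err := db.QueryContext(ctx, "SLOW QUERY")
+if err != nil {
+	// context.DeadlineExceeded or context.Canceled is expected here
+	log.Println(err)
+} else {
+	defer rows.Close()
+}
+```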
\ No newline at end of file
diff --git a/6-cancellation/cancellation/go.mod b/6-cancellation/cancellation/go.mod
new file mode 100644
index 0000000..0e1e79a
--- /dev/null
+++ b/6-cancellation/cancellation/go.mod
@@ -0,0 +1,3 @@
+module go_concurrency/cancellation
+
+go 1.24.2
diff --git a/6-cancellation/cancellation/imp/db.go b/6-cancellation/cancellation/imp/db.go
new file mode 100644
index 0000000..d797e21
--- /dev/null
+++ b/6-cancellation/cancellation/imp/db.go
@@ -0,0 +1,60 @@
+package imp
+
+import (
+ "context"
+ "log"
+)
+
+// -----------------------------------------------------------------------------
+// Your db instance (that uses the EmulatedDriver)
+// -----------------------------------------------------------------------------
+
+// YourDB is your custom database instance that would use the EmulatedDriver.
+type YourDB struct {
+ driver EmulatedDriver
+}
+
+// NewYourDB creates a new instance of YourDB with the provided driver.
+func NewYourDB(driver EmulatedDriver) *YourDB {
+ return &YourDB{driver: driver}
+}
+
+// QueryContext is your implementation of the database query method that
+// supports context cancellation.
+func (db *YourDB) QueryContext(ctx context.Context, query string, args ...interface{}) (*simulatedRows, error) {
+	queryOperation, err := db.driver.PrepareQuery(ctx, query, args...)
+	if err != nil {
+		return nil, err
+	}
+
+	// Buffered channels let the worker goroutine deliver its result without
+	// blocking, even if QueryContext has already returned on ctx.Done().
+	resChannel := make(chan *simulatedRows, 1)
+	errChannel := make(chan error, 1)
+	finished := make(chan struct{})
+	go func() {
+		defer close(finished)
+		res, err := queryOperation.Wait()
+		if err != nil {
+			errChannel <- err
+		} else {
+			resChannel <- res
+		}
+	}()
+
+	select {
+	case <-ctx.Done():
+		log.Printf("QueryContext: context done for '%s': %v", query, ctx.Err())
+		// Cancel the driver-side operation, then wait for the worker
+		// goroutine to exit so its resources are released.
+		if err := queryOperation.Cancel(); err != nil {
+			return nil, err
+		}
+		<-finished
+		return nil, ctx.Err()
+	case res := <-resChannel:
+		return res, nil
+	case err := <-errChannel:
+		return nil, err
+	}
+}
diff --git a/6-cancellation/cancellation/imp/driver.go b/6-cancellation/cancellation/imp/driver.go
new file mode 100644
index 0000000..44978b8
--- /dev/null
+++ b/6-cancellation/cancellation/imp/driver.go
@@ -0,0 +1,41 @@
+package imp
+
+import (
+ "context"
+ "log"
+)
+
+// EmulatedDriver is the interface that your 'db' instance would use to interact with
+// the underlying database driver.
+type EmulatedDriver interface {
+ // PrepareQuery initiates a query and returns a handle to the ongoing operation.
+ // It does NOT block until the query completes.
+ PrepareQuery(ctx context.Context, query string, args ...interface{}) (QueryOperation, error)
+}
+
+// -----------------------------------------------------------------------------
+// Mock Implementation of the EmulatedDriver and QueryOperation
+// -----------------------------------------------------------------------------
+
+// mockEmulatedDriver is a concrete implementation of EmulatedDriver for testing.
+type mockEmulatedDriver struct {
+ // You might add a connection pool or other driver-level state here
+}
+
+// NewMockEmulatedDriver creates a new instance of the mock driver.
+func NewMockEmulatedDriver() EmulatedDriver {
+ return &mockEmulatedDriver{}
+}
+
+// PrepareQuery simulates preparing and starting a database query.
+func (m *mockEmulatedDriver) PrepareQuery(ctx context.Context, query string, args ...interface{}) (QueryOperation, error) {
+ log.Printf("Mock Driver: Preparing and starting query: '%s'", query)
+ op := &mockQueryOperation{
+ query: query,
+ finished: make(chan struct{}),
+ cancelSignal: make(chan struct{}, 1), // Buffered channel for non-blocking sends
+ }
+
+ go op.run(ctx) // Start the "query" in a goroutine
+ return op, nil
+}
diff --git a/6-cancellation/cancellation/imp/query_operation.go b/6-cancellation/cancellation/imp/query_operation.go
new file mode 100644
index 0000000..b310678
--- /dev/null
+++ b/6-cancellation/cancellation/imp/query_operation.go
@@ -0,0 +1,102 @@
+package imp
+
+import (
+ "context"
+ "errors"
+ "log"
+ "sync"
+ "time"
+)
+
+// QueryOperation represents an ongoing database query.
+// It allows for waiting on the query's completion and explicitly canceling it.
+type QueryOperation interface {
+ // Wait blocks until the query completes successfully or with an error.
+ // It returns the results (e.g., *simulatedRows) and any error.
+ Wait() (*simulatedRows, error)
+
+ // Cancel attempts to interrupt the ongoing query.
+ // This method should be safe to call multiple times or if the query has already finished.
+ Cancel() error
+}
+
+// mockQueryOperation is a concrete implementation of QueryOperation for testing.
+type mockQueryOperation struct {
+ query string
+ result *simulatedRows
+ opErr error
+ finished chan struct{} // Closed when the operation completes (successfully or with error)
+ cancelSignal chan struct{} // Used to signal cancellation to the running operation goroutine
+ mu sync.Mutex // Protects access to result and opErr
+ canceled bool
+}
+
+// run simulates the actual database query execution.
+func (op *mockQueryOperation) run(ctx context.Context) {
+ defer close(op.finished) // Ensure 'finished' is always closed
+
+ // Simulate query execution time
+ queryDuration := 3 * time.Second // Default query duration
+ if op.query == "FAST QUERY" {
+		queryDuration = 500 * time.Millisecond // Intentionally faster query
+ }
+
+ log.Printf("Mock QueryOperation: Starting execution for '%s' (will take %v)", op.query, queryDuration)
+
+ select {
+ case <-time.After(queryDuration):
+ // Query completed successfully
+ op.mu.Lock()
+ op.result = &simulatedRows{data: []string{"data_for_" + op.query}}
+ op.opErr = nil
+ op.mu.Unlock()
+ log.Printf("Mock QueryOperation: Query '%s' completed successfully.", op.query)
+ case <-op.cancelSignal:
+ // Cancellation requested by the caller
+ op.mu.Lock()
+ op.opErr = context.Canceled // Or a custom driver-specific cancellation error
+ op.canceled = true
+ op.mu.Unlock()
+ log.Printf("Mock QueryOperation: Query '%s' was explicitly canceled by the caller.", op.query)
+ case <-ctx.Done():
+ // Context itself was canceled (e.g., timeout, parent context cancel)
+ op.mu.Lock()
+ op.opErr = ctx.Err() // This will be context.Canceled or context.DeadlineExceeded
+ op.canceled = true
+ op.mu.Unlock()
+ log.Printf("Mock QueryOperation: Query '%s' interrupted due to context cancellation: %v", op.query, ctx.Err())
+ }
+}
+
+// Wait blocks until the query completes.
+func (op *mockQueryOperation) Wait() (*simulatedRows, error) {
+ <-op.finished // Wait for the operation to complete
+ op.mu.Lock()
+ defer op.mu.Unlock()
+ return op.result, op.opErr
+}
+
+// Cancel attempts to interrupt the ongoing query by sending a signal.
+func (op *mockQueryOperation) Cancel() error {
+ op.mu.Lock()
+ if op.canceled { // Already canceled or finished by context
+ op.mu.Unlock()
+ log.Printf("Mock QueryOperation: Attempted to cancel '%s' but it was already cancelled/finished.", op.query)
+ return nil // Or return a specific error if you want to differentiate
+ }
+ op.mu.Unlock()
+
+ select {
+ case op.cancelSignal <- struct{}{}: // Send a cancellation signal
+ log.Printf("Mock QueryOperation: Sent explicit cancel signal for '%s'.", op.query)
+ return nil
+ case <-op.finished:
+ // Operation already finished before we could send the cancel signal
+ log.Printf("Mock QueryOperation: Attempted to cancel '%s' but it already finished.", op.query)
+ return nil
+ default:
+ // Should not happen if the buffer is 1 and handled correctly
+ log.Printf("Mock QueryOperation: Failed to send cancel signal for '%s'. Channel blocked or already sent.", op.query)
+ return errors.New("failed to send cancel signal")
+ }
+}
diff --git a/6-cancellation/cancellation/imp/rows.go b/6-cancellation/cancellation/imp/rows.go
new file mode 100644
index 0000000..37a5aac
--- /dev/null
+++ b/6-cancellation/cancellation/imp/rows.go
@@ -0,0 +1,46 @@
+package imp
+
+import (
+ "errors"
+ "log"
+ "sync"
+)
+
+// simulatedRows represents a simplified result set
+type simulatedRows struct {
+ data []string
+ idx int
+ mu sync.Mutex // Protects access to data/idx
+}
+
+func (sr *simulatedRows) Next() bool {
+ sr.mu.Lock()
+ defer sr.mu.Unlock()
+ if sr.idx < len(sr.data) {
+ sr.idx++
+ return true
+ }
+ return false
+}
+
+func (sr *simulatedRows) Scan(dest ...interface{}) error {
+	sr.mu.Lock()
+	defer sr.mu.Unlock()
+	// Guard against Scan being called before Next (idx == 0) or past the end.
+	if sr.idx == 0 || sr.idx > len(sr.data) {
+		return errors.New("simulatedRows.Scan: no more rows")
+	}
+	if len(dest) != 1 {
+		return errors.New("simulatedRows.Scan expects 1 destination")
+	}
+	if s, ok := dest[0].(*string); ok {
+		*s = sr.data[sr.idx-1]
+		return nil
+	}
+	return errors.New("simulatedRows.Scan: unsupported destination type")
+}
+
+func (sr *simulatedRows) Close() error {
+ log.Println("SimulatedRows: Closed.")
+ return nil
+}
diff --git a/6-cancellation/cancellation/main.go b/6-cancellation/cancellation/main.go
new file mode 100644
index 0000000..5283ba2
--- /dev/null
+++ b/6-cancellation/cancellation/main.go
@@ -0,0 +1,101 @@
+package main
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "go_concurrency/cancellation/imp"
+ "time"
+)
+
+func main() {
+ // Initialize your DB with the mock driver
+ db := imp.NewYourDB(imp.NewMockEmulatedDriver())
+
+ // --- Test Case 1: Timeout (Query takes longer than context) ---
+ fmt.Println("\n--- Test Case 1: Timeout ---")
+ ctx1, cancel1 := context.WithTimeout(context.Background(), 2*time.Second)
+ defer cancel1()
+
+ start := time.Now()
+ rows1, err1 := db.QueryContext(ctx1, "SLOW QUERY")
+ duration := time.Since(start)
+ fmt.Printf("Query 1 completed in %v\n", duration)
+
+ if err1 != nil {
+ if errors.Is(err1, context.DeadlineExceeded) {
+ fmt.Printf("Query 1 result: Expected error (Context deadline exceeded).\n")
+ } else {
+ fmt.Printf("Query 1 result: Unexpected error: %v\n", err1)
+ }
+ } else {
+ defer rows1.Close()
+ var data string
+ for rows1.Next() {
+ rows1.Scan(&data)
+ fmt.Printf("Data: %s\n", data)
+ }
+ fmt.Println("Query 1 result: Succeeded (unexpected).")
+ }
+
+ // --- Test Case 2: Explicit Cancellation (Query is canceled before completion) ---
+ fmt.Println("\n--- Test Case 2: Explicit Cancellation ---")
+ ctx2, cancel2 := context.WithCancel(context.Background())
+
+ go func() {
+ time.Sleep(1 * time.Second) // Cancel after 1 second
+ fmt.Println("Main: Calling cancel2() for Query 2.")
+ cancel2()
+ }()
+
+ start = time.Now()
+ rows2, err2 := db.QueryContext(ctx2, "ANOTHER SLOW QUERY")
+ duration = time.Since(start)
+ fmt.Printf("Query 2 completed in %v\n", duration)
+
+ if err2 != nil {
+ if errors.Is(err2, context.Canceled) {
+ fmt.Printf("Query 2 result: Expected error (Context canceled).\n")
+ } else {
+ fmt.Printf("Query 2 result: Unexpected error: %v\n", err2)
+ }
+ } else {
+ defer rows2.Close()
+ var data string
+ for rows2.Next() {
+ rows2.Scan(&data)
+ fmt.Printf("Data: %s\n", data)
+ }
+ fmt.Println("Query 2 result: Succeeded (unexpected).")
+ }
+
+ // --- Test Case 3: Query Completes Successfully (within context) ---
+ fmt.Println("\n--- Test Case 3: Success ---")
+ ctx3, cancel3 := context.WithTimeout(context.Background(), 2*time.Second)
+ defer cancel3()
+
+ start = time.Now()
+ rows3, err3 := db.QueryContext(ctx3, "FAST QUERY") // This query is designed to be faster
+ duration = time.Since(start)
+ fmt.Printf("Query 3 completed in %v\n", duration)
+
+ if err3 != nil {
+ fmt.Printf("Query 3 result: Error: %v\n", err3)
+ } else {
+ defer rows3.Close()
+ var data string
+ found := false
+ for rows3.Next() {
+ rows3.Scan(&data)
+ fmt.Printf("Query 3 Data: %s\n", data)
+ found = true
+ }
+ if !found {
+ fmt.Println("Query 3 result: No rows found.")
+ }
+ fmt.Println("Query 3 result: Succeeded (expected).")
+ }
+
+ // Give time for logs to print
+ time.Sleep(100 * time.Millisecond)
+}
diff --git a/7-lfu-cache/cache_test.go b/7-lfu-cache/cache_test.go
new file mode 100644
index 0000000..5bf5c78
--- /dev/null
+++ b/7-lfu-cache/cache_test.go
@@ -0,0 +1,157 @@
+package main
+
+import (
+ "errors"
+ "fmt"
+ "math/rand"
+ "os"
+ "slices"
+ "strings"
+ "sync"
+ "testing"
+)
+
+func TestCache(t *testing.T) {
+ cache, err := NewLFUCache(3, func(key string) (string, error) {
+ return "", nil
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = cache.Set("vu", "10")
+ err = cache.Set("nghia", "20")
+ err = cache.Set("luan", "5")
+
+ value, err := cache.Get("vu")
+ if value != "10" {
+ t.Errorf("value should be 10, got %s", value)
+ }
+
+ value, err = cache.Get("nghia")
+ if value != "20" {
+ t.Errorf("value should be 20, got %s", value)
+ }
+
+ err = cache.Set("xanh", "30")
+
+ keys := cache.GetKeys()
+ if slices.Contains(keys, "luan") {
+ t.Errorf("keys should not contain luan")
+ }
+}
+
+func TestCache1(t *testing.T) {
+ cache, err := NewLFUCache(3, func(key string) (string, error) {
+ return "", nil
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = cache.Set("vu", "10")
+ err = cache.Set("nghia", "20")
+ err = cache.Set("luan", "5")
+
+ for i := 0; i < 10; i++ {
+ cache.Get("vu")
+ }
+
+ for i := 0; i < 9; i++ {
+ cache.Get("nghia")
+ }
+
+ for i := 0; i < 8; i++ {
+ cache.Get("luan")
+ }
+
+ i := 8
+ for e := cache.GetBuckets().Front(); e != nil; e = e.Next() {
+ fmt.Printf("Value: %v (Type: %T)\n", e.Value, e.Value)
+ bucketFreq := cache.GetFreq(e)
+ if bucketFreq != i {
+ t.Errorf("bucketFreq should be %d, got %d", i, bucketFreq)
+ }
+ i += 1
+ }
+
+ err = cache.Set("xanh", "30")
+ keys := cache.GetKeys()
+ if slices.Contains(keys, "luan") {
+ t.Errorf("keys should not contain luan")
+ }
+}
+
+const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+
+func randomString(length int) string {
+ if length <= 0 {
+ return ""
+ }
+
+ // Use strings.Builder for efficient string concatenation.
+ // It pre-allocates memory, avoiding multiple re-allocations.
+ var sb strings.Builder
+ sb.Grow(length) // Pre-allocate capacity for efficiency
+
+ charsetLen := len(charset)
+ for i := 0; i < length; i++ {
+ // Pick a random index from the charset
+ randomIndex := rand.Intn(charsetLen)
+ // Append the character at that index
+ sb.WriteByte(charset[randomIndex])
+ }
+
+ return sb.String()
+}
+
+// --- Test Main for Global Setup ---
+func TestMain(m *testing.M) {
+ // Seed the global random number generator once for all tests in this package.
+ // This is CRUCIAL for reproducible random behavior across test runs.
+ rand.New(rand.NewSource(time.Now().UnixNano()))
+
+ // Run all tests
+ code := m.Run()
+
+ // Exit with the test result code
+ os.Exit(code)
+}
+
+func TestCacheConcurrency(t *testing.T) {
+ cache, _ := NewLFUCache(5, func(key string) (string, error) {
+ return "", errors.New("Loader hasn't been implemented yet")
+ })
+
+ keyValueMap := []string{"vu", "nghia", "luan", "xanh", "orange", "thuong",
+ "tien", "lemon", "durian", "rambutant", "pear", "mango", "apple"}
+
+ var wg sync.WaitGroup
+ maxSetOperations := 10000
+ maxGetOperations := 5000
+	// 3 setters
+ for i := 0; i < 3; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for i := 0; i < maxSetOperations; i++ {
+			randomNumber := rand.Intn(len(keyValueMap))
+			cache.Set(keyValueMap[randomNumber], randomString(5))
+ }
+ }()
+ }
+
+ // 5 getters
+ for i := 0; i < 5; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for j := 0; j < maxGetOperations; j++ {
+				randomNumber := rand.Intn(len(keyValueMap))
+ cache.Get(keyValueMap[randomNumber])
+ }
+ }()
+ }
+
+ wg.Wait()
+}
diff --git a/7-lfu-cache/go.mod b/7-lfu-cache/go.mod
new file mode 100644
index 0000000..cb2b8b6
--- /dev/null
+++ b/7-lfu-cache/go.mod
@@ -0,0 +1,3 @@
+module lfu_cache
+
+go 1.24.2
diff --git a/7-lfu-cache/load_group.go b/7-lfu-cache/load_group.go
new file mode 100644
index 0000000..a7f7d66
--- /dev/null
+++ b/7-lfu-cache/load_group.go
@@ -0,0 +1,71 @@
+package main
+
+import (
+ "container/list"
+ "sync"
+)
+
+type LoadGroup struct {
+ calls map[string]*call
+ mutex sync.Mutex
+ cache *Cache
+}
+
+type call struct {
+ mu sync.RWMutex
+ result *string
+ err *error
+}
+
+// Get ensures one loading task is run even if multiple threads are waiting on the same key
+// /*
+func (l *LoadGroup) Get(key string, loaderFunc LoaderFunc) (string, error) {
+ l.mutex.Lock()
+ cache := *(l.cache)
+ vc, err := cache.GetWithoutLoad(key)
+ if err != nil {
+ //
+ }
+
+ if len(vc) != 0 {
+ l.mutex.Unlock()
+ return vc, nil
+ }
+
+ if call, ok := l.calls[key]; ok {
+ l.mutex.Unlock()
+
+ call.mu.RLock()
+ result := call.result
+ call.mu.RUnlock()
+ return *result, nil
+ }
+
+ call := &call{
+ result: new(string),
+ }
+
+ l.calls[key] = call
+ call.mu.Lock()
+ l.mutex.Unlock()
+
+ // TODO: handling panic
+ v, err := loaderFunc(key)
+ if err != nil {
+
+ }
+ call.result = &v
+ call.mu.Unlock()
+
+ // Remove call and update cache
+ l.mutex.Lock()
+ err = cache.Set(key, v)
+ if err != nil {
+ // TODO: handling error
+ l.mutex.Unlock()
+ return "", err
+ }
+
+ delete(l.calls, key)
+ l.mutex.Unlock()
+}
diff --git a/7-lfu-cache/main.go b/7-lfu-cache/main.go
new file mode 100644
index 0000000..61924bf
--- /dev/null
+++ b/7-lfu-cache/main.go
@@ -0,0 +1,225 @@
+package main
+
+import (
+ "container/list"
+ "errors"
+ "sync"
+)
+
+var ErrEmpty = errors.New("empty cache bucket")
+
+type Cache interface {
+ Get(key string) (string, error)
+ GetWithoutLoad(key string) (string, error)
+ Set(key, value string) error
+}
+
+type (
+ LoaderFunc func(key string) (string, error)
+)
+
+type baseCache struct {
+ mu sync.Mutex
+ size int
+ loaderFunc LoaderFunc
+ loadGroup LoadGroup
+}
+
+type LFUCache struct {
+ baseCache
+ cache map[string]*lfuItem
+ list *list.List // list of lruEntry
+}
+
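+// lruEntry is one frequency bucket. Buckets are kept in ascending frequency
+// order, so the front bucket holds the least-frequently-used items and is
+// always the eviction candidate.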
+type lruEntry struct {
+ freq int
+ items map[*lfuItem]struct{}
+}
+
+type lfuItem struct {
+ value string
+ key string
+ el *list.Element // Reference to lruEntry
+}
+
+func NewLFUCache(size int, loaderFunc LoaderFunc) (*LFUCache, error) {
+ if size <= 0 {
+ return nil, errors.New("size must be greater than zero")
+ }
+
+ cache := &LFUCache{
+ cache: make(map[string]*lfuItem),
+ list: list.New(),
+ }
+
+ cache.baseCache.size = size
+ cache.baseCache.loaderFunc = loaderFunc
+	cache.baseCache.loadGroup = LoadGroup{calls: make(map[string]*call)}
+
+ return cache, nil
+}
+
+// For testing
+func (cache *LFUCache) GetBuckets() *list.List {
+ return cache.list
+}
+
+func (cache *LFUCache) GetFreq(buckets *list.Element) int {
+ return buckets.Value.(*lruEntry).freq
+}
+
+func (cache *LFUCache) Get(key string) (string, error) {
+	cache.mu.Lock()
+	if item, ok := cache.cache[key]; ok {
+		// Hit: bump the item into the next-higher frequency bucket.
+		v := item.value
+		err := cache.moveToHigherBucket(item)
+		cache.mu.Unlock() // Unlock before returning, even on error
+		if err != nil {
+			return "", err
+		}
+
+		return v, nil
+	}
+	cache.mu.Unlock()
+
+ // Miss, so load value
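+	// (Concurrent misses for the same key may each invoke loaderFunc here;
+	// LoadGroup in load_group.go implements single-flight loading but is not
+	// wired into Get.)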
+ value, err := cache.loaderFunc(key)
+ if err != nil {
+ return "", err
+ }
+
+ err = cache.Set(key, value)
+ if err != nil {
+ return "", err
+ }
+
+ return value, nil
+}
+
+func (cache *LFUCache) GetKeys() []string {
+ keys := make([]string, 0)
+	for k := range cache.cache {
+ keys = append(keys, k)
+ }
+
+ return keys
+}
+
+func (cache *LFUCache) Set(key, value string) error {
+ cache.mu.Lock()
+ defer cache.mu.Unlock()
+
+ if item, ok := cache.cache[key]; ok {
+ item.value = value
+ return nil
+ }
+
+ if len(cache.cache) >= cache.size {
+ err := cache.evict()
+		if err != nil && !errors.Is(err, ErrEmpty) {
+ return err
+ }
+ }
+
+ cache.insert(key, value)
+ return nil
+}
+
+// insert inserts key and value, assuming there is always a slot for them
+func (cache *LFUCache) insert(key, value string) {
+ insertedItem := &lfuItem{
+ value: value,
+ key: key,
+ }
+
+ cache.cache[key] = insertedItem
+
+ var firstEntry *lruEntry
+ var firstElement *list.Element
+ if cache.list.Front() == nil || cache.list.Front().Value.(*lruEntry).freq != 0 {
+ firstEntry = &lruEntry{
+ freq: 0,
+ items: make(map[*lfuItem]struct{}),
+ }
+
+ firstElement = cache.list.PushFront(firstEntry)
+ } else {
+ firstElement = cache.list.Front()
+ firstEntry = firstElement.Value.(*lruEntry)
+ }
+
+ firstEntry.items[insertedItem] = struct{}{}
+ insertedItem.el = firstElement
+}
+
+func getItemToEvict(mapp map[*lfuItem]struct{}) (*lfuItem, error) {
+	for key := range mapp {
+ return key, nil
+ }
+
+	return nil, ErrEmpty
+}
+
+func (cache *LFUCache) evict() error {
+ zeroBucket := cache.list.Front()
+ if zeroBucket == nil {
+		return ErrEmpty
+ }
+
+ items := zeroBucket.Value.(*lruEntry).items
+ itemToRemove, err := getItemToEvict(items)
+ if err != nil {
+ return err
+ }
+
+ delete(items, itemToRemove)
+ if len(items) == 0 {
+ cache.list.Remove(zeroBucket)
+ }
+
+ delete(cache.cache, itemToRemove.key)
+
+ return nil
+}
+
+func (cache *LFUCache) moveToHigherBucket(item *lfuItem) error {
+ if item == nil {
+ return errors.New("item is nil")
+ }
+
+ curBucket := item.el
+ curBucketEntry := curBucket.Value.(*lruEntry)
+ nextFreq := curBucketEntry.freq + 1
+ delete(curBucketEntry.items, item)
+
+ var nextBucket *list.Element
+ if item.el.Next() == nil || item.el.Next().Value.(*lruEntry).freq > nextFreq {
+ nextBucketEntry := &lruEntry{
+ freq: nextFreq,
+ items: make(map[*lfuItem]struct{}),
+ }
+
+ nextBucketEntry.items[item] = struct{}{}
+ nextBucket = cache.list.InsertAfter(nextBucketEntry, item.el)
+ } else {
+ nextBucket = item.el.Next()
+ nextBucketEntry := nextBucket.Value.(*lruEntry)
+ nextBucketEntry.items[item] = struct{}{}
+ }
+
+ item.el = nextBucket
+
+ // Remove last bucket in case it is empty
+ if len(curBucketEntry.items) == 0 {
+ cache.list.Remove(curBucket)
+ }
+
+ return nil
+}
diff --git a/go.mod b/go.mod
index 8b894ce..12abc80 100644
--- a/go.mod
+++ b/go.mod
@@ -1,3 +1,10 @@
-module github.com/loong/go-concurrency-exercises
+module github.com/nathanverse/go-concurrency-exercises
-go 1.19
+go 1.24.2
+
+require github.com/pkg/profile v1.7.0
+
+require (
+ github.com/felixge/fgprof v0.9.3 // indirect
+ github.com/google/pprof v0.0.0-20211214055906-6f57359322fd // indirect
+)
diff --git a/go.sum b/go.sum
index e69de29..6e67449 100644
--- a/go.sum
+++ b/go.sum
@@ -0,0 +1,25 @@
+github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
+github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
+github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/felixge/fgprof v0.9.3 h1:VvyZxILNuCiUCSXtPtYmmtGvb65nqXh2QFWc0Wpf2/g=
+github.com/felixge/fgprof v0.9.3/go.mod h1:RdbpDgzqYVh/T9fPELJyV7EYJuHB55UTEULNun8eiPw=
+github.com/google/pprof v0.0.0-20211214055906-6f57359322fd h1:1FjCyPC+syAzJ5/2S8fqdZK1R22vvA0J7JZKcuOIQ7Y=
+github.com/google/pprof v0.0.0-20211214055906-6f57359322fd/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg=
+github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=
+github.com/pkg/profile v1.7.0 h1:hnbDkaNWPCLMO9wGLdBFTIZvzDrDfBM2072E1S9gJkA=
+github.com/pkg/profile v1.7.0/go.mod h1:8Uer0jas47ZQMJ7VD+OHknK4YDY07LPUC6dEvqDjvNo=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
+github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/main.go b/main.go
new file mode 100644
index 0000000..9b3358d
--- /dev/null
+++ b/main.go
@@ -0,0 +1,7 @@
+package main
+
+import "fmt"
+
+func main() {
+ fmt.Println(subsets([]int{1, 2, 3}))
+}
diff --git a/playground/go.mod b/playground/go.mod
new file mode 100644
index 0000000..0b01687
--- /dev/null
+++ b/playground/go.mod
@@ -0,0 +1,5 @@
+module go/playground
+
+go 1.24.2
+
+require golang.org/x/sync v0.15.0
diff --git a/playground/go.sum b/playground/go.sum
new file mode 100644
index 0000000..f98f706
--- /dev/null
+++ b/playground/go.sum
@@ -0,0 +1,2 @@
+golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8=
+golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
diff --git a/playground/main.go b/playground/main.go
new file mode 100644
index 0000000..7905807
--- /dev/null
+++ b/playground/main.go
@@ -0,0 +1,5 @@
+package main
+
+func main() {
+
+}
diff --git a/queue/Makefile b/queue/Makefile
new file mode 100644
index 0000000..8aca680
--- /dev/null
+++ b/queue/Makefile
@@ -0,0 +1,19 @@
+.PHONY: server client
+
+ADDR ?= :8080
+CAPACITY ?= 200
+WORKERS ?= 8
+TOTAL ?= 100
+CONCURRENCY ?= 4
+ITERATIONS ?= 3
+PPROF_PORT ?= 8081
+PPROF_FILE ?= mem.pprof
+
+server:
+ go run main.go -mode=server -addr=$(ADDR) -capacity=$(CAPACITY) -workers=$(WORKERS)
+
+client:
+ go run main.go -mode=client -addr=$(ADDR) -total=$(TOTAL) -concurrency=$(CONCURRENCY) -iterations=$(ITERATIONS)
+
+profmem:
+ go tool pprof -http=:$(PPROF_PORT) $(PPROF_FILE)
diff --git a/queue/bench/cpu_bound/bench_16.txt b/queue/bench/cpu_bound/bench_16.txt
new file mode 100644
index 0000000..ec96a13
--- /dev/null
+++ b/queue/bench/cpu_bound/bench_16.txt
@@ -0,0 +1,9 @@
+goos: darwin
+goarch: arm64
+pkg: vu/benchmark/queue
+cpu: Apple M2
+BenchmarkQueueHashFixedIterations-8 4 255125438 ns/op
+BenchmarkQueueHashFixedIterations-8 4 262671688 ns/op
+BenchmarkQueueHashFixedIterations-8 4 254897771 ns/op
+PASS
+ok vu/benchmark/queue 23.770s
diff --git a/queue/bench/cpu_bound/bench_32.txt b/queue/bench/cpu_bound/bench_32.txt
new file mode 100644
index 0000000..bfb704b
--- /dev/null
+++ b/queue/bench/cpu_bound/bench_32.txt
@@ -0,0 +1,9 @@
+goos: darwin
+goarch: arm64
+pkg: vu/benchmark/queue
+cpu: Apple M2
+BenchmarkQueueHashFixedIterations-8 4 253688323 ns/op
+BenchmarkQueueHashFixedIterations-8 4 254959531 ns/op
+BenchmarkQueueHashFixedIterations-8 4 254726896 ns/op
+PASS
+ok vu/benchmark/queue 23.966s
diff --git a/queue/bench/cpu_bound/bench_64.txt b/queue/bench/cpu_bound/bench_64.txt
new file mode 100644
index 0000000..94ac2a7
--- /dev/null
+++ b/queue/bench/cpu_bound/bench_64.txt
@@ -0,0 +1,9 @@
+goos: darwin
+goarch: arm64
+pkg: vu/benchmark/queue
+cpu: Apple M2
+BenchmarkQueueHashFixedIterations-8 4 254658396 ns/op
+BenchmarkQueueHashFixedIterations-8 4 254396916 ns/op
+BenchmarkQueueHashFixedIterations-8 4 265479604 ns/op
+PASS
+ok vu/benchmark/queue 24.486s
diff --git a/queue/bench/cpu_bound/bench_8.txt b/queue/bench/cpu_bound/bench_8.txt
new file mode 100644
index 0000000..dbbd5e4
--- /dev/null
+++ b/queue/bench/cpu_bound/bench_8.txt
@@ -0,0 +1,15 @@
+2025/12/15 15:13:16 profile: cpu profiling enabled, cpu.pprof
+2025/12/15 15:13:18 profile: cpu profiling disabled, cpu.pprof
+goos: darwin
+goarch: arm64
+pkg: vu/benchmark/queue
+cpu: Apple M2
+BenchmarkQueueHashFixedIterations-8 2025/12/15 15:13:18 profile: cpu profiling enabled, cpu.pprof
+2025/12/15 15:13:20 profile: cpu profiling disabled, cpu.pprof
+2025/12/15 15:13:20 profile: cpu profiling enabled, cpu.pprof
+2025/12/15 15:13:22 profile: cpu profiling disabled, cpu.pprof
+2025/12/15 15:13:22 profile: cpu profiling enabled, cpu.pprof
+2025/12/15 15:13:24 profile: cpu profiling disabled, cpu.pprof
+ 4 253199010 ns/op
+PASS
+ok vu/benchmark/queue 8.583s
diff --git a/queue/bench/cpu_bound/bench_numCPU.txt b/queue/bench/cpu_bound/bench_numCPU.txt
new file mode 100644
index 0000000..8947c96
--- /dev/null
+++ b/queue/bench/cpu_bound/bench_numCPU.txt
@@ -0,0 +1,9 @@
+goos: darwin
+goarch: arm64
+pkg: vu/benchmark/queue
+cpu: Apple M2
+BenchmarkQueueHashFixedIterations-8 4 265989531 ns/op
+BenchmarkQueueHashFixedIterations-8 4 254028333 ns/op
+BenchmarkQueueHashFixedIterations-8 4 255124583 ns/op
+PASS
+ok vu/benchmark/queue 23.775s
diff --git a/queue/bench/cpu_bound/benchmark_test.go b/queue/bench/cpu_bound/benchmark_test.go
new file mode 100644
index 0000000..6852620
--- /dev/null
+++ b/queue/bench/cpu_bound/benchmark_test.go
@@ -0,0 +1,123 @@
+package cpu_bound
+
+import (
+ "encoding/json"
+ "github.com/pkg/profile"
+ "strconv"
+ "testing"
+ "time"
+
+ "vu/benchmark/queue/internal"
+ "vu/benchmark/queue/tasks"
+)
+
+func TestBench(t *testing.T) {
+ defer profile.Start(profile.CPUProfile, profile.ProfilePath(".")).Stop()
+ const iterations = 3_000_000_000
+
+ poolSize := 8
+ capacity := 1_000
+
+ payload, err := json.Marshal(tasks.BurnCPUTaskInput{Iteration: iterations})
+ if err != nil {
+		t.Fatalf("marshal burn-CPU input: %v", err)
+ }
+
+ queue := internal.NewQueue(capacity, poolSize, true)
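+	// pending hands each task's private result channel to a small pool of
+	// consumer goroutines, so the producer loop never blocks on result reads.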
+ pending := make(chan (<-chan internal.Output), capacity)
+ for i := 0; i < 5; i++ {
+ go func() {
+ for ch := range pending {
+ out := <-ch
+
+ if out.Err != nil {
+ t.Logf("task failed: %v", out.Err)
+ } else {
+ //t.Logf("task ok: %s", out.Res)
+ }
+ }
+ }()
+ }
+
+ i := 0
+ for i < 100 {
+ task := tasks.Task{
+ Id: strconv.Itoa(i),
+ Type: tasks.BurnCPUTaskType,
+ Input: payload,
+ }
+
+ ch, err := queue.Put(&task)
+ if err != nil {
+ //t.Logf("Error when put task %d: %v", i, err)
+ time.Sleep(2 * time.Second)
+ continue
+ }
+
+ //t.Logf("put task %d", i)
+ pending <- ch
+ i++
+ }
+
+ queue.Shutdown()
+ close(pending)
+ time.Sleep(1 * time.Second)
+}
+
+// Benchmark queue throughput for the hash task using a fixed iteration count.
+func BenchmarkQueueHashFixedIterations(b *testing.B) {
+ defer profile.Start(profile.TraceProfile, profile.ProfilePath(".")).Stop()
+ const iterations = 3_000_000_000
+
+ poolSize := 8
+ capacity := 1_000
+
+ payload, err := json.Marshal(tasks.BurnCPUTaskInput{Iteration: iterations})
+ if err != nil {
+		b.Fatalf("marshal burn-CPU input: %v", err)
+ }
+
+ queue := internal.NewQueue(capacity, poolSize, true)
+ pending := make(chan (<-chan internal.Output), capacity)
+ for i := 0; i < 5; i++ {
+ go func() {
+ for ch := range pending {
+ out := <-ch
+
+ if out.Err != nil {
+ b.Logf("task failed: %v", out.Err)
+ } else {
+ //b.Logf("task ok: %s", out.Res)
+ }
+ }
+ }()
+ }
+
+ i := 0
+ b.ResetTimer()
+ for i < b.N {
+ task := tasks.Task{
+ Id: strconv.Itoa(i),
+ Type: tasks.BurnCPUTaskType,
+ Input: payload,
+ }
+
+ ch, err := queue.Put(&task)
+ if err != nil {
+ //b.Logf("Error when put task %d: %v", i, err)
+ time.Sleep(2 * time.Second)
+ continue
+ }
+
+ //b.Logf("put task %d", i)
+ pending <- ch
+ i++
+ }
+
+ queue.Shutdown()
+ b.StopTimer()
+ close(pending)
+ time.Sleep(1 * time.Second)
+}
diff --git a/queue/bench/cpu_bound/cpu.pprof b/queue/bench/cpu_bound/cpu.pprof
new file mode 100644
index 0000000..74de8f6
Binary files /dev/null and b/queue/bench/cpu_bound/cpu.pprof differ
diff --git a/queue/cmd/bench/main.go b/queue/cmd/bench/main.go
new file mode 100644
index 0000000..3cf6dbb
--- /dev/null
+++ b/queue/cmd/bench/main.go
@@ -0,0 +1,27 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "os"
+ "vu/benchmark/queue/runner"
+)
+
+func main() {
+ addr := flag.String("addr", ":8080", "queue server address")
+ total := flag.Int("total", 1000, "total tasks to run")
+ concurrency := flag.Int("concurrency", 8, "concurrent client workers")
+	iterations := flag.Int("iterations", 100000, "hash iterations per task")
+ flag.Parse()
+
+ err := runner.RunClient(runner.ClientConfig{
+ Addr: *addr,
+ Total: *total,
+ Concurrency: *concurrency,
+ Iterations: *iterations,
+ })
+ if err != nil {
+ fmt.Fprintln(os.Stderr, err)
+ os.Exit(1)
+ }
+}
diff --git a/queue/go.mod b/queue/go.mod
new file mode 100644
index 0000000..c451aab
--- /dev/null
+++ b/queue/go.mod
@@ -0,0 +1,10 @@
+module vu/benchmark/queue
+
+go 1.24.2
+
+require github.com/pkg/profile v1.7.0
+
+require (
+ github.com/felixge/fgprof v0.9.3 // indirect
+ github.com/google/pprof v0.0.0-20211214055906-6f57359322fd // indirect
+)
diff --git a/queue/go.sum b/queue/go.sum
new file mode 100644
index 0000000..6e67449
--- /dev/null
+++ b/queue/go.sum
@@ -0,0 +1,25 @@
+github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
+github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
+github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/felixge/fgprof v0.9.3 h1:VvyZxILNuCiUCSXtPtYmmtGvb65nqXh2QFWc0Wpf2/g=
+github.com/felixge/fgprof v0.9.3/go.mod h1:RdbpDgzqYVh/T9fPELJyV7EYJuHB55UTEULNun8eiPw=
+github.com/google/pprof v0.0.0-20211214055906-6f57359322fd h1:1FjCyPC+syAzJ5/2S8fqdZK1R22vvA0J7JZKcuOIQ7Y=
+github.com/google/pprof v0.0.0-20211214055906-6f57359322fd/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg=
+github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=
+github.com/pkg/profile v1.7.0 h1:hnbDkaNWPCLMO9wGLdBFTIZvzDrDfBM2072E1S9gJkA=
+github.com/pkg/profile v1.7.0/go.mod h1:8Uer0jas47ZQMJ7VD+OHknK4YDY07LPUC6dEvqDjvNo=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
+github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/queue/internal/base.go b/queue/internal/base.go
new file mode 100644
index 0000000..54b851e
--- /dev/null
+++ b/queue/internal/base.go
@@ -0,0 +1,141 @@
+package internal
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "sync"
+ "vu/benchmark/queue/tasks"
+)
+
+type IQueue interface {
+ Put(task *tasks.Task) (<-chan Output, error)
+ Shutdown() error
+}
+
+type _queue struct {
+ capacity int
+ channel chan _taskWrapper
+ poolSize int
+ closed bool
+ mutex sync.Mutex
+ wg sync.WaitGroup
+ size int
+ logDisabled bool
+}
+
+type _taskWrapper struct {
+ task *tasks.Task
+ channel chan Output
+}
+
+type Output struct {
+ Err error
+ Res []byte
+}
+
+func (q *_queue) Put(task *tasks.Task) (<-chan Output, error) {
+ q.mutex.Lock()
+ if q.size+1 > q.capacity {
+ q.mutex.Unlock()
+ return nil, errors.New("queue is full")
+ }
+
+ if q.closed {
+ q.mutex.Unlock()
+ return nil, errors.New("queue is closed")
+ }
+
+ defer q.mutex.Unlock()
+
+ q.wg.Add(1)
+ q.size += 1
+
+ channel := make(chan Output, 1)
+
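+	// This send cannot block: q.channel is buffered to capacity and the size
+	// check above (performed under the mutex) guarantees a free slot.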
+ q.channel <- _taskWrapper{
+ task: task,
+ channel: channel,
+ }
+ return channel, nil
+}
+
+func (q *_queue) Shutdown() error {
+ q.mutex.Lock()
+ q.closed = true
+ q.mutex.Unlock()
+
+ q.wg.Wait()
+ return nil
+}
+
+func (q *_queue) init() {
+ // Avoid spamming stdout when running benchmarks so measurements stay clean.
+ logWorkers := q.shouldLogWorker()
+
+ for i := 0; i < q.poolSize; i++ {
+ workerID := i + 1
+ go func(id int) {
+ for task := range q.channel {
+ if logWorkers {
+ fmt.Printf("Worker %d, pick up tasks %s\n", id, task.task.Id)
+ }
+ res, err := Execute(task)
+ task.channel <- Output{Res: res, Err: err}
+ close(task.channel)
+
+ q.mutex.Lock()
+ q.size--
+ q.mutex.Unlock()
+ q.wg.Done()
+ }
+ }(workerID)
+ }
+}
+
+func NewQueue(capacity int, poolSize int, logDisabled bool) IQueue {
+ channel := make(chan _taskWrapper, capacity)
+
+ queue := &_queue{
+ capacity: capacity,
+ channel: channel,
+ poolSize: poolSize,
+ logDisabled: logDisabled,
+ }
+
+ queue.init()
+
+ return queue
+}
+
+func Execute(task _taskWrapper) ([]byte, error) {
+ switch task.task.Type {
+ case tasks.SumTaskType:
+ return tasks.SumTask(task.task.Input)
+ case tasks.HashTaskType:
+ input := tasks.HashTaskInput{}
+ if err := json.Unmarshal(task.task.Input, &input); err != nil {
+ return nil, err
+ }
+ return tasks.HashTask(input.Iteration), nil
+ case tasks.BurnCPUTaskType:
+ return tasks.BurnCPUTask(task.task.Input)
+ default:
+ {
+ return nil, errors.New("invalid tasks type")
+ }
+ }
+}
+
+// shouldLogWorker reports whether worker-level logging is enabled.
+// Logging is suppressed when logDisabled is set (benchmarks construct the
+// queue with logging disabled to keep measurement output clean).
+func (q *_queue) shouldLogWorker() bool {
+ if q.logDisabled {
+ return false
+ }
+
+ return true
+}
diff --git a/queue/main.go b/queue/main.go
new file mode 100644
index 0000000..d4cb087
--- /dev/null
+++ b/queue/main.go
@@ -0,0 +1,52 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "os"
+ "vu/benchmark/queue/runner"
+)
+
+//go run main.go -mode=server -workers=4 -capacity=10 -addr=:8082
+
+func main() {
+ mode := flag.String("mode", "server", "choose server or client mode")
+ addr := flag.String("addr", ":8080", "tcp listen address")
+
+ // Server options.
+ capacity := flag.Int("capacity", 100, "queue capacity")
+ workers := flag.Int("workers", 8, "number of worker goroutines")
+
+ // Client options.
+ total := flag.Int("total", 1000, "total tasks to run")
+ concurrency := flag.Int("concurrency", 8, "concurrent client workers")
+	iterations := flag.Int("iterations", 100000, "hash iterations per task")
+
+ flag.Parse()
+
+ switch *mode {
+ case "server":
+ err := runner.RunServer(runner.ServerConfig{
+ Addr: *addr,
+ Capacity: *capacity,
+ Workers: *workers,
+ })
+ if err != nil {
+ os.Exit(1)
+ }
+ case "client":
+ err := runner.RunClient(runner.ClientConfig{
+ Addr: *addr,
+ Total: *total,
+ Concurrency: *concurrency,
+ Iterations: *iterations,
+ })
+ if err != nil {
+ fmt.Fprintln(os.Stderr, err)
+ os.Exit(1)
+ }
+ default:
+ fmt.Fprintf(os.Stderr, "unknown mode %q (expected server or client)\n", *mode)
+ os.Exit(1)
+ }
+}
diff --git a/queue/runner/client.go b/queue/runner/client.go
new file mode 100644
index 0000000..420cd0e
--- /dev/null
+++ b/queue/runner/client.go
@@ -0,0 +1,146 @@
+package runner
+
+import (
+ "encoding/json"
+ "fmt"
+ "net"
+ "strconv"
+ "sync"
+ "sync/atomic"
+ "time"
+ "vu/benchmark/queue/tasks"
+)
+
+// ClientConfig collects options for pushing tasks into the queue server.
+type ClientConfig struct {
+ Addr string
+ Total int
+ Concurrency int
+ Iterations int
+}
+
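+// RunClient dials the server and pushes hash tasks, returning an error if the run aborts early.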
+func RunClient(cfg ClientConfig) error {
+ payload, err := json.Marshal(tasks.HashTaskInput{Iteration: cfg.Iterations})
+ if err != nil {
+ return err
+ }
+
+ var sent int64
+ var completed int64
+
+ var wg sync.WaitGroup
+ errCh := make(chan error, 1)
+ var once sync.Once
+
+ start := time.Now()
+
+ for i := 0; i < cfg.Concurrency; i++ {
+ wg.Add(1)
+ go func(index int) {
+ fmt.Printf("Starting goroutine %d\n", index+1)
+ defer wg.Done()
+
+ conn, err := net.Dial("tcp", cfg.Addr)
+ if err != nil {
+ recordError(errCh, &once, err)
+ return
+ }
+ defer conn.Close()
+
+ encoder := json.NewEncoder(conn)
+ decoder := json.NewDecoder(conn)
+ var lastTask int64
+ for {
+ if lastTask == 0 {
+ lastTask = atomic.AddInt64(&sent, 1)
+ }
+
+ if lastTask > int64(cfg.Total) {
+ return
+ }
+
+			fmt.Printf("Goroutine %d runs task %d\n", index+1, int(lastTask))
+
+ task := tasks.Task{
+ Id: strconv.FormatInt(lastTask, 10),
+ Type: tasks.HashTaskType,
+ Input: payload,
+ }
+
+			// Retry loop per task: reconnect on encode/decode failures, back off on server errors.
+ for retry := 0; retry < 5; retry++ {
+
+ if err := encoder.Encode(task); err != nil {
+ fmt.Printf("Goroutine %d: encode error %v — reconnecting\n", index+1, err)
+ conn.Close()
+
+ // reconnect
+ var err2 error
+ conn, err2 = net.Dial("tcp", cfg.Addr)
+ if err2 != nil {
+ fmt.Printf("Goroutine %d: reconnect failed %v\n", index+1, err2)
+ time.Sleep(time.Second)
+ continue
+ }
+ encoder = json.NewEncoder(conn)
+ decoder = json.NewDecoder(conn)
+ continue
+ }
+
+ var resp clientResponse
+ if err := decoder.Decode(&resp); err != nil {
+ fmt.Printf("Goroutine %d: decode error %v — reconnecting\n", index+1, err)
+ conn.Close()
+
+ // reconnect
+ var err2 error
+ conn, err2 = net.Dial("tcp", cfg.Addr)
+ if err2 != nil {
+ fmt.Printf("Goroutine %d: reconnect failed %v\n", index+1, err2)
+ time.Sleep(time.Second)
+ continue
+ }
+ encoder = json.NewEncoder(conn)
+ decoder = json.NewDecoder(conn)
+ continue
+ }
+
+ if resp.Error != "" {
+ fmt.Printf("Goroutine %d: server error %s — retry\n", index+1, resp.Error)
+ time.Sleep(200 * time.Millisecond)
+ continue
+ }
+
+ // success
+ atomic.AddInt64(&completed, 1)
+ lastTask = 0
+ break
+ }
+ }
+ }(i)
+ }
+ wg.Wait()
+ duration := time.Since(start)
+
+ select {
+ case err := <-errCh:
+ return fmt.Errorf("benchmark aborted after %v: %w", duration, err)
+ default:
+ tput := float64(completed) / duration.Seconds()
+ fmt.Printf("completed %d tasks in %v (throughput: %.2f tasks/sec)\n", completed, duration, tput)
+ return nil
+ }
+}
+
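+// clientResponse mirrors the response payload the server sends back.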
+type clientResponse struct {
+ ID string `json:"id"`
+ Result []byte `json:"result"`
+ Error string `json:"error"`
+}
+
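+// recordError publishes only the first error: the sync.Once guards the send
+// and the buffered channel keeps that one sender from blocking.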
+func recordError(ch chan<- error, once *sync.Once, err error) {
+ once.Do(func() {
+ ch <- err
+ })
+}
diff --git a/queue/runner/server.go b/queue/runner/server.go
new file mode 100644
index 0000000..2d63655
--- /dev/null
+++ b/queue/runner/server.go
@@ -0,0 +1,42 @@
+package runner
+
+import (
+ "fmt"
+ "os"
+ "os/signal"
+ "syscall"
+ "vu/benchmark/queue/internal"
+ "vu/benchmark/queue/server"
+)
+
+// ServerConfig collects the tunables for running the queue server.
+type ServerConfig struct {
+ Addr string
+ Capacity int
+ Workers int
+}
+
+// RunServer starts the TCP server and blocks until shutdown.
+func RunServer(cfg ServerConfig) error {
+ queue := internal.NewQueue(cfg.Capacity, cfg.Workers, false)
+
+ done := make(chan struct{})
+ sigs := make(chan os.Signal, 1)
+ signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
+
+	go func() {
+		<-sigs
+		fmt.Println("signal received, shutting down")
+		close(done)
+	}()
+
+ fmt.Printf("Queue server listening on %s\n", cfg.Addr)
+ err := server.Serve(cfg.Addr, queue, done)
+ if err != nil {
+ fmt.Println("server error:", err)
+ }
+
+ queue.Shutdown()
+ fmt.Println("queue drained, server exiting")
+ return err
+}
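+
+// A minimal sketch of wiring RunServer up from a main package; the concrete
+// values here are hypothetical:
+//
+//	cfg := runner.ServerConfig{Addr: ":9000", Capacity: 128, Workers: 8}
+//	if err := runner.RunServer(cfg); err != nil {
+//		log.Fatal(err)
+//	}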
diff --git a/queue/server/server.go b/queue/server/server.go
new file mode 100644
index 0000000..26bde72
--- /dev/null
+++ b/queue/server/server.go
@@ -0,0 +1,157 @@
+package server
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "net"
+ "sync"
+ "sync/atomic"
+ "time"
+ "vu/benchmark/queue/internal"
+ "vu/benchmark/queue/tasks"
+)
+
+type response struct {
+ ID string `json:"id"`
+ Result []byte `json:"result,omitempty"`
+ Error string `json:"error,omitempty"`
+}
+
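+// waitingGoroutines counts live connection-handler goroutines and is read
+// periodically for logging.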
+var waitingGoroutines int64
+
+// Serve listens for TCP connections and forwards incoming tasks to the queue.
+func Serve(addr string, queue internal.IQueue, done <-chan struct{}) error {
+ listener, err := net.Listen("tcp", addr)
+ if err != nil {
+ return err
+ }
+ defer listener.Close()
+
+ var wg sync.WaitGroup
+
+ // Stop accepting new connections when shutdown is signaled.
+ go func() {
+ <-done
+ listener.Close()
+ }()
+
+	// Periodically report how many connection handlers are alive.
+	go func() {
+		ticker := time.NewTicker(5 * time.Second)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-done:
+				return
+			case <-ticker.C:
+				fmt.Println("Goroutines count:", atomic.LoadInt64(&waitingGoroutines))
+			}
+		}
+	}()
+
+ for {
+ conn, err := listener.Accept()
+ if err != nil {
+ select {
+ case <-done:
+ wg.Wait()
+ return nil
+ default:
+ }
+
+ if ne, ok := err.(net.Error); ok && ne.Temporary() {
+ fmt.Println("temporary accept error:", err)
+ continue
+ }
+
+ wg.Wait()
+ return err
+ }
+
+ wg.Add(1)
+ go func(c net.Conn) {
+			// The value returned by AddInt64 already includes this handler,
+			// so it doubles as a 1-based goroutine label.
+			idx := atomic.AddInt64(&waitingGoroutines, 1)
+			defer func() {
+				wg.Done()
+				fmt.Printf("Goroutine %d exits\n", idx)
+				atomic.AddInt64(&waitingGoroutines, -1)
+			}()
+			fmt.Printf("Goroutine %d accepted connection\n", idx)
+ handleConnection(c, queue, done)
+ }(conn)
+ }
+}
+
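+// handleConnection reads tasks from a single client, enqueues them, and
+// streams responses back as workers finish, so responses may arrive out of
+// order relative to the requests.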
+func handleConnection(conn net.Conn, queue internal.IQueue, done <-chan struct{}) {
+ defer conn.Close()
+
+ decoder := json.NewDecoder(conn)
+ encoder := json.NewEncoder(conn)
+
+	// Results coming back from workers
+	results := make(chan response, 16)
+
+	// Tracks in-flight result waiters so results is only closed after every
+	// pending response has been delivered (or abandoned on shutdown).
+	var inflight sync.WaitGroup
+
+	// Writer goroutine: serializes all responses onto the connection.
+	writeDone := make(chan struct{})
+	go func() {
+		defer close(writeDone)
+		for resp := range results {
+			if err := encoder.Encode(resp); err != nil {
+				fmt.Printf("write error to %s: %v\n", conn.RemoteAddr(), err)
+			}
+		}
+	}()
+
+ for {
+ var task tasks.Task
+
+		// Detect shutdown
+		select {
+		case <-done:
+			// Let pending waiters finish, then stop the writer.
+			inflight.Wait()
+			close(results)
+			<-writeDone
+			return
+		default:
+		}
+
+		// Read the next task
+		if err := decoder.Decode(&task); err != nil {
+			if errors.Is(err, io.EOF) {
+				// Client closed the connection normally.
+				inflight.Wait()
+				close(results)
+				<-writeDone
+				return
+			}
+			fmt.Printf("decode error from %s: %v\n", conn.RemoteAddr(), err)
+
+			// Send the error to the client before closing.
+			results <- response{Error: err.Error()}
+			inflight.Wait()
+			close(results)
+			<-writeDone
+			return
+		}
+
+ ch, err := queue.Put(&task)
+ if err != nil {
+ results <- response{ID: task.Id, Error: err.Error()}
+ continue
+ }
+
+		// Spawn a waiter that forwards this task's result to the writer.
+		inflight.Add(1)
+		go func(id string, workerCh <-chan internal.Output) {
+			defer inflight.Done()
+			output := <-workerCh
+			resp := response{ID: id, Result: output.Res}
+			if output.Err != nil {
+				resp.Error = output.Err.Error()
+				resp.Result = nil
+			}
+
+			// results is closed only after inflight.Wait() returns, so this
+			// send cannot hit a closed channel; done unblocks us on shutdown.
+			select {
+			case results <- resp:
+			case <-done:
+			}
+		}(task.Id, ch)
+ }
+}
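+
+// Wire format sketch, inferred from the Encoder/Decoder usage above: both
+// directions carry a stream of JSON values, and []byte fields travel as
+// base64 strings (encoding/json's default), e.g.
+//
+//	-> {"id":"1","type":"hash","input":"eyJpdGVyYXRpb24iOjEwMH0="}
+//	<- {"id":"1","result":"..."}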
diff --git a/queue/tasks/base.go b/queue/tasks/base.go
new file mode 100644
index 0000000..7815daf
--- /dev/null
+++ b/queue/tasks/base.go
@@ -0,0 +1,82 @@
+package tasks
+
+import (
+ "crypto/sha256"
+ "encoding/json"
+ "fmt"
+)
+
+const (
+ SumTaskType = "sum"
+ HashTaskType = "hash"
+ BurnCPUTaskType = "BurnCPUTask"
+)
+
+type Task struct {
+ Id string `json:"id"`
+ Type string `json:"type"`
+ Input []byte `json:"input"`
+}
+
+type SumTaskInput struct {
+ A int `json:"a"`
+ B int `json:"b"`
+}
+
+type SumTaskOutput struct {
+ Res int `json:"res"`
+}
+
+func SumTask(input []byte) ([]byte, error) {
+ inputData := SumTaskInput{}
+ err := json.Unmarshal(input, &inputData)
+ if err != nil {
+ fmt.Println("Unmarshal error:", err)
+ return nil, err
+ }
+
+ res := SumTaskOutput{Res: inputData.A + inputData.B}
+ return json.Marshal(res)
+}
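+
+// For example, SumTask([]byte(`{"a":2,"b":3}`)) returns []byte(`{"res":5}`).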
+
+type HashTaskInput struct {
+ Iteration int `json:"iteration"`
+}
+
+type HashTaskOutput struct {
+ Res string `json:"res"`
+}
+
+// HashTask burns CPU with repeated SHA-256 rounds. The digest is chained
+// back into the input so every iteration depends on the previous one.
+func HashTask(iterations int) []byte {
+	data := []byte("benchmark")
+	var sum [32]byte
+
+	for i := 0; i < iterations; i++ {
+		sum = sha256.Sum256(data)
+		data = sum[:] // feed the digest into the next round
+	}
+
+	return sum[:]
+}
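+
+// For example, HashTask(2) yields sha256(sha256("benchmark")).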
+
+type BurnCPUTaskInput struct {
+ Iteration int `json:"iteration"`
+}
+
+type BurnCPUTaskOutput struct {
+ Res int `json:"res"`
+}
+
+func BurnCPUTask(input []byte) ([]byte, error) {
+ inputType := BurnCPUTaskInput{}
+ if err := json.Unmarshal(input, &inputType); err != nil {
+ return nil, err
+ }
+
+	var x uint64 = 1
+	for i := 0; i < inputType.Iteration; i++ {
+		x = x*1664525 + 1013904223 // LCG, prevents optimization
+	}
+
+	// Return the accumulated value so the loop's work is observable.
+	return json.Marshal(BurnCPUTaskOutput{Res: int(x)})
+}
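+
+// For example, BurnCPUTask([]byte(`{"iteration":0}`)) returns
+// []byte(`{"res":1}`), since the accumulator starts at 1.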
diff --git a/queue/trace.out b/queue/trace.out
new file mode 100644
index 0000000..25d1519
Binary files /dev/null and b/queue/trace.out differ
diff --git a/subset.go b/subset.go
new file mode 100644
index 0000000..8244693
--- /dev/null
+++ b/subset.go
@@ -0,0 +1,27 @@
+package main
+
+// subsets returns the power set of nums, starting from the empty set.
+func subsets(nums []int) [][]int {
+	res := [][]int{{}}
+	for i, num := range nums {
+		parent := []int{num}
+		res = append(res, parent)
+		findSubsets(parent, nums, i+1, &res)
+	}
+
+	return res
+}
+
+// findSubsets records every extension of curArray built from elements at
+// index left or later, recursing to extend each new subset further.
+func findSubsets(curArray []int, nums []int, left int, res *[][]int) {
+	for i := left; i < len(nums); i++ {
+		next := make([]int, len(curArray), len(curArray)+1)
+		copy(next, curArray)
+		next = append(next, nums[i])
+		*res = append(*res, next)
+		findSubsets(next, nums, i+1, res)
+	}
+}
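+
+// For example, subsets([]int{1, 2, 3}) produces the 8 subsets
+// [] [1] [1 2] [1 2 3] [1 3] [2] [2 3] [3].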