Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions components/egress/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ The egress control is implemented as a **Sidecar** that shares the network names
- **127.0.0.1** — so packets redirected by iptables to the proxy (127.0.0.1:15353) are accepted by nft.
- **Nameserver IPs** from `/etc/resolv.conf` — so client DNS and proxy upstream work (e.g. private DNS).
Nameserver IPs are validated (unspecified and loopback are skipped) and capped. Use `OPENSANDBOX_EGRESS_MAX_NS` (default `3`; `0` = no cap, `1`–`10` = cap). See [SECURITY-RISKS.md](SECURITY-RISKS.md) for trust and scope of this whitelist.
- **Blocked hostname webhook**
- `OPENSANDBOX_EGRESS_DENY_WEBHOOK`: HTTP endpoint URL. When set, egress asynchronously POSTs JSON **only when a hostname is denied**: `{"hostname": "<original query>", "timestamp": "<RFC3339>", "source": "opensandbox-egress"}`. Default timeout 5s, up to 3 retries with exponential backoff starting at 1s; 4xx is not retried, 5xx/network errors are retried.
- **Allow requirement**: you must allow the webhook host (or its IP/CIDR) in the policy; with default deny, if you don’t explicitly allow it, the webhook traffic will be blocked by egress itself. Example: `{"defaultAction":"deny","egress":[{"action":"allow","target":"webhook.example.com"}]}`. If a broader deny CIDR covers the resolved IP, it will still be blocked—adjust your policy accordingly.
- DoH/DoT blocking:
- DoT (tcp/udp 853) blocked by default.
- Optional DoH over 443: `OPENSANDBOX_EGRESS_BLOCK_DOH_443=true`. If enabled without blocklist, all 443 is dropped.
Expand Down
9 changes: 9 additions & 0 deletions components/egress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/alibaba/opensandbox/egress/pkg/constants"
"github.com/alibaba/opensandbox/egress/pkg/dnsproxy"
"github.com/alibaba/opensandbox/egress/pkg/events"
"github.com/alibaba/opensandbox/egress/pkg/iptables"
"github.com/alibaba/opensandbox/egress/pkg/log"
slogger "github.com/alibaba/opensandbox/internal/logger"
Expand Down Expand Up @@ -64,6 +65,14 @@ func main() {
}
log.Infof("dns proxy started on 127.0.0.1:15353")

if blockWebhookURL := strings.TrimSpace(os.Getenv(constants.EnvBlockedWebhook)); blockWebhookURL != "" {
blockedBroadcaster := events.NewBroadcaster(ctx, events.BroadcasterConfig{QueueSize: 256})
blockedBroadcaster.AddSubscriber(events.NewWebhookSubscriber(blockWebhookURL))
proxy.SetBlockedBroadcaster(blockedBroadcaster)
defer blockedBroadcaster.Close()
log.Infof("blocked hostname webhook enabled: %s", blockWebhookURL)
}

exemptDst := dnsproxy.ParseNameserverExemptList()
if len(exemptDst) > 0 {
log.Infof("nameserver exempt list: %v (proxy upstream in this list will not set SO_MARK)", exemptDst)
Expand Down
1 change: 1 addition & 0 deletions components/egress/pkg/constants/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
EnvEgressRules = "OPENSANDBOX_EGRESS_RULES"
EnvEgressLogLevel = "OPENSANDBOX_EGRESS_LOG_LEVEL"
EnvMaxNameservers = "OPENSANDBOX_EGRESS_MAX_NS"
EnvBlockedWebhook = "OPENSANDBOX_EGRESS_DENY_WEBHOOK"

// EnvNameserverExempt comma-separated IPs; proxy upstream to these is not marked and is allowed in nft allow set
EnvNameserverExempt = "OPENSANDBOX_EGRESS_NAMESERVER_EXEMPT"
Expand Down
20 changes: 20 additions & 0 deletions components/egress/pkg/dnsproxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/miekg/dns"

"github.com/alibaba/opensandbox/egress/pkg/events"
"github.com/alibaba/opensandbox/egress/pkg/log"
"github.com/alibaba/opensandbox/egress/pkg/nftables"
"github.com/alibaba/opensandbox/egress/pkg/policy"
Expand All @@ -41,6 +42,9 @@ type Proxy struct {

// optional; called in goroutine when A/AAAA are present
onResolved func(domain string, ips []nftables.ResolvedIP)

// optional broadcaster to notify blocked hostnames
blockedBroadcaster *events.Broadcaster
}

// New builds a proxy with resolved upstream; listenAddr can be empty for default.
Expand Down Expand Up @@ -109,6 +113,7 @@ func (p *Proxy) serveDNS(w dns.ResponseWriter, r *dns.Msg) {
currentPolicy := p.policy
p.policyMu.RUnlock()
if currentPolicy != nil && currentPolicy.Evaluate(domain) == policy.ActionDeny {
p.publishBlocked(domain)
resp := new(dns.Msg)
resp.SetRcode(r, dns.RcodeNameError)
_ = w.WriteMsg(resp)
Expand Down Expand Up @@ -179,6 +184,21 @@ func (p *Proxy) SetOnResolved(fn func(domain string, ips []nftables.ResolvedIP))
p.onResolved = fn
}

// SetBlockedBroadcaster wires a broadcaster used to notify blocked hostnames.
func (p *Proxy) SetBlockedBroadcaster(b *events.Broadcaster) {
p.blockedBroadcaster = b
}

func (p *Proxy) publishBlocked(domain string) {
if p.blockedBroadcaster == nil {
return
}
p.blockedBroadcaster.Publish(events.BlockedEvent{
Hostname: domain,
Timestamp: time.Now().UTC(),
})
}

// extractResolvedIPs parses A and AAAA records from resp.Answer into ResolvedIP slice.
//
// Uses netip.ParseAddr(v.A.String()) which allocates a temporary string per record; typically
Expand Down
117 changes: 117 additions & 0 deletions components/egress/pkg/events/broadcaster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2026 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package events

import (
"context"
"sync"
"time"

"github.com/alibaba/opensandbox/egress/pkg/log"
)

const defaultQueueSize = 128

// BlockedEvent describes a blocked hostname notification.
type BlockedEvent struct {
Hostname string `json:"hostname"`
Timestamp time.Time `json:"timestamp"`
}

// Subscriber consumes blocked events.
type Subscriber interface {
HandleBlocked(ctx context.Context, ev BlockedEvent)
}

// BroadcasterConfig defines queue sizing for the broadcaster.
type BroadcasterConfig struct {
QueueSize int
}

// Broadcaster fans out blocked events to one or more subscribers via channels.
type Broadcaster struct {
ctx context.Context
cancel context.CancelFunc

mu sync.RWMutex
subscribers []chan BlockedEvent
queueSize int
}

// NewBroadcaster builds a broadcaster with the given queue size (defaults to 128).
func NewBroadcaster(ctx context.Context, cfg BroadcasterConfig) *Broadcaster {
if cfg.QueueSize <= 0 {
cfg.QueueSize = defaultQueueSize
}
cctx, cancel := context.WithCancel(ctx)
return &Broadcaster{
ctx: cctx,
cancel: cancel,
queueSize: cfg.QueueSize,
}
}

// AddSubscriber registers a new subscriber with its own buffered queue and worker.
func (b *Broadcaster) AddSubscriber(sub Subscriber) {
if sub == nil {
return
}
ch := make(chan BlockedEvent, b.queueSize)

b.mu.Lock()
b.subscribers = append(b.subscribers, ch)
b.mu.Unlock()

go func() {
for {
select {
case <-b.ctx.Done():
return
case ev, ok := <-ch:
if !ok {
return
}
sub.HandleBlocked(b.ctx, ev)
}
}
}()
}

// Publish sends an event to all subscribers; drops and logs when a subscriber queue is full.
func (b *Broadcaster) Publish(event BlockedEvent) {
b.mu.RLock()
subs := append([]chan BlockedEvent(nil), b.subscribers...)
b.mu.RUnlock()

for _, ch := range subs {
select {
case ch <- event:
default:
log.Warnf("[events] blocked-event queue full; dropping hostname %s", event.Hostname)
}
}
}

// Close stops all workers and closes subscriber queues.
func (b *Broadcaster) Close() {
b.cancel()

b.mu.Lock()
for _, ch := range b.subscribers {
close(ch)
}
b.subscribers = nil
b.mu.Unlock()
}
139 changes: 139 additions & 0 deletions components/egress/pkg/events/events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2026 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package events

import (
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"
)

type captureSubscriber struct {
recv chan BlockedEvent
}

func (c *captureSubscriber) HandleBlocked(_ context.Context, ev BlockedEvent) {
c.recv <- ev
}

type blockingSubscriber struct {
block chan struct{}
}

func (b *blockingSubscriber) HandleBlocked(_ context.Context, ev BlockedEvent) {
// Block until the channel is closed to simulate a slow consumer and trigger backpressure.
<-b.block
_ = ev
}

func TestBroadcasterFanout(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

b := NewBroadcaster(ctx, BroadcasterConfig{QueueSize: 2})

sub1 := &captureSubscriber{recv: make(chan BlockedEvent, 1)}
sub2 := &captureSubscriber{recv: make(chan BlockedEvent, 1)}
b.AddSubscriber(sub1)
b.AddSubscriber(sub2)

ev := BlockedEvent{Hostname: "example.com.", Timestamp: time.Now()}
b.Publish(ev)

select {
case got := <-sub1.recv:
if got.Hostname != ev.Hostname {
t.Fatalf("sub1 expected hostname %s, got %s", ev.Hostname, got.Hostname)
}
case <-time.After(2 * time.Second):
t.Fatal("sub1 did not receive event")
}

select {
case got := <-sub2.recv:
if got.Hostname != ev.Hostname {
t.Fatalf("sub2 expected hostname %s, got %s", ev.Hostname, got.Hostname)
}
case <-time.After(2 * time.Second):
t.Fatal("sub2 did not receive event")
}

b.Close()
}

func TestBroadcasterDropsWhenSubscriberBackedUp(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Small queue; blocking subscriber will hold the first event.
b := NewBroadcaster(ctx, BroadcasterConfig{QueueSize: 1})
block := make(chan struct{})
sub := &blockingSubscriber{block: block}
b.AddSubscriber(sub)

ev1 := BlockedEvent{Hostname: "first.example", Timestamp: time.Now()}
ev2 := BlockedEvent{Hostname: "second.example", Timestamp: time.Now()}

b.Publish(ev1)
// This publish should drop because subscriber is blocked and queue size is 1.
b.Publish(ev2)

// Allow subscriber to drain and exit.
close(block)

b.Close()
}

func TestWebhookSubscriberSendsPayload(t *testing.T) {
var (
gotMethod string
gotPayload webhookPayload
)

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gotMethod = r.Method
body, _ := io.ReadAll(r.Body)
_ = r.Body.Close()
_ = json.Unmarshal(body, &gotPayload)
w.WriteHeader(http.StatusOK)
}))
defer server.Close()

sub := NewWebhookSubscriber(server.URL)
if sub == nil {
t.Fatal("webhook subscriber should not be nil")
}

ts := time.Date(2026, 1, 2, 3, 4, 5, 0, time.UTC)
ev := BlockedEvent{Hostname: "Example.com.", Timestamp: ts}
sub.HandleBlocked(context.Background(), ev)

if gotMethod != http.MethodPost {
t.Fatalf("expected POST, got %s", gotMethod)
}
if gotPayload.Hostname != ev.Hostname {
t.Fatalf("expected hostname %s, got %s", ev.Hostname, gotPayload.Hostname)
}
if gotPayload.Source != webhookSource {
t.Fatalf("expected source %s, got %s", webhookSource, gotPayload.Source)
}
if gotPayload.Timestamp == "" {
t.Fatalf("expected timestamp to be set")
}
}
Loading