Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 144 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,150 @@ func (c *Client) BodyAsync(ctx context.Context, messageID string, w io.Writer, o
return ch
}

// BodyRaw retrieves an article body without decoding, returning raw bytes as-is.
func (c *Client) BodyRaw(ctx context.Context, messageID string) (*ArticleBody, error) {
payload := []byte("BODY <" + messageID + ">\r\n")
respCh := c.sendRaw(ctx, payload, nil)

resp := <-respCh
if resp.Err != nil {
return nil, resp.Err
}
if err := toError(resp.StatusCode, resp.Status); err != nil {
return nil, err
}

body := &ArticleBody{
MessageID: messageID,
BytesDecoded: resp.Meta.BytesDecoded,
BytesConsumed: resp.Meta.BytesConsumed,
}
if buf := resp.Body.Bytes(); len(buf) > 0 {
body.Bytes = buf
}
return body, nil
}

// sendRaw sends a BODY request with RawMode, returning raw bytes without decoding.
func (c *Client) sendRaw(ctx context.Context, payload []byte, bodyWriter io.Writer) <-chan Response {
respCh := make(chan Response, 1)
if ctx == nil {
ctx = context.Background()
}
go c.doSendRaw(ctx, payload, bodyWriter, respCh)
return respCh
}

// doSendRaw is the raw-mode variant of doSendWithRetry.
func (c *Client) doSendRaw(ctx context.Context, payload []byte, bodyWriter io.Writer, respCh chan Response) {
defer close(respCh)

tryGroup := func(g *providerGroup) (resp Response, ok bool, done bool) {
innerCh := make(chan Response, 1)
req := &Request{
Ctx: ctx,
Payload: payload,
RespCh: innerCh,
BodyWriter: bodyWriter,
RawMode: true,
}

// Try hot channel first (non-blocking)
select {
case g.hotReqCh <- req:
default:
select {
case <-c.ctx.Done():
return Response{}, false, true
case <-ctx.Done():
return Response{}, false, true
case <-g.ctx.Done():
return Response{}, false, false
case g.reqCh <- req:
}
}

select {
case resp, ok = <-innerCh:
case <-c.ctx.Done():
return Response{}, false, true
case <-ctx.Done():
return Response{}, false, true
case <-g.ctx.Done():
return Response{}, false, false
}
return resp, ok, false
}

var lastResp Response
hasResp := false

// 1. Try all main providers
mains := *c.mainGroups.Load()
n := len(mains)
if n == 0 {
respCh <- Response{Err: errors.New("nntp: no main providers")}
return
}

var start int
switch c.dispatch {
case DispatchFIFO:
for i, g := range mains {
if g.gate.available.Load() > 0 && !g.isQuotaExceeded() {
start = i
break
}
}
case DispatchRoundRobin:
start = int(c.rrIndex.Add(1)) % n
default:
start = rand.IntN(n)
}

for attempt := 0; attempt < n; attempt++ {
idx := (start + attempt) % n
g := mains[idx]

resp, _, done := tryGroup(g)
if done {
respCh <- resp
return
}
if resp.Err == nil {
respCh <- resp
return
}

lastResp = resp
hasResp = true
}

// 2. Try all fallback providers
fallbacks := *c.fallbackGroups.Load()
for _, g := range fallbacks {
resp, _, done := tryGroup(g)
if done {
respCh <- resp
return
}
if resp.Err == nil {
respCh <- resp
return
}

lastResp = resp
hasResp = true
}

// All attempts failed
if hasResp {
respCh <- lastResp
} else {
respCh <- Response{Err: errors.New("nntp: no providers available")}
}
}

// Head retrieves the headers of an article.
func (c *Client) Head(ctx context.Context, messageID string) (*ArticleHead, error) {
payload := []byte("HEAD <" + messageID + ">\r\n")
Expand Down
6 changes: 5 additions & 1 deletion nntp.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type Request struct {
// PostMode signals readerLoop to expect two NNTP responses (340 + 240/441).
PostMode bool

// RawMode signals to return raw body bytes without decoding.
RawMode bool

// postReadyCh is set by writeLoop for PostMode requests. The readerLoop
// sends nil after reading 340 (proceed to write body) or a non-nil error
// otherwise (e.g. 440 posting not allowed). Buffered with capacity 1.
Expand Down Expand Up @@ -967,7 +970,8 @@ func (c *NNTPConnection) readerLoop() {
Request: req,
}
decoder := NNTPResponse{
onMeta: req.OnMeta,
onMeta: req.OnMeta,
RawMode: req.RawMode,
}

// If the request is cancelled after send, we must still drain its response off the wire,
Expand Down
40 changes: 40 additions & 0 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type NNTPResponse struct {
State rapidyenc.State
StatusCode int
CRC uint32
RawMode bool // skip decoding, return raw body bytes

eof bool
body bool
Expand Down Expand Up @@ -70,6 +71,45 @@ func (r *NNTPResponse) Feed(buf []byte, out io.Writer) (consumed int, done bool,
}

func (r *NNTPResponse) decode(buf []byte, out io.Writer) (read int, err error) {
// In raw mode, pass through raw bytes without decoding
if r.RawMode && r.body {
for {
line, rest, found := bytes.Cut(buf, []byte("\r\n"))
if !found {
// No complete line in buffer yet
return 0, nil
}

// Check for end marker
if bytes.Equal(line, []byte(".")) {
r.eof = true
return len(line) + 2, nil
}

// Handle dot stuffing: ".." at line start becomes "."
if bytes.HasPrefix(line, []byte("..")) {
line = line[1:]
}

// Write raw line with CRLF
if _, err := out.Write(line); err != nil {
return 0, err
}
if _, err := out.Write([]byte("\r\n")); err != nil {
return 0, err
}

// Account for bytes consumed (line + CRLF)
consumed := len(line) + 2
if bytes.HasPrefix(buf, []byte("..")) {
consumed = len(buf) - len(rest)
}

read += consumed
buf = rest
}
}

if r.body && r.Format == rapidyenc.FormatYenc {
n, err := r.decodeYenc(buf, out)
if err != nil {
Expand Down