Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cc_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"verbose": False,
"thinking": False,
"thinking_budget": 10000,
"thinking_mode": "loud", # "loud" = think-out-loud with <thinking> tags
"custom_base_url": "", # for "custom" provider
"max_tool_output": 32000,
"max_agent_depth": 3,
Expand Down
12 changes: 12 additions & 0 deletions context.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,18 @@

from memory import get_memory_context

_THINK_OUT_LOUD_PROMPT = """
# Think-Out-Loud Mode

Wrap your internal reasoning in `<""" + """thinking>...</""" + """thinking>` XML tags.
These are displayed to the user in italic but stripped from your context in future turns.

Use thinking for: planning, analyzing code, debugging, weighing options.
Keep it focused -- commit to an approach, avoid loops.
You can interleave thinking blocks with visible text freely.
Do NOT wrap simple responses in thinking.
"""

# ── Prompt injection detection ───────────────────────────────────────────
_THREAT_PATTERNS = [
re.compile(r'ignore\s+(previous|all|above|prior)(\s+\w+)*\s+(instructions?|prompts?|rules?)', re.I),
Expand Down
148 changes: 148 additions & 0 deletions tests/test_thinking_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
"""Tests for thinking_parser module."""
import sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))

from thinking_parser import ThinkingStreamParser, LoopDetector, strip_thinking_tags, OPEN_TAG, CLOSE_TAG

# Build test strings using the tag constants to avoid XML transport issues
def _wrap(inner):
return OPEN_TAG + inner + CLOSE_TAG


class TestThinkingStreamParser:
def test_no_thinking_tags(self):
p = ThinkingStreamParser()
events = p.feed("Hello world")
events += p.finalize()
texts = "".join(t for k, t in events if k == "text")
assert texts == "Hello world"
assert not any(k == "thinking" for k, _ in events)

def test_basic_thinking_block(self):
p = ThinkingStreamParser()
events = p.feed(_wrap("reasoning") + "answer")
events += p.finalize()
thinking = "".join(t for k, t in events if k == "thinking")
text = "".join(t for k, t in events if k == "text")
assert thinking == "reasoning"
assert text == "answer"

def test_multiple_blocks(self):
p = ThinkingStreamParser()
events = p.feed(_wrap("A") + "X" + _wrap("B") + "Y")
events += p.finalize()
thinking = "".join(t for k, t in events if k == "thinking")
text = "".join(t for k, t in events if k == "text")
assert thinking == "AB"
assert text == "XY"

def test_partial_open_tag_across_chunks(self):
p = ThinkingStreamParser()
all_events = []
# Split the open tag across two chunks
all_events += p.feed("before" + OPEN_TAG[:4])
all_events += p.feed(OPEN_TAG[4:] + "inside" + CLOSE_TAG + "after")
all_events += p.finalize()
thinking = "".join(t for k, t in all_events if k == "thinking")
text = "".join(t for k, t in all_events if k == "text")
assert thinking == "inside"
assert text == "beforeafter"

def test_partial_close_tag_across_chunks(self):
p = ThinkingStreamParser()
all_events = []
all_events += p.feed(OPEN_TAG + "inside" + CLOSE_TAG[:6])
all_events += p.feed(CLOSE_TAG[6:] + "outside")
all_events += p.finalize()
thinking = "".join(t for k, t in all_events if k == "thinking")
text = "".join(t for k, t in all_events if k == "text")
assert thinking == "inside"
assert text == "outside"

def test_unclosed_tag_flushed_on_finalize(self):
p = ThinkingStreamParser()
events = p.feed(OPEN_TAG + "unclosed")
events += p.finalize()
thinking = "".join(t for k, t in events if k == "thinking")
assert thinking == "unclosed"

def test_text_before_thinking(self):
p = ThinkingStreamParser()
events = p.feed("before" + _wrap("during") + "after")
events += p.finalize()
assert events == [
("text", "before"),
("thinking", "during"),
("text", "after"),
]

def test_empty_thinking_block(self):
p = ThinkingStreamParser()
events = p.feed(_wrap("") + "text")
events += p.finalize()
text = "".join(t for k, t in events if k == "text")
assert text == "text"

def test_character_by_character(self):
p = ThinkingStreamParser()
full = _wrap("abc") + "xyz"
events = []
for ch in full:
events += p.feed(ch)
events += p.finalize()
thinking = "".join(t for k, t in events if k == "thinking")
text = "".join(t for k, t in events if k == "text")
assert thinking == "abc"
assert text == "xyz"

def test_newlines_preserved(self):
p = ThinkingStreamParser()
events = p.feed(_wrap("line1\nline2\n"))
events += p.finalize()
thinking = "".join(t for k, t in events if k == "thinking")
assert thinking == "line1\nline2\n"

def test_full_thinking_text_property(self):
p = ThinkingStreamParser()
p.feed(_wrap("part1") + "gap" + _wrap("part2"))
assert p.full_thinking_text == "part1part2"


class TestLoopDetector:
def test_no_repetition(self):
d = LoopDetector()
assert not d.feed("Normal text without any repeating patterns at all here.")

def test_short_text(self):
d = LoopDetector()
assert not d.feed("abc" * 10)

def test_long_repeating_pattern(self):
d = LoopDetector()
pattern = "I need to analyze this carefully now. ok " * 10
assert d.feed(pattern)

def test_incremental_detection(self):
d = LoopDetector()
pattern = "I keep repeating this same thought!! "
for _ in range(6):
assert not d.feed(pattern)
for _ in range(6):
if d.feed(pattern):
return
assert False, "Should have detected loop"


class TestStripThinkingTags:
def test_basic(self):
assert strip_thinking_tags(_wrap("r") + "answer") == "answer"

def test_multiline(self):
result = strip_thinking_tags(_wrap("line1\nline2\n") + "answer")
assert result == "answer"

def test_multiple_blocks(self):
assert strip_thinking_tags(_wrap("a") + "X" + _wrap("b") + "Y") == "XY"

def test_no_tags(self):
assert strip_thinking_tags("just text") == "just text"
119 changes: 119 additions & 0 deletions thinking_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""Incremental parser for thinking XML tags in text stream, with loop detection."""

import re

OPEN_TAG = "<" + "thinking>"
CLOSE_TAG = "</" + "thinking>"
_THINKING_RE = re.compile(r"<" + r"thinking>.*?</" + r"thinking>", re.DOTALL)


class ThinkingStreamParser:
"""Parses a stream of text chunks, separating thinking blocks from regular text."""

def __init__(self):
self._buffer = ""
self._in_thinking = False
self._thinking_text: list[str] = []

def feed(self, chunk: str) -> list[tuple[str, str]]:
"""Feed a chunk. Returns list of ("text", content) or ("thinking", content) tuples."""
self._buffer += chunk
results: list[tuple[str, str]] = []

while self._buffer:
if self._in_thinking:
end_idx = self._buffer.find(CLOSE_TAG)
if end_idx != -1:
thinking_content = self._buffer[:end_idx]
if thinking_content:
results.append(("thinking", thinking_content))
self._thinking_text.append(thinking_content)
self._buffer = self._buffer[end_idx + len(CLOSE_TAG):]
self._in_thinking = False
else:
for i in range(1, min(len(CLOSE_TAG), len(self._buffer) + 1)):
if CLOSE_TAG[:i] == self._buffer[-i:]:
emit = self._buffer[:-i]
if emit:
results.append(("thinking", emit))
self._thinking_text.append(emit)
self._buffer = self._buffer[-i:]
return results
results.append(("thinking", self._buffer))
self._thinking_text.append(self._buffer)
self._buffer = ""
else:
start_idx = self._buffer.find(OPEN_TAG)
if start_idx != -1:
text_before = self._buffer[:start_idx]
if text_before:
results.append(("text", text_before))
self._buffer = self._buffer[start_idx + len(OPEN_TAG):]
self._in_thinking = True
else:
for i in range(1, min(len(OPEN_TAG), len(self._buffer) + 1)):
if OPEN_TAG[:i] == self._buffer[-i:]:
emit = self._buffer[:-i]
if emit:
results.append(("text", emit))
self._buffer = self._buffer[-i:]
return results
results.append(("text", self._buffer))
self._buffer = ""

return results

def finalize(self) -> list[tuple[str, str]]:
"""Flush remaining buffer."""
results: list[tuple[str, str]] = []
if self._buffer:
kind = "thinking" if self._in_thinking else "text"
results.append((kind, self._buffer))
if self._in_thinking:
self._thinking_text.append(self._buffer)
self._buffer = ""
return results

@property
def full_thinking_text(self) -> str:
return "".join(self._thinking_text)

@property
def in_thinking(self) -> bool:
return self._in_thinking


class LoopDetector:
"""Detects repetitive patterns in a stream of text."""

WINDOW_SIZE = 2000
PATTERN_LENGTHS = list(range(20, 51)) + list(range(55, 201, 5))
MIN_REPEATS = 8

def __init__(self):
self._window = ""

def feed(self, text: str) -> bool:
"""Feed text. Returns True if a loop is detected."""
self._window += text
if len(self._window) > self.WINDOW_SIZE:
self._window = self._window[-self.WINDOW_SIZE:]
if len(self._window) < 200:
return False
return any(self._check_pattern(plen) for plen in self.PATTERN_LENGTHS)

def _check_pattern(self, pattern_length: int) -> bool:
needed = pattern_length * self.MIN_REPEATS
if len(self._window) < needed:
return False
tail = self._window[-needed:]
pattern = tail[:pattern_length]
return all(
tail[i : i + pattern_length] == pattern
for i in range(pattern_length, needed, pattern_length)
)


def strip_thinking_tags(content: str) -> str:
"""Remove all thinking blocks from content."""
return _THINKING_RE.sub("", content).strip()
Loading