Skip to content

Commit

Permalink
Add thinking support for claude-3-7-sonnet
Browse files Browse the repository at this point in the history
  • Loading branch information
Mikey O'Brien committed Feb 25, 2025
1 parent ff41479 commit 599726d
Showing 1 changed file with 50 additions and 5 deletions.
55 changes: 50 additions & 5 deletions examples/pipelines/providers/anthropic_manifold_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
license: MIT
description: A pipeline for generating text and processing images using the Anthropic API.
requirements: requests, sseclient-py
environment_variables: ANTHROPIC_API_KEY
environment_variables: ANTHROPIC_API_KEY, ANTHROPIC_THINKING_BUDGET_TOKENS, ANTHROPIC_ENABLE_THINKING
"""

import os
Expand All @@ -22,14 +22,26 @@
class Pipeline:
class Valves(BaseModel):
ANTHROPIC_API_KEY: str = ""
ANTHROPIC_THINKING_BUDGET_TOKENS: int = 1024
ANTHROPIC_ENABLE_THINKING: bool = False

def __init__(self):
self.type = "manifold"
self.id = "anthropic"
self.name = "anthropic/"

self.valves = self.Valves(
**{"ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", "your-api-key-here")}
**{
"ANTHROPIC_API_KEY": os.getenv(
"ANTHROPIC_API_KEY", "your-api-key-here"
),
"ANTHROPIC_THINKING_BUDGET_TOKENS": os.getenv(
"ANTHROPIC_THINKING_BUDGET_TOKENS", 1024
),
"ANTHROPIC_ENABLE_THINKING": os.getenv(
"ANTHROPIC_ENABLE_THINKING", False
),
}
)
self.url = 'https://api.anthropic.com/v1/messages'
self.update_headers()
Expand Down Expand Up @@ -139,6 +151,26 @@ def pipe(
}

if body.get("stream", False):
supports_thinking = "claude-3-7" in model_id
if supports_thinking and self.valves.ANTHROPIC_ENABLE_THINKING:
try:
budget_tokens = int(
self.valves.ANTHROPIC_THINKING_BUDGET_TOKENS
)
payload["thinking"] = {
"type": "enabled",
"budget_tokens": budget_tokens,
}
# Thinking requires temperature 1.0 and does not support top_p, top_k
payload["temperature"] = 1.0
if "top_k" in payload:
del payload["top_k"]
if "top_p" in payload:
del payload["top_p"]
except ValueError:
print(
f"Warning: Invalid thinking budget tokens: {self.valves.ANTHROPIC_THINKING_BUDGET_TOKENS}"
)
return self.stream_response(payload)
else:
return self.get_completion(payload)
Expand All @@ -147,16 +179,25 @@ def pipe(

def stream_response(self, payload: dict) -> Generator:
response = requests.post(self.url, headers=self.headers, json=payload, stream=True)
"""Used for title and tag generation"""

if response.status_code == 200:
client = sseclient.SSEClient(response)
for event in client.events():
try:
data = json.loads(event.data)
if data["type"] == "content_block_start":
yield data["content_block"]["text"]
if data["content_block"]["type"] == "thinking":
yield "<think>"
else:
yield data["content_block"]["text"]
elif data["type"] == "content_block_delta":
yield data["delta"]["text"]
if data["delta"]["type"] == "thinking_delta":
yield data["delta"]["thinking"]
elif data["delta"]["type"] == "signature_delta":
yield "\n </think> \n\n"
else:
yield data["delta"]["text"]
elif data["type"] == "message_stop":
break
except json.JSONDecodeError:
Expand All @@ -171,6 +212,10 @@ def get_completion(self, payload: dict) -> str:
response = requests.post(self.url, headers=self.headers, json=payload)
if response.status_code == 200:
res = response.json()
return res["content"][0]["text"] if "content" in res and res["content"] else ""
for content in res["content"]:
if not content.get("text"):
continue
return content["text"]
return ""
else:
raise Exception(f"Error: {response.status_code} - {response.text}")

0 comments on commit 599726d

Please sign in to comment.