-from typing import List, Dict
-from openai import OpenAI
-import logging
 import json
-from .utils import chunk_text
+import logging
+import uuid
+from datetime import datetime
+from time import sleep
+from typing import Dict, List, Optional
+from tqdm import tqdm
+from openai import OpenAI
+import nltk
+from nltk.tokenize import sent_tokenize

-logging.basicConfig(
-    level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
-)
+logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)

 class ContentProcessor:
     def __init__(self, api_key: str):
         self.client = OpenAI(api_key=api_key)
+        self.rate_limit_delay = 1
+        nltk.download('punkt', quiet=True)

-    def generate_extraction_prompt(self, content: str, instructions: str) -> str:
-        return f"""
-        Given the following web content and instructions, extract and structure the relevant information.
-        Create a well-organized document that can be used for RAG applications.
+    def preprocess_content(self, content: str, max_length: int = 4000) -> List[str]:
+        sentences = sent_tokenize(content)
+        chunks = []
+        current_chunk = []
+        current_length = 0

-        Instructions: {instructions}
+        for sentence in sentences:
+            if current_length + len(sentence) > max_length:
+                if current_chunk:
+                    chunks.append(' '.join(current_chunk))
+                current_chunk = [sentence]
+                current_length = len(sentence)
+            else:
+                current_chunk.append(sentence)
+                current_length += len(sentence)
+
+        if current_chunk:
+            chunks.append(' '.join(current_chunk))
+        return chunks

-        Content:
-        {content}
+    def generate_rag_prompt(self, content: str, instructions: str, metadata: Dict) -> str:
+        return f"""
+        Process this content for RAG system integration.
+        Instructions: {instructions}
+        Source URL: {metadata.get('url', 'Unknown')}
+        Content: {content}

-        Return the content in the following JSON format:
+        Return strictly valid JSON matching this structure:
         {{
-            "title": "Brief title describing the content",
-            "summary": "Brief summary of the key points",
-            "content": "Main extracted content, relevant to the instructions",
-            "metadata": {{
-                "topics": ["relevant", "topics", "covered"],
-                "relevance_score": 0-1 score indicating relevance to instructions
-            }}
+            "text": "Main content for embedding",
+            "title": "Descriptive section title",
+            "source_url": "Origin URL",
+            "chunk_type": "policy|procedure|faq|general",
+            "topics": ["topic1", "topic2"],
+            "context": "Additional retrieval context",
+            "relevance_score": 0.0 to 1.0
         }}
         """

-    def process_chunk(self, chunk: str, instructions: str) -> Dict:
-        # """Process a single chunk of content using GPT."""
+    def process_chunk(self, chunk: str, instructions: str, metadata: Dict) -> Optional[Dict]:
         try:
+            sleep(self.rate_limit_delay)
             response = self.client.chat.completions.create(
-                model="gpt-4",  # Changed from gpt-4-turbo-preview
+                model="gpt-3.5-turbo",
                 messages=[{
                     "role": "system",
-                    "content": "You are a content extraction AI that processes web content into structured documents for RAG systems."
+                    "content": "You are a RAG content processor. Return only valid JSON."
                 }, {
                     "role": "user",
-                    "content": self.generate_extraction_prompt(chunk, instructions)
+                    "content": self.generate_rag_prompt(chunk, instructions, metadata)
                 }],
-                temperature=0.3
+                temperature=0.3,
+                max_tokens=1000
             )

-            try:
-                content = response.choices[0].message.content
-                return json.loads(content)
-            except json.JSONDecodeError as e:
-                logger.error(f"Failed to parse GPT response as JSON: {e}")
+            content = response.choices[0].message.content.strip()
+            if content.startswith("```json"):
+                content = content[7:-3]
+
+            result = json.loads(content)
+
+            if result.get('relevance_score', 0) < 0.5:
                 return None

+            return {
+                "id": str(uuid.uuid4()),
+                "text": result["text"],
+                "metadata": {
+                    "title": result["title"],
+                    "source_url": result["source_url"],
+                    "chunk_type": result["chunk_type"],
+                    "timestamp": datetime.now().isoformat(),
+                    "topics": result["topics"],
+                    "context": result["context"],
+                    "relevance_score": result["relevance_score"]
+                }
+            }
+
         except Exception as e:
-            logger.error(f"Error processing chunk: {str(e)}")
+            logger.error(f"Processing error: {str(e)}")
             return None

     def process(self, pages: List[Dict], instructions: str = None) -> List[Dict]:
         processed_documents = []

-        for page in pages:
+        for page in tqdm(pages, desc="Processing pages"):
             try:
-                chunks = chunk_text(page['content'], max_length=4000)
+                chunks = self.preprocess_content(page['content'])
+                metadata = {
+                    "url": page['url'],
+                    "title": page['title'],
+                    "structured_data": page.get('structured_data', {})
+                }

                 for chunk in chunks:
-                    processed = self.process_chunk(chunk, instructions)
-                    if processed:
-                        if 'metadata' in processed:
-                            processed['metadata']['source_url'] = page['url']
-                        processed_documents.append(processed)
-                    else:
-                        logger.warning(f"Failed to process chunk from {page['url']}")
+                    doc = self.process_chunk(chunk, instructions, metadata)
+                    if doc:
+                        processed_documents.append(doc)

             except Exception as e:
                 logger.error(f"Error processing page {page['url']}: {str(e)}")
                 continue

         return processed_documents
+
+    def save_to_jsonl(self, documents: List[Dict], output_file: str):
+        """Save documents in JSONL format for RAG systems."""
+        with open(output_file, 'w', encoding='utf-8') as f:
+            for doc in documents:
+                f.write(json.dumps(doc) + '\n')
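For context, a minimal usage sketch of the class after this change. The module path, sample page, and output filename below are illustrative assumptions, not part of the diff; process() expects a list of dicts with 'url', 'title', and 'content' keys (plus optional 'structured_data'), and save_to_jsonl() writes one JSON document per line.

import os

from content_processor import ContentProcessor  # assumed module path for the class above

# Pages in the shape process() expects; the URL, title, and content are placeholders.
pages = [
    {
        "url": "https://example.com/returns-policy",
        "title": "Returns Policy",
        "content": "Items may be returned within 30 days of delivery. Refunds go to the original payment method.",
    },
]

processor = ContentProcessor(api_key=os.environ["OPENAI_API_KEY"])
documents = processor.process(pages, instructions="Extract customer-facing policy details.")
processor.save_to_jsonl(documents, "rag_documents.jsonl")
print(f"Wrote {len(documents)} document chunks")

Each line of the resulting JSONL file carries the generated id, the embedding text, and the per-chunk metadata built in process_chunk(), so it can be loaded directly by a downstream indexing step.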