-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhtml_parser.py
More file actions
261 lines (204 loc) · 11 KB
/
html_parser.py
File metadata and controls
261 lines (204 loc) · 11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
"""
HTML Parser with Dynamic Content Extraction
This module provides a comprehensive HTML parsing system that extracts both static HTML content
and dynamic JavaScript-rendered content, with deduplication and text normalization capabilities.
Architecture:
1. Imports and Dependencies
2. Main Parser Class
3. Factory Functions
Note:
- Abstract base classes (MetadataExtractor, ContentFilter) are defined in interfaces.py
- Utility classes are separated into: splitter_config.py, text_normalizer.py, html_cleaner.py
- Metadata extractors are in metadata_extractors.py
- Content filters are in content_filters.py
"""
# ============================================================================
# 1. IMPORTS AND DEPENDENCIES
# ============================================================================
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional

import requests
from bs4 import BeautifulSoup, Tag

from content_filters import HeaderOnlyFilter, MinLengthFilter, StructuredDataFilter
from html_cleaner import HTMLCleaner
from interfaces import ContentFilter, MetadataExtractor
from metadata_extractors import BasicMetadataExtractor, ScriptDataExtractor
from splitter_config import SplitterConfig
from text_normalizer import TextNormalizer
class HTMLParser:
    """Configurable HTML parser with pluggable metadata extractors and content filters.

    A page is cleaned, split by headers, normalized and re-split into smaller
    text chunks ("static" chunks). Metadata extractors may additionally return
    a ``dynamic_chunks`` list; those chunks are emitted first, and a static
    chunk is dropped when an already accepted chunk covers enough of its tokens.
    """

    # Header keys looked up in a section's header context, outermost first.
    _HEADER_LEVELS = tuple(f"Header {level}" for level in range(1, 7))

    # Section title used when a section carries no recognised header.
    _DEFAULT_SECTION_TITLE = "Main Content"

    def __init__(self,
                 chunk_size: int = 500,
                 chunk_overlap: int = 50,
                 headers_to_split: Optional[List[tuple]] = None):
        """Store the chunking configuration and build the splitters.

        Args:
            chunk_size: Forwarded to ``SplitterConfig.create_text_splitter``.
            chunk_overlap: Forwarded to ``SplitterConfig.create_text_splitter``.
            headers_to_split: Forwarded to ``SplitterConfig.create_html_splitter``;
                ``None`` is passed through unchanged.
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.headers_to_split = headers_to_split

        # No extractors or filters are registered by default; callers add them
        # with add_metadata_extractor() / add_content_filter(). Further
        # extractors could be added later, e.g. one parsing metadata from the
        # URL (campus, educational_level, language, ...).
        self.metadata_extractors: List["MetadataExtractor"] = []
        self.content_filters: List["ContentFilter"] = []

        # Initialize splitters using shared configuration
        self.html_splitter = SplitterConfig.create_html_splitter(self.headers_to_split)
        self.text_splitter = SplitterConfig.create_text_splitter(self.chunk_size, self.chunk_overlap)

    def add_metadata_extractor(self, extractor: "MetadataExtractor") -> "HTMLParser":
        """Register a metadata extractor and return ``self`` for chaining."""
        self.metadata_extractors.append(extractor)
        return self

    def add_content_filter(self, filter: "ContentFilter") -> "HTMLParser":
        """Register a content filter and return ``self`` for chaining."""
        # The parameter name shadows the builtin but is kept so that existing
        # keyword callers (``filter=...``) continue to work.
        self.content_filters.append(filter)
        return self

    def parse_url(self,
                  url: str,
                  last_mod: str,
                  language: Optional[str] = None,
                  timeout: float = 30.0) -> Optional[Dict[str, Any]]:
        """Fetch ``url`` and parse the response body with :meth:`parse_html`.

        Args:
            url: Page to download.
            last_mod: Stored as ``last_modified`` in the page metadata.
            language: Optional language tag forwarded to :meth:`parse_html`.
            timeout: Seconds passed to ``requests.get``; without a timeout a
                stalled server would block the call indefinitely.

        Returns:
            The dictionary produced by :meth:`parse_html`, or ``None`` when the
            request fails (connection error, timeout, or HTTP error status).
        """
        try:
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
        except requests.exceptions.RequestException as exc:
            # Best effort: report the failure and let the caller skip this page.
            logging.getLogger(__name__).warning("Failed to fetch %s: %s", url, exc)
            return None
        return self.parse_html(response.text, url, last_mod, language)

    def parse_html(self,
                   html: str,
                   url: str,
                   last_mod: str,
                   language: Optional[str] = None) -> Dict[str, Any]:
        """Parse HTML content and return structured chunks (dynamic and static).

        Args:
            html: Raw HTML markup.
            url: Source URL, recorded in the metadata.
            last_mod: Stored as ``last_modified`` in the page metadata.
            language: Language tag for the page. When ``None``, a ``language``
                value already stored by a metadata extractor is kept instead
                of being overwritten; the key is always present.

        Returns:
            A dict with ``index_timestamp``, ``total_chunks``, ``static_chunks``,
            ``dynamic_chunks`` (counts), ``page_metadata`` and ``chunks`` (a
            list of ``{"text", "metadata"}`` dicts).
        """
        soup = BeautifulSoup(html, 'html.parser')

        # Clean HTML and get main content
        content_html = self._clean_html(soup)

        # Extract page metadata (extractors may include dynamic content)
        page_metadata = self._extract_metadata(soup, url, content_html)
        page_metadata['last_modified'] = last_mod

        # An explicit language wins; otherwise keep whatever an extractor set.
        if language is not None:
            page_metadata['language'] = language
        else:
            page_metadata.setdefault('language', None)

        # Dynamic chunks are removed from the page metadata so they are not
        # copied into every chunk's metadata.
        dynamic_chunks = page_metadata.pop('dynamic_chunks', [])

        # Split by headers (so we can link chunks to their sections/headers)
        html_header_splits = self.html_splitter.split_text(content_html)

        docs: List[Dict[str, Any]] = []
        chunk_id = 0

        # STEP 1: dynamic content first (it sometimes contains more complete info)
        for dynamic_chunk in dynamic_chunks:
            docs.append({
                "text": dynamic_chunk['text'],
                "metadata": {
                    **page_metadata,
                    'section_title': dynamic_chunk['section_title'],
                    'content_type': dynamic_chunk['content_type'],
                    'source': 'dynamic',
                    'chunk_id': chunk_id,
                    'chunk_index': dynamic_chunk['chunk_index'],
                },
            })
            chunk_id += 1
        dynamic_count = len(docs)

        # STEP 2: static HTML content, checking for token-based duplicates
        for section in html_header_splits:
            header_context = getattr(section, 'metadata', {})
            content = self._normalize_section_content(section.page_content)

            section_metadata = {
                **page_metadata,
                'headers': header_context,
                'section_title': self._normalize_section_title(header_context),
                'source': 'static',
                'chunk_id': chunk_id,
            }

            # Apply content filters to the whole section before splitting it
            if self._should_skip_content(content, section_metadata):
                continue

            # Split into smaller chunks
            for chunk_text in self.text_splitter.split_text(content):
                # NOTE(review): ``docs`` holds the dynamic chunks plus every
                # static chunk accepted so far, so this also drops static
                # chunks that duplicate earlier static ones - confirm intended.
                if self._is_static_chunk_covered_by_dynamic(chunk_text, docs, coverage_threshold=0.70):
                    continue
                docs.append({
                    "text": chunk_text,
                    "metadata": {**section_metadata, 'chunk_id': chunk_id},
                })
                chunk_id += 1

        return {
            "index_timestamp": datetime.now().isoformat(),
            "total_chunks": len(docs),
            # Only static chunks are appended after the dynamic ones.
            "static_chunks": len(docs) - dynamic_count,
            "dynamic_chunks": dynamic_count,
            "page_metadata": page_metadata,
            "chunks": docs,
        }

    # ========================================================================
    # PRIVATE HELPER METHODS
    # ========================================================================

    def _clean_html(self, soup: "BeautifulSoup") -> str:
        """Clean HTML and extract main content using centralized HTMLCleaner."""
        return HTMLCleaner.clean_html_for_content_extraction(soup)

    def _extract_metadata(self, soup: "BeautifulSoup", url: str, content: str) -> Dict[str, Any]:
        """Run all metadata extractors in registration order and merge their results.

        Later extractors overwrite keys written by earlier ones (and the three
        base keys set here) because results are merged with ``dict.update``.
        """
        metadata: Dict[str, Any] = {
            'index_timestamp': datetime.now().isoformat(),
            'source_url': url,
            'source_id': TextNormalizer.stable_url_hash(url),
        }
        for extractor in self.metadata_extractors:
            metadata.update(extractor.extract(soup, url, content))
        return metadata

    def _normalize_section_content(self, content: str) -> str:
        """Normalize section content using centralized TextNormalizer."""
        return TextNormalizer.normalize_content(content)

    def _should_skip_content(self, content: str, metadata: Dict[str, Any]) -> bool:
        """Return True if any registered content filter asks to skip the content."""
        return any(
            content_filter.should_skip(content, metadata)
            for content_filter in self.content_filters
        )

    def _calculate_token_coverage(self, static_text: str, dynamic_text: str) -> float:
        """Return how much of ``static_text``'s tokens ``dynamic_text`` covers.

        Delegates to ``TextNormalizer.calculate_token_coverage``; the result is
        compared against a threshold such as 0.7, so it is presumably a
        fraction in [0, 1] - confirm against TextNormalizer.
        """
        return TextNormalizer.calculate_token_coverage(static_text, dynamic_text)

    def _is_static_chunk_covered_by_dynamic(self,
                                            static_text: str,
                                            dynamic_chunks: List[Dict],
                                            coverage_threshold: float = 0.7) -> bool:
        """Check if a static chunk is sufficiently covered by any given chunk.

        Used for de-duplication: returns True as soon as one chunk in
        ``dynamic_chunks`` (dicts with a ``'text'`` key) reaches
        ``coverage_threshold``; returns False for an empty list.
        """
        return any(
            self._calculate_token_coverage(static_text, chunk['text']) >= coverage_threshold
            for chunk in dynamic_chunks
        )

    def _normalize_section_title(self, header_context: Dict[str, Any]) -> str:
        """Create a section title from the header context.

        Headers ``Header 1`` .. ``Header 6`` that are present are joined with
        ``" > "`` in hierarchical order; without any of them (or with an empty
        context) the title is ``"Main Content"``.
        """
        if not header_context:
            return self._DEFAULT_SECTION_TITLE
        headers = [
            header_context[level]
            for level in self._HEADER_LEVELS
            if level in header_context
        ]
        return " > ".join(headers) if headers else self._DEFAULT_SECTION_TITLE
def create_parser(chunk_size: int = 500,
                  chunk_overlap: int = 50,
                  min_length: int = 30) -> HTMLParser:
    """Create a configured HTML parser with static and dynamic content extraction.

    Called without arguments this builds the same parser as before; the
    parameters only expose values that used to be hard-coded.

    Args:
        chunk_size: Chunk size shared by the parser and ``ScriptDataExtractor``
            so that both use the same chunk configuration.
        chunk_overlap: Chunk overlap shared by the parser and
            ``ScriptDataExtractor``.
        min_length: ``min_length`` passed to ``MinLengthFilter``.

    Returns:
        An ``HTMLParser`` with ``BasicMetadataExtractor`` and
        ``ScriptDataExtractor`` registered (in that order), followed by the
        ``StructuredDataFilter``, ``HeaderOnlyFilter`` and ``MinLengthFilter``
        content filters.
    """
    parser = HTMLParser(chunk_size=chunk_size, chunk_overlap=chunk_overlap)

    # Metadata extractors use the same chunk configuration as the parser.
    # The add_* methods return the parser, so registrations can be chained.
    parser.add_metadata_extractor(
        BasicMetadataExtractor()
    ).add_metadata_extractor(
        ScriptDataExtractor(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    )

    # Content filters are consulted in registration order.
    parser.add_content_filter(
        StructuredDataFilter()
    ).add_content_filter(
        HeaderOnlyFilter()
    ).add_content_filter(
        MinLengthFilter(min_length=min_length)
    )
    return parser