#!/usr/bin/env python
# coding: utf-8

import os
import re
import uuid
import ollama
import logging
import psycopg2
from psycopg2.extras import Json
from datetime import date, datetime
from bs4 import BeautifulSoup

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def read_file(filename):
    with open(filename, 'r', encoding='utf-8') as f:
        return f.read()


BASE_PATH = './docs'


def create_chunk(content_type, content, heading_stack, extra_metadata=None):
    """Helper to create consistent chunk structure. Generated by ClaudeAI"""
    if isinstance(content, list):
        # Join the list first, then replace newlines
        content = ' '.join(content).replace('\n', ' ')
    else:
        # It's already a string
        content = content.replace('\n', ' ')

    chunk = {
        'content': content,
        'content_type': content_type,
        'heading_path': ' > '.join(h['text'] for h in heading_stack),
        'immediate_heading': heading_stack[-1]['text'] if heading_stack else None,
        'headings': [h['text'] for h in heading_stack],
    }

    if extra_metadata:
        chunk.update(extra_metadata)

    return chunk


def process_list(list_element, heading_stack):
    """Process ul/ol lists as single chunks or individual items"""
    list_type = 'ordered_list' if list_element.name == 'ol' else 'unordered_list'

    # Extract all list items
    items = []
    for li in list_element.find_all('li', recursive=False):  # Only direct children
        item_text = li.get_text().strip()
        if item_text:
            # Clean up bullets and numbering from the text
            cleaned_text = clean_list_item_text(item_text)
            if cleaned_text:  # Only add if there's content after cleaning
                items.append(cleaned_text)

    if not items:
        return None

    # Strategy 1: Treat entire list as one chunk
    if len(items) <= 10:  # Reasonable threshold
        content = format_list_content(items, list_type)
        return create_chunk(list_type, content, heading_stack, {
            'item_count': len(items),
            'list_items': items
        })
    # Strategy 2: Split long lists into multiple chunks
    else:
        chunks = []
        chunk_size = 8
        for i in range(0, len(items), chunk_size):
            chunk_items = items[i:i + chunk_size]
            content = format_list_content(chunk_items, list_type)
            chunk = create_chunk(f'{list_type}_part', content, heading_stack, {
                'item_count': len(chunk_items),
                'list_items': chunk_items,
                'part_number': i // chunk_size + 1,
                'total_parts': (len(items) + chunk_size - 1) // chunk_size
            })
            chunks.append(chunk)
        return chunks


def clean_list_item_text(text):
    """Remove bullets, numbers, and other list markers from text"""
    # First, split on bullet points if multiple items are concatenated
    # This handles cases where multiple list items got joined together
    if '•' in text:
        # Split on bullets and clean each part
        parts = text.split('•')
        cleaned_parts = []
        for part in parts:
            cleaned_part = clean_single_item(part.strip())
            if cleaned_part:
                cleaned_parts.append(cleaned_part)

        if len(cleaned_parts) > 1:
            # Multiple items were concatenated, return them separated
            return ' | '.join(cleaned_parts)
        else:
            # Single item, continue with normal cleaning
            text = parts[0] if parts else text

    # Clean single item
    return clean_single_item(text)


def clean_single_item(text):
    """Clean a single list item"""
    if not text:
        return ""

    # Common bullet characters and patterns to remove
    bullet_patterns = [
        r'^[•·▪▫‣⁃◦▸▹►▻○●◉◎⦿⦾]\s*',   # Various bullet characters
        r'^[-–—*+]\s*',                 # Dash, asterisk, plus bullets
        r'^\d+[\.\)]\s*',               # Numbers with periods or parentheses
        r'^[a-zA-Z][\.\)]\s*',          # Letters with periods or parentheses
        r'^[ivxlcdm]+[\.\)]\s*',        # Roman numerals
        r'^\([a-zA-Z0-9]+\)\s*',        # Parenthesized numbers/letters
        r'^\s*\u2022\s*',               # Unicode bullet
        r'^\s*\u25E6\s*',               # White bullet
        r'^\s*\u25AA\s*',               # Black small square
        r'^\s*\u25AB\s*',               # White small square
    ]

    cleaned_text = text
    for pattern in bullet_patterns:
        cleaned_text = re.sub(pattern, '', cleaned_text, flags=re.IGNORECASE)

    # Remove extra whitespace
    cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip()

    return cleaned_text
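# Illustrative behaviour of the cleaners above (hypothetical inputs, not taken from real docs):
# clean_single_item('1. Enable the API')              -> 'Enable the API'
# clean_single_item('• Create a service')             -> 'Create a service'
# clean_list_item_text('• First step • Second step')  -> 'First step | Second step'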
def format_list_content(items, list_type):
    """Format list items into readable content WITHOUT adding bullets"""
    if list_type == 'ordered_list':
        return '\n'.join(f"{i+1}. {item}" for i, item in enumerate(items))
    else:
        # For unordered lists, just join with newlines or separators
        # Don't add bullets since we want clean text
        return '\n'.join(items)
        # Alternative: use a separator instead of newlines
        # return ' | '.join(items)


def process_table(table_element, heading_stack):
    """Process tables with different strategies based on size. Generated by ClaudeAI"""
    # Extract table data
    table_data = extract_table_data(table_element)

    if not table_data['rows']:
        return None

    row_count = len(table_data['rows'])
    col_count = len(table_data['headers']) if table_data['headers'] else len(table_data['rows'][0])

    # Strategy based on table size
    if row_count <= 20 and col_count <= 6:
        # Small table: treat as single chunk
        content = format_table_content(table_data)
        return create_chunk('table', content, heading_stack, {
            'row_count': row_count,
            'column_count': col_count,
            'headers': table_data['headers'],
            'table_caption': table_data['caption']
        })
    else:
        # Large table: split by rows
        return split_large_table(table_data, heading_stack)


def extract_table_data(table_element):
    """Extract structured data from table. Generated by ClaudeAI"""
    # Get caption if present
    caption_elem = table_element.find('caption')
    caption = caption_elem.get_text().strip() if caption_elem else None

    # Extract headers
    headers = []
    header_row = table_element.find('thead')
    if header_row:
        for th in header_row.find_all(['th', 'td']):
            headers.append(th.get_text().strip())
    else:
        # Try first row as headers
        first_row = table_element.find('tr')
        if first_row:
            for cell in first_row.find_all(['th', 'td']):
                headers.append(cell.get_text().strip())

    # Extract data rows
    rows = []
    tbody = table_element.find('tbody') or table_element
    # If the headers came from the first <tr> (no <thead>), skip that row here
    start_index = 1 if not table_element.find('thead') and headers else 0
    for tr in tbody.find_all('tr')[start_index:]:
        row = []
        for cell in tr.find_all(['td', 'th']):
            row.append(cell.get_text().strip())
        if row:  # Skip empty rows
            rows.append(row)

    return {
        'caption': caption,
        'headers': headers,
        'rows': rows
    }


def format_table_content(table_data):
    """Format table data into readable text. Generated by ClaudeAI"""
    content_parts = []

    if table_data['caption']:
        content_parts.append(f"Table: {table_data['caption']}")

    headers = table_data['headers']
    rows = table_data['rows']

    if headers:
        content_parts.append("Columns: " + " | ".join(headers))

    # Format rows
    for i, row in enumerate(rows):
        if headers and len(row) == len(headers):
            # Create key-value pairs
            row_content = []
            for header, value in zip(headers, row):
                if value:  # Skip empty cells
                    row_content.append(f"{header}: {value}")
            if row_content:
                content_parts.append(f"Row {i+1}: " + "; ".join(row_content))
        else:
            # Simple row format
            content_parts.append(f"Row {i+1}: " + " | ".join(row))

    return '\n'.join(content_parts)
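# Illustrative output of format_table_content for a small hypothetical table
# (caption line, a "Columns:" line, then one "Row n:" line of header/value pairs per row):
#   Table: Machine series comparison
#   Columns: Series | vCPUs
#   Row 1: Series: E2; vCPUs: 2-32
#   Row 2: Series: N2; vCPUs: 2-128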
def split_large_table(table_data, heading_stack):
    """Split large tables into smaller chunks. Generated by ClaudeAI"""
    chunks = []
    headers = table_data['headers']
    rows = table_data['rows']
    chunk_size = 10  # Rows per chunk
    total_chunks = (len(rows) + chunk_size - 1) // chunk_size

    for i in range(0, len(rows), chunk_size):
        chunk_rows = rows[i:i + chunk_size]
        chunk_table_data = {
            'caption': table_data['caption'],
            'headers': headers,
            'rows': chunk_rows
        }
        content = format_table_content(chunk_table_data)
        chunk = create_chunk('table_part', content, heading_stack, {
            'row_count': len(chunk_rows),
            'column_count': len(headers) if headers else len(chunk_rows[0]),
            'headers': headers,
            'table_caption': table_data['caption'],
            'part_number': i // chunk_size + 1,
            'total_parts': total_chunks,
            'row_range': f"{i+1}-{min(i+chunk_size, len(rows))}"
        })
        chunks.append(chunk)

    return chunks


def extract_content(html_content):
    ret_value = {}
    soup = BeautifulSoup(html_content, 'html.parser')

    og_url = soup.find('meta', property='og:url')
    og_description = soup.find('meta', property="og:description")
    og_title = soup.find('meta', property="og:title")
    logger.debug(og_title)

    title_content = og_title.get('content') if og_title else None
    # Normalize whitespace (including non-breaking spaces) around the "|" separator in titles
    title = re.sub(r'[\s\xa0]*\|[\s\xa0]*', ' | ', title_content) if title_content else None

    article_body = soup.find('div', class_='devsite-article-body')
    if not article_body:
        return {}

    footer = soup.find('devsite-content-footer')
    # footer_paras = footer.find_all('p') if footer else None
    # second_para = footer_paras[1] if len(footer_paras) > 1 else None

    date_last_modified = date.today().strftime('%Y-%m-%d')
    if footer:
        footer_paras = footer.find_all('p')
        for fp in footer_paras:
            last_updated_re = r'Last updated (.*) UTC'
            match = re.search(last_updated_re, fp.get_text())
            if match:
                date_last_modified = match.group(1)
                break

    #
    # Start ClaudeAI generated Code
    #
    chunks = []
    heading_stack = []

    # Process elements that can be chunks or provide context
    for element in article_body.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'ul', 'ol', 'table']):
        if element.name.startswith('h'):
            level = int(element.name[1])
            heading_text = element.get_text().strip()
            heading_stack = [h for h in heading_stack if h['level'] < level]
            heading_stack.append({'level': level, 'text': heading_text})
        elif element.name == 'p':
            # get_text() always returns a string
            content = element.get_text().strip().replace('\n', ' ')
            if content and len(content) > 10:
                chunk = create_chunk('paragraph', content, heading_stack)
                chunks.append(chunk)
        elif element.name in ['ul', 'ol']:
            list_chunk = process_list(element, heading_stack)
            if list_chunk:
                chunks.append(list_chunk)
        elif element.name == 'table':
            table_chunk = process_table(element, heading_stack)
            if table_chunk:
                chunks.append(table_chunk)
    #
    # End ClaudeAI generated code
    #

    ret_value['url'] = og_url.get('content') if og_url else None
    ret_value['description'] = og_description.get('content') if og_description else None
    ret_value['title'] = title
    ret_value['date_last_modified'] = date_last_modified
    ret_value['chunks'] = chunks
    # ret_value['article'] = article_body

    return ret_value
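# Shape of the dict returned by extract_content (illustrative values only):
# {
#     'url': 'https://cloud.google.com/run/docs/overview/what-is-cloud-run',
#     'description': '<og:description content>',
#     'title': 'What is Cloud Run | Cloud Run Documentation | Google Cloud',
#     'date_last_modified': '2025-01-01',
#     'chunks': [
#         {'content': '...', 'content_type': 'paragraph',
#          'heading_path': 'What is Cloud Run > Services',
#          'immediate_heading': 'Services',
#          'headings': ['What is Cloud Run', 'Services']},
#         # process_list/process_table may append a single chunk dict or a list of chunk dicts
#     ],
# }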
def prepare_enhanced_documents(html_content, additional_metadata=None):
    extracted = extract_content(html_content)
    if not extracted:
        return []
    # extracted = extract_all_content_chunks(html_content)

    # Merge caller-supplied metadata with the extracted modification date
    additional_metadata = {
        **(additional_metadata or {}),
        "date_last_modified": extracted['date_last_modified']
    }

    documents = []
    chunk_counter = 0

    for chunk in extracted['chunks']:
        # Handle cases where list/table processing returns multiple chunks
        if isinstance(chunk, list):
            for sub_chunk in chunk:
                doc = create_document_from_chunk(sub_chunk, extracted['url'], extracted['title'],
                                                 chunk_counter, additional_metadata)
                documents.append(doc)
                chunk_counter += 1
        else:
            doc = create_document_from_chunk(chunk, extracted['url'], extracted['title'],
                                             chunk_counter, additional_metadata)
            documents.append(doc)
            chunk_counter += 1

    return documents


def create_document_from_chunk(chunk, url, title, index, additional_metadata):
    """Create document object from chunk"""
    # Create enhanced content for embedding
    content_parts = []

    # Add document title
    content_parts.append(title)

    # Add heading context
    if chunk['heading_path']:
        content_parts.append(f"Section: {chunk['heading_path']}")

    # Add content type context
    content_type_labels = {
        'paragraph': '',
        'unordered_list': 'List:',
        'ordered_list': 'Numbered list:',
        'table': 'Table:',
        'table_part': 'Table data:',
        'unordered_list_part': 'List items:',
        'ordered_list_part': 'Numbered list items:'
    }

    type_label = content_type_labels.get(chunk['content_type'], '')
    if type_label:
        content_parts.append(type_label)

    # Add main content
    content_parts.append(chunk['content'])

    # Enhanced content for embedding
    embedding_content = ' '.join(content_parts)

    # Base metadata
    metadata = {
        'source_url': url,
        'document_title': title,
        'chunk_index': index,
        'content_type': chunk['content_type'],
        'heading_path': chunk['heading_path'],
        'immediate_heading': chunk['immediate_heading'],
        'all_headings': chunk['headings'],
        'processed_at': date.today().strftime('%Y-%m-%d'),
        **(additional_metadata or {})
    }

    # Add content-specific metadata
    for key in ['item_count', 'list_items', 'row_count', 'column_count', 'headers',
                'table_caption', 'part_number', 'total_parts', 'row_range']:
        if key in chunk:
            metadata[key] = chunk[key]

    doc = {
        'id': f"{url}#chunk{index}",
        'content': chunk['content'],
        'embedding_content': embedding_content,
        'metadata': metadata
    }

    return doc


def create_embedding(text):
    """Create an embedding vector for a single text"""
    response = ollama.embeddings(
        model='nomic-embed-text',
        prompt=text
    )
    return response['embedding']
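# Quick sanity check for the embedding call (assumes a local Ollama server with the
# nomic-embed-text model already pulled; that model produces 768-dimensional vectors):
# vec = create_embedding('What is Cloud Run?')
# print(len(vec))  # expected: 768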
# content = read_file('./docs/run/cloud.google.com/run/docs/overview/what-is-cloud-run')

folders_to_read = [
    'docs/run/cloud.google.com/run/docs',
    'docs/compute/cloud.google.com/compute/docs',
    'docs/iam/cloud.google.com/iam/docs'
]


def is_html_file(file_path: str) -> bool:
    """Check if file is likely an HTML file based on content."""
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            first_line = file.readline().lower().strip()
            # Check for common HTML indicators
            return (first_line.startswith('<!doctype html') or
                    first_line.startswith('<html') or
                    '<html' in first_line)
    except (OSError, UnicodeDecodeError):
        return False


# def insert_to_database(conn, records):
#     """Insert records into the vector_store table."""
#     try:
#         cursor = conn.cursor()
#         insert_query = """
#             INSERT INTO vector_store (
#                 id, title, chunk_index, content, source_url,
#                 date_last_modified, metadata, embedding
#             ) VALUES (
#                 %s, %s, %s, %s, %s, %s, %s, %s
#             )
#         """
#         for record in records:
#             cursor.execute(insert_query, (
#                 record['id'],
#                 record['title'],
#                 record['chunk_index'],
#                 record['content'],
#                 record['source_url'],
#                 record['date_last_modified'],
#                 Json(record['metadata']),
#                 record['embedding']
#             ))
#         conn.commit()
#         logger.info(f"Inserted {len(records)} records into database")
#     except Exception as e:
#         logger.error(f"Database insertion error: {str(e)}")
#         conn.rollback()
#         raise
#     finally:
#         cursor.close()


# In[15]:


def insert_to_database(conn, records):
    """Insert records into the documents and chunks tables."""
    try:
        cursor = conn.cursor()

        # Group records by source_url to handle documents
        documents_by_url = {}
        for record in records:
            source_url = record['source_url']
            if source_url not in documents_by_url:
                documents_by_url[source_url] = {
                    'title': record['title'],
                    'source_url': source_url,
                    'date_last_modified': record['date_last_modified'],
                    'metadata': record['metadata'],
                    'chunks': []
                }
            documents_by_url[source_url]['chunks'].append(record)

        # Insert documents (with conflict handling for duplicates)
        document_insert_query = """
            INSERT INTO documents (source_url, title, date_last_modified, metadata)
            VALUES (%s, %s, %s, %s)
            ON CONFLICT (source_url)
            DO UPDATE SET
                title = EXCLUDED.title,
                date_last_modified = EXCLUDED.date_last_modified,
                metadata = EXCLUDED.metadata
            RETURNING id
        """

        # Insert chunks
        chunk_insert_query = """
            INSERT INTO chunks (document_id, chunk_index, content, embedding)
            VALUES (%s, %s, %s, %s)
        """

        # Get document ID query for existing documents
        get_document_id_query = """
            SELECT id FROM documents WHERE source_url = %s
        """

        total_chunks_inserted = 0

        for source_url, doc_data in documents_by_url.items():
            # Try to insert the document (or update if exists)
            try:
                cursor.execute(document_insert_query, (
                    doc_data['source_url'],
                    doc_data['title'],
                    doc_data['date_last_modified'],
                    Json(doc_data['metadata'])
                ))
                document_id = cursor.fetchone()[0]
            except Exception as e:
                # If insert fails, try to get existing document ID.
                # Note: if the INSERT itself errored, the transaction is aborted and this
                # SELECT will also fail; the outer handler then rolls everything back.
                cursor.execute(get_document_id_query, (source_url,))
                result = cursor.fetchone()
                if result:
                    document_id = result[0]
                else:
                    logger.error(f"Failed to insert or find document for URL {source_url}: {str(e)}")
                    continue

            # Insert chunks for this document
            for chunk in doc_data['chunks']:
                cursor.execute(chunk_insert_query, (
                    document_id,
                    chunk['chunk_index'],
                    chunk['content'],
                    chunk['embedding']
                ))
                total_chunks_inserted += 1

        conn.commit()
        logger.info(f"Inserted {len(documents_by_url)} documents and {total_chunks_inserted} chunks into database")

    except Exception as e:
        logger.error(f"Database insertion error: {str(e)}")
        conn.rollback()
        raise
    finally:
        cursor.close()
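# The INSERT statements above assume roughly the following schema. This is a sketch, not
# taken from this repo; column types and the pgvector dimension are assumptions:
#
#   CREATE EXTENSION IF NOT EXISTS vector;
#
#   CREATE TABLE documents (
#       id                 SERIAL PRIMARY KEY,
#       source_url         TEXT UNIQUE NOT NULL,   -- required by ON CONFLICT (source_url)
#       title              TEXT,
#       date_last_modified DATE,
#       metadata           JSONB
#   );
#
#   CREATE TABLE chunks (
#       id          SERIAL PRIMARY KEY,
#       document_id INTEGER REFERENCES documents(id),
#       chunk_index INTEGER,
#       content     TEXT,
#       embedding   VECTOR(768)                    -- assumed nomic-embed-text output size
#   );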
# In[16]:


def process(folder_path):
    try:
        # Connect to database
        conn = psycopg2.connect(
            host="localhost",      # Database server host
            port=15432,            # Port number (default: 5432)
            database="gcp_docs",   # Database name
            user="admin",          # Username
            password="password"    # Password
        )
        logger.info("Connected to database successfully")

        # Track processing statistics
        total_files = 0
        total_chunks = 0
        records = []  # last processed batch, returned below

        # Walk through all directories and files
        for root, dirs, files in os.walk(folder_path):
            logger.info(f"Processing directory: {root}")
            for filename in files:
                file_path = os.path.join(root, filename)

                # Skip files that don't look like HTML
                if not is_html_file(file_path):
                    logger.warning(f"Skipping non-HTML file: {file_path}")
                    continue

                html_page = read_file(file_path)
                logger.info(f"Processing file: {file_path}")

                # Extract content from HTML
                document = prepare_enhanced_documents(html_page)
                if not document:
                    logger.warning(f"No content extracted from {file_path}")
                    continue

                i = 0
                records = []
                for chunk in document:
                    embedding = create_embedding(chunk['embedding_content'])
                    record = {
                        'id': str(uuid.uuid4()),
                        'title': chunk['metadata']['document_title'],
                        'chunk_index': chunk['metadata']['chunk_index'],  # numeric position within the document
                        'content': chunk['content'],
                        'source_url': chunk['metadata']['source_url'],
                        'date_last_modified': chunk['metadata']['date_last_modified'],
                        'metadata': {
                            **chunk['metadata'],
                            'chunk_number': i,
                            'total_chunks': len(document),
                            'chunk_size': len(chunk['embedding_content'])
                        },
                        'embedding': embedding
                    }
                    records.append(record)
                    i += 1

                # Insert records into database
                if records:
                    insert_to_database(conn, records)
                    total_files += 1
                    total_chunks += len(records)

        logger.info(f"Processing complete. Processed {total_files} files, created {total_chunks} chunks")
        return records

    except Exception as e:
        logger.error(f"Error during processing: {str(e)}")
        raise
    finally:
        if 'conn' in locals():
            conn.close()
            logger.info("Database connection closed")


# In[17]:


# html_file = read_file('docs/run/cloud.google.com/run/docs/monitoring-overview')
# prepare_enhanced_documents(html_file)

# records = process('docs/compute/cloud.google.com/compute/docs')
# records = process('docs/storage/cloud.google.com/storage/docs')
# records = process('docs/pubsub/cloud.google.com/pubsub/docs')
# records = process('docs/functions/cloud.google.com/functions/docs')
# records = process('docs/run/cloud.google.com/run/docs')
# records = process('docs/iam/cloud.google.com/iam/docs')
# records = process('docs/iap/cloud.google.com/iap/docs')
# records = process('docs/bigquery/cloud.google.com/bigquery/docs')
# records = process('docs/apigee/cloud.google.com/apigee/docs')
# records = process('docs/sql/cloud.google.com/sql/docs')

records = process('docs/firestore/cloud.google.com/firestore/docs')
records = process('docs/firestore/cloud.google.com/firestore/native')