#!/usr/bin/env python
# coding: utf-8

import os
import re
import uuid
import ollama
import logging
import psycopg2
from psycopg2.extras import Json
from datetime import date, datetime
from bs4 import BeautifulSoup

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def read_file(filename):
    with open(filename, 'r', encoding='utf-8') as f:
        return f.read()

BASE_PATH = './docs'

def create_chunk(content_type, content, heading_stack, extra_metadata=None):
    """Helper to create consistent chunk structure.
    Generated by ClaudeAI"""

    if isinstance(content, list):
        # Join the list first, then replace newlines
        content = ' '.join(content).replace('\n', ' ')
    else:
        # It's already a string
        content = content.replace('\n', ' ')

    chunk = {
        'content': content,
        'content_type': content_type,
        'heading_path': ' > '.join(h['text'] for h in heading_stack),
        'immediate_heading': heading_stack[-1]['text'] if heading_stack else None,
        'headings': [h['text'] for h in heading_stack],
    }

    if extra_metadata:
        chunk.update(extra_metadata)

    return chunk

def process_list(list_element, heading_stack):
    """Process ul/ol lists as single chunks or individual items"""
    list_type = 'ordered_list' if list_element.name == 'ol' else 'unordered_list'

    # Extract all list items
    items = []
    for li in list_element.find_all('li', recursive=False):  # Only direct children
        item_text = li.get_text().strip()
        if item_text:
            # Clean up bullets and numbering from the text
            cleaned_text = clean_list_item_text(item_text)
            if cleaned_text:  # Only add if there's content after cleaning
                items.append(cleaned_text)

    if not items:
        return None

    # Strategy 1: Treat entire list as one chunk
    if len(items) <= 10:  # Reasonable threshold
        content = format_list_content(items, list_type)
        return create_chunk(list_type, content, heading_stack, {
            'item_count': len(items),
            'list_items': items
        })

    # Strategy 2: Split long lists into multiple chunks
    else:
        chunks = []
        chunk_size = 8
        for i in range(0, len(items), chunk_size):
            chunk_items = items[i:i + chunk_size]
            content = format_list_content(chunk_items, list_type)
            chunk = create_chunk(f'{list_type}_part', content, heading_stack, {
                'item_count': len(chunk_items),
                'list_items': chunk_items,
                'part_number': i // chunk_size + 1,
                'total_parts': (len(items) + chunk_size - 1) // chunk_size
            })
            chunks.append(chunk)
        return chunks

def clean_list_item_text(text):
    """Remove bullets, numbers, and other list markers from text"""

    # First, split on bullet points if multiple items are concatenated
    # This handles cases where multiple list items got joined together
    if '•' in text:
        # Split on bullets and clean each part
        parts = text.split('•')
        cleaned_parts = []
        for part in parts:
            cleaned_part = clean_single_item(part.strip())
            if cleaned_part:
                cleaned_parts.append(cleaned_part)

        if len(cleaned_parts) > 1:
            # Multiple items were concatenated, return them separated
            return ' | '.join(cleaned_parts)
        else:
            # Single item, continue with normal cleaning
            text = parts[0] if parts else text

    # Clean single item
    return clean_single_item(text)

def clean_single_item(text):
    """Clean a single list item"""
    if not text:
        return ""

    # Common bullet characters and patterns to remove
    bullet_patterns = [
        r'^[•·▪▫‣⁃◦▸▹►▻○●◉◎⦿⦾]\s*',  # Various bullet characters
        r'^[-–—*+]\s*',                    # Dash, asterisk, plus bullets
        r'^\d+[\.\)]\s*',                  # Numbers with periods or parentheses
        r'^[a-zA-Z][\.\)]\s*',             # Letters with periods or parentheses
        r'^[ivxlcdm]+[\.\)]\s*',           # Roman numerals
        r'^\([a-zA-Z0-9]+\)\s*',           # Parenthesized numbers/letters
        r'^\s*\u2022\s*',                  # Unicode bullet
        r'^\s*\u25E6\s*',                  # White bullet
        r'^\s*\u25AA\s*',                  # Black small square
        r'^\s*\u25AB\s*',                  # White small square
    ]

    cleaned_text = text
    for pattern in bullet_patterns:
        cleaned_text = re.sub(pattern, '', cleaned_text, flags=re.IGNORECASE)

    # Remove extra whitespace
    cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip()

    return cleaned_text

def format_list_content(items, list_type):
    """Format list items into readable content WITHOUT adding bullets"""
    if list_type == 'ordered_list':
        return '\n'.join(f"{i+1}. {item}" for i, item in enumerate(items))
    else:
        # For unordered lists, just join with newlines or separators
        # Don't add bullets since we want clean text
        return '\n'.join(items)
        # Alternative: use a separator instead of newlines
        # return ' | '.join(items)

def process_table(table_element, heading_stack):
    """Process tables with different strategies based on size.
    Generated by ClaudeAI"""

    # Extract table data
    table_data = extract_table_data(table_element)

    if not table_data['rows']:
        return None

    row_count = len(table_data['rows'])
    col_count = len(table_data['headers']) if table_data['headers'] else len(table_data['rows'][0])

    # Strategy based on table size
    if row_count <= 20 and col_count <= 6:
        # Small table: treat as single chunk
        content = format_table_content(table_data)
        return create_chunk('table', content, heading_stack, {
            'row_count': row_count,
            'column_count': col_count,
            'headers': table_data['headers'],
            'table_caption': table_data['caption']
        })

    else:
        # Large table: split by rows
        return split_large_table(table_data, heading_stack)

def extract_table_data(table_element):
    """Extract structured data from table.
    Generated by ClaudeAI"""

    # Get caption if present
    caption_elem = table_element.find('caption')
    caption = caption_elem.get_text().strip() if caption_elem else None

    # Extract headers
    headers = []
    header_row = table_element.find('thead')
    if header_row:
        for th in header_row.find_all(['th', 'td']):
            headers.append(th.get_text().strip())
    else:
        # Try first row as headers
        first_row = table_element.find('tr')
        if first_row:
            for cell in first_row.find_all(['th', 'td']):
                headers.append(cell.get_text().strip())

    # Extract data rows
    rows = []
    tbody = table_element.find('tbody') or table_element

    # Skip the first row when it was already consumed as the header row
    skip_first = 1 if (headers and not table_element.find('thead')) else 0
    for tr in tbody.find_all('tr')[skip_first:]:
        row = []
        for cell in tr.find_all(['td', 'th']):
            row.append(cell.get_text().strip())
        if row:  # Skip empty rows
            rows.append(row)

    return {
        'caption': caption,
        'headers': headers,
        'rows': rows
    }

def format_table_content(table_data):
    """Format table data into readable text
    Generated by ClaudeAI"""
    content_parts = []

    if table_data['caption']:
        content_parts.append(f"Table: {table_data['caption']}")

    headers = table_data['headers']
    rows = table_data['rows']

    if headers:
        content_parts.append("Columns: " + " | ".join(headers))

    # Format rows
    for i, row in enumerate(rows):
        if headers and len(row) == len(headers):
            # Create key-value pairs
            row_content = []
            for header, value in zip(headers, row):
                if value:  # Skip empty cells
                    row_content.append(f"{header}: {value}")
            if row_content:
                content_parts.append(f"Row {i+1}: " + "; ".join(row_content))
        else:
            # Simple row format
            content_parts.append(f"Row {i+1}: " + " | ".join(row))

    return '\n'.join(content_parts)

def split_large_table(table_data, heading_stack):
    """Split large tables into smaller chunks
    Generated By ClaudeAI"""
    chunks = []
    headers = table_data['headers']
    rows = table_data['rows']

    chunk_size = 10  # Rows per chunk
    total_chunks = (len(rows) + chunk_size - 1) // chunk_size

    for i in range(0, len(rows), chunk_size):
        chunk_rows = rows[i:i + chunk_size]

        chunk_table_data = {
            'caption': table_data['caption'],
            'headers': headers,
            'rows': chunk_rows
        }

        content = format_table_content(chunk_table_data)

        chunk = create_chunk('table_part', content, heading_stack, {
            'row_count': len(chunk_rows),
            'column_count': len(headers) if headers else len(chunk_rows[0]),
            'headers': headers,
            'table_caption': table_data['caption'],
            'part_number': i // chunk_size + 1,
            'total_parts': total_chunks,
            'row_range': f"{i+1}-{min(i+chunk_size, len(rows))}"
        })

        chunks.append(chunk)

    return chunks


def extract_content(html_content):
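    """Parse a devsite-style HTML page into metadata (url, title, description,
    last-modified date) and a list of content chunks."""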
    ret_value = {}
    soup = BeautifulSoup(html_content, 'html.parser')

    og_url = soup.find('meta', property='og:url')
    og_description = soup.find('meta', property="og:description")
    og_title = soup.find('meta', property="og:title")
    logger.debug(f"og:title tag: {og_title}")
    title_content = og_title.get('content') if og_title else None
    title = re.sub(r'[\s\xa0]*\|[\s\xa0]*', ' | ', title_content) if title_content else None

    article_body = soup.find('div', class_='devsite-article-body')
    if not article_body:
        return {}

    footer = soup.find('devsite-content-footer')
    # footer_paras = footer.find_all('p') if footer else None
    # second_para = footer_paras[1] if len(footer_paras) > 1 else None
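    # Default to today's date; the footer's 'Last updated ... UTC' line overrides it when present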
    date_last_modified = date.today().strftime('%Y-%m-%d')
    if footer:
        footer_paras = footer.find_all('p')
        for fp in footer_paras:
            last_updated_re = r'Last updated (.*) UTC'
            match = re.search(last_updated_re, fp.get_text())
            if match:
                date_last_modified = match.group(1)
                break

    #
    # Start ClaudeAI generated Code
    #
    chunks = []
    heading_stack = []

    # Process elements that can be chunks or provide context
    for element in article_body.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'ul', 'ol', 'table']):
        if element.name.startswith('h'):
            level = int(element.name[1])
            heading_text = element.get_text().strip()

            # Pop headings at the same or deeper level so the stack mirrors the current hierarchy
            heading_stack = [h for h in heading_stack if h['level'] < level]
            heading_stack.append({'level': level, 'text': heading_text})

        elif element.name == 'p':
            # get_text() always returns a string; collapse newlines into spaces
            content = element.get_text().strip().replace('\n', ' ')

            if content and len(content) > 10:
                chunk = create_chunk('paragraph', content, heading_stack)
                chunks.append(chunk)

        elif element.name in ['ul', 'ol']:
            list_chunk = process_list(element, heading_stack)
            if list_chunk:
                chunks.append(list_chunk)

        elif element.name == 'table':
            table_chunk = process_table(element, heading_stack)
            if table_chunk:
                chunks.append(table_chunk)
    #
    # End ClaudeAI generated code
    #

    ret_value['url'] = og_url.get('content') if og_url else None
    ret_value['description'] = og_description.get('content') if og_description else None
    ret_value['title'] = title
    ret_value['date_last_modified'] = date_last_modified
    ret_value['chunks'] = chunks
    # ret_value['article'] = article_body

    return ret_value

def prepare_enhanced_documents(html_content, additional_metadata=None):
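    """Extract content from an HTML page and return a flat list of chunk documents ready for embedding."""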
    extracted = extract_content(html_content)

    if not extracted:
        return []
    # extracted = extract_all_content_chunks(html_content)

    additional_metadata = {
        **(additional_metadata or {}),
        "date_last_modified": extracted['date_last_modified']
    }

    documents = []
    chunk_counter = 0

    for chunk in extracted['chunks']:
        # Handle cases where list/table processing returns multiple chunks
        if isinstance(chunk, list):
            for sub_chunk in chunk:
                doc = create_document_from_chunk(sub_chunk, extracted['url'], extracted['title'], chunk_counter, additional_metadata)
                documents.append(doc)
                chunk_counter += 1
        else:
            doc = create_document_from_chunk(chunk, extracted['url'], extracted['title'], chunk_counter, additional_metadata)
            documents.append(doc)
            chunk_counter += 1

    return documents

def create_document_from_chunk(chunk, url, title, index, additional_metadata):
    """Create document object from chunk"""

    # Create enhanced content for embedding
    content_parts = []

    # Add document title
    content_parts.append(title)

    # Add heading context
    if chunk['heading_path']:
        content_parts.append(f"Section: {chunk['heading_path']}")

    # Add content type context
    content_type_labels = {
        'paragraph': '',
        'unordered_list': 'List:',
        'ordered_list': 'Numbered list:',
        'table': 'Table:',
        'table_part': 'Table data:',
        'unordered_list_part': 'List items:',
        'ordered_list_part': 'Numbered list items:'
    }

    type_label = content_type_labels.get(chunk['content_type'], '')
    if type_label:
        content_parts.append(type_label)

    # Add main content
    content_parts.append(chunk['content'])

    # Enhanced content for embedding
    embedding_content = ' '.join(content_parts)

    # Base metadata
    metadata = {
        'source_url': url,
        'document_title': title,
        'chunk_index': index,
        'content_type': chunk['content_type'],
        'heading_path': chunk['heading_path'],
        'immediate_heading': chunk['immediate_heading'],
        'all_headings': chunk['headings'],
        'processed_at': date.today().strftime('%Y-%m-%d'),
        **(additional_metadata or {})
    }

    # Add content-specific metadata
    for key in ['item_count', 'list_items', 'row_count', 'column_count', 'headers', 'table_caption', 'part_number', 'total_parts', 'row_range']:
        if key in chunk:
            metadata[key] = chunk[key]

    doc = {
        'id': f"{url}#chunk{index}",
        'content': chunk['content'],
        'embedding_content': embedding_content,
        'metadata': metadata
    }

    return doc

def create_embedding(text):
    """Create an embedding vector for a single text"""
    response = ollama.embeddings(
        model='nomic-embed-text',
        prompt=text
    )
    return response['embedding']

# content = read_file('./docs/run/cloud.google.com/run/docs/overview/what-is-cloud-run')

folders_to_read = [
    'docs/run/cloud.google.com/run/docs',
    'docs/compute/cloud.google.com/compute/docs',
    'docs/iam/cloud.google.com/iam/docs'
]
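# Note: this list is not iterated automatically; process() is called per folder at the bottom of the file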

def is_html_file(file_path: str) -> bool:
    """Check if file is likely an HTML file based on content."""
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            first_line = file.readline().lower().strip()
            # Check for common HTML indicators
            return (first_line.startswith('<!doctype html') or 
                   first_line.startswith('<html') or 
                   '<html' in first_line)
    except (OSError, UnicodeDecodeError):
        return False


def insert_to_database(conn, records):
    """Insert records into the documents and chunks tables."""
    try:
        cursor = conn.cursor()

        # Group records by source_url to handle documents
        documents_by_url = {}
        for record in records:
            source_url = record['source_url']
            if source_url not in documents_by_url:
                documents_by_url[source_url] = {
                    'title': record['title'],
                    'source_url': source_url,
                    'date_last_modified': record['date_last_modified'],
                    'metadata': record['metadata'],
                    'chunks': []
                }
            documents_by_url[source_url]['chunks'].append(record)

        # Insert documents (with conflict handling for duplicates)
        document_insert_query = """
            INSERT INTO documents (source_url, title, date_last_modified, metadata)
            VALUES (%s, %s, %s, %s)
            ON CONFLICT (source_url) DO UPDATE SET
                title = EXCLUDED.title,
                date_last_modified = EXCLUDED.date_last_modified,
                metadata = EXCLUDED.metadata
            RETURNING id
        """

        # Insert chunks
        chunk_insert_query = """
            INSERT INTO chunks (document_id, chunk_index, content, embedding)
            VALUES (%s, %s, %s, %s)
        """

        # Get document ID query for existing documents
        get_document_id_query = """
            SELECT id FROM documents WHERE source_url = %s
        """

        total_chunks_inserted = 0

        for source_url, doc_data in documents_by_url.items():
            # Upsert the document; ON CONFLICT handles duplicates, and the savepoint
            # keeps the transaction usable if the insert fails for another reason
            try:
                cursor.execute("SAVEPOINT document_upsert")
                cursor.execute(document_insert_query, (
                    doc_data['source_url'],
                    doc_data['title'],
                    doc_data['date_last_modified'],
                    Json(doc_data['metadata'])
                ))
                document_id = cursor.fetchone()[0]
            except Exception as e:
                # Roll back to the savepoint so the fallback lookup can still run
                cursor.execute("ROLLBACK TO SAVEPOINT document_upsert")
                cursor.execute(get_document_id_query, (source_url,))
                result = cursor.fetchone()
                if result:
                    document_id = result[0]
                else:
                    logger.error(f"Failed to insert or find document for URL {source_url}: {str(e)}")
                    continue

            # Insert chunks for this document
            for chunk in doc_data['chunks']:
                cursor.execute(chunk_insert_query, (
                    document_id,
                    chunk['chunk_index'],
                    chunk['content'],
                    chunk['embedding']
                ))
                total_chunks_inserted += 1

        conn.commit()
        logger.info(f"Inserted {len(documents_by_url)} documents and {total_chunks_inserted} chunks into database")

    except Exception as e:
        logger.error(f"Database insertion error: {str(e)}")
        conn.rollback()
        raise
    finally:
        cursor.close()


def process(folder_path):
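    """Walk folder_path, chunk and embed every file, and insert the results into the database."""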

    try:
        # Connect to database
        conn = psycopg2.connect(
            host="localhost",           # Database server host
            port=15432,                 # Port number (default: 5432)
            database="gcp_docs",           # Database name
            user="admin",           # Username
            password="password"        # Password
        )
        logger.info("Connected to database successfully")

        # Track processing statistics
        total_files = 0
        total_chunks = 0
        records = []  # ensures a defined return value even when no files are processed

        # Walk through all directories and files
        for root, dirs, files in os.walk(folder_path):
            logger.info(f"Processing directory: {root}")
            for filename in files:
                file_path = os.path.join(root, filename)

                html_page = read_file(file_path)
                logger.info(f"Processing file: {file_path}")

                # Extract content from HTML
                document = prepare_enhanced_documents(html_page)

                if not document:
                    logger.warning(f"No content extracted from {file_path}")
                    continue

                records = []
                for i, chunk in enumerate(document):
                    embedding = create_embedding(chunk['embedding_content'])
                    record = {
                        'id': str(uuid.uuid4()),
                        'title': chunk['metadata']['document_title'],
                        'chunk_index': chunk['metadata']['chunk_index'],  # ordinal position within the document
                        'content': chunk['content'],
                        'source_url': chunk['metadata']['source_url'],
                        'date_last_modified': chunk['metadata']['date_last_modified'],
                        'metadata': {
                            **chunk['metadata'],
                            'chunk_number': i,
                            'total_chunks': len(document),
                            'chunk_size': len(chunk['embedding_content'])
                        },
                        'embedding': embedding
                    }
                    records.append(record)

                # Insert records into database
                if records:
                    insert_to_database(conn, records)
                    total_files += 1
                    total_chunks += len(records)

        logger.info(f"Processing complete. Processed {total_files} files, created {total_chunks} chunks")
        return records

    except Exception as e:
        logger.error(f"Error during processing: {str(e)}")
        raise
    finally:
        if 'conn' in locals():
            conn.close()
        logger.info("Database connection closed")


# html_file = read_file('docs/run/cloud.google.com/run/docs/monitoring-overview')
# prepare_enhanced_documents(html_file)


#records = process('docs/compute/cloud.google.com/compute/docs')
# records = process('docs/storage/cloud.google.com/storage/docs')
# records = process('docs/pubsub/cloud.google.com/pubsub/docs')
# records = process('docs/functions/cloud.google.com/functions/docs')
# records = process('docs/run/cloud.google.com/run/docs')
# records = process('docs/iam/cloud.google.com/iam/docs')
# records = process('docs/iap/cloud.google.com/iap/docs')
# records = process('docs/bigquery/cloud.google.com/bigquery/docs')
# records = process('docs/apigee/cloud.google.com/apigee/docs')
#records = process('docs/sql/cloud.google.com/sql/docs')
records = process('docs/firestore/cloud.google.com/firestore/docs')
records = process('docs/firestore/cloud.google.com/firestore/native')