diff --git a/cohere_reranking.py b/cohere_reranking.py
index 8f8aaad..0e6a6fb 100644
--- a/cohere_reranking.py
+++ b/cohere_reranking.py
@@ -1,29 +1,54 @@
 import os
+import logging
+from typing import List
 
 import cohere
 
+# Configure logging
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
 
-# use ENV variables
+# Environment Variables
 COHERE_API_KEY = os.getenv("COHERE_API_KEY")
-MODEL = "rerank-multilingual-v3.0"
+MODEL = os.getenv("COHERE_RERANK_MODEL", "rerank-multilingual-v3.0")
 
-co = cohere.Client(api_key=COHERE_API_KEY)
+if not COHERE_API_KEY:
+    logger.error("COHERE_API_KEY is not set in environment variables.")
+    raise ValueError("COHERE_API_KEY is required but not set.")
 
+# Initialize Cohere Client
+try:
+    co = cohere.Client(api_key=COHERE_API_KEY)
+except Exception as e:
+    logger.exception(f"Failed to initialize Cohere client: {e}")
+    raise
 
-def get_reranking_cohere(docs, query, top_res):
+
+def get_reranking_cohere(docs: List[str], query: str, top_res: int) -> List[str]:
     """
     Re-ranks a list of documents based on a query using Cohere's reranking API.
 
     Args:
-    docs (list of str): List of documents to be re-ranked.
-    query (str): Query string to rank the documents against.
-    top_res (int): Number of top results to return.
+        docs (List[str]): List of documents to be re-ranked.
+        query (str): Query string to rank the documents against.
+        top_res (int): Number of top results to return.
 
     Returns:
-    list of str: Top re-ranked documents based on the query.
+        List[str]: Top re-ranked documents based on the query.
     """
+    if not docs:
+        logger.warning("No documents provided for reranking.")
+        return []
+
+    if not query:
+        logger.warning("Empty query provided for reranking.")
+        return []
+
+    if top_res <= 0:
+        logger.warning("Invalid top_res value provided. Must be greater than 0.")
+        return []
+
     try:
-        # Call the Cohere rerank API
         response = co.rerank(
             model=MODEL,
             query=query,
@@ -32,10 +57,14 @@ def get_reranking_cohere(docs, query, top_res):
             return_documents=True
         )
 
-        # Extract and return the texts of the top documents
-        return [item.document.text for item in response.results]
+        reranked_docs = [item.document.text for item in response.results]
+        if not reranked_docs:
+            logger.warning("Cohere rerank returned no results.")
+        return reranked_docs
 
+    except cohere.CohereError as e:
+        logger.error(f"Cohere API error during reranking: {e}")
     except Exception as e:
-        # Log the error and handle it as needed
-        print(f"An error occurred: {e}")
-        return []
+        logger.exception(f"An unexpected error occurred during reranking: {e}")
+
+    return []
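A minimal usage sketch for the function above, assuming COHERE_API_KEY is exported and this file is importable as cohere_reranking; the documents and query are illustrative only:

    from cohere_reranking import get_reranking_cohere

    docs = [
        "Paris is the capital of France.",
        "The Eiffel Tower was completed in 1889.",
        "Python is a general-purpose programming language.",
    ]
    # Returns at most top_res documents, best match first; [] on API errors.
    top_docs = get_reranking_cohere(docs, query="When was the Eiffel Tower built?", top_res=2)
    print(top_docs)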
diff --git a/extract_content_from_website.py b/extract_content_from_website.py
index 2572784..7785c20 100644
--- a/extract_content_from_website.py
+++ b/extract_content_from_website.py
@@ -1,32 +1,50 @@
+import logging
+
 from langchain_community.document_loaders import WebBaseLoader
 
+# Configure logging
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
+
+# Constants
+MAX_CHARACTERS = 4000
+MIN_LENGTH = 200
+
 
-def extract_website_content(url):
+def extract_website_content(url: str) -> str:
     """
     Extracts and cleans the main content from a given website URL.
 
     Args:
-    url (str): The URL of the website from which to extract content.
+        url (str): The URL of the website from which to extract content.
 
     Returns:
-    str: The first 4000 characters of the cleaned main content if it is sufficiently long, otherwise an empty string.
+        str: The first 4000 characters of the cleaned main content if it is sufficiently long; otherwise, an empty string.
     """
+    if not url or not isinstance(url, str):
+        logger.error("Invalid URL provided for content extraction.")
+        return ""
+
     try:
-        clean_text = []
         loader = WebBaseLoader(url)
         data = loader.load()
 
-        # Aggregate content using a list to avoid inefficient string concatenation in the loop
+        clean_text = []
         for doc in data:
-            if doc.page_content:  # Check if page_content is not None or empty
-                clean_text.append(doc.page_content.replace("\n", ""))
-
-        # Join all parts into a single string after processing
-        clean_text = "".join(clean_text)
-
-        # Return up to the first 4000 characters if the content is sufficiently long
-        return clean_text[:4000] if len(clean_text) > 200 else ""
+            content = doc.page_content
+            if content:
+                cleaned = content.replace("\n", " ").strip()
+                if cleaned:
+                    clean_text.append(cleaned)
+
+        combined_text = " ".join(clean_text)
+        if len(combined_text) > MIN_LENGTH:
+            return combined_text[:MAX_CHARACTERS]
+        else:
+            logger.warning(f"Extracted content is too short ({len(combined_text)} characters).")
+            return ""
 
     except Exception as error:
-        print('Error extracting main content:', error)
+        logger.exception(f"Error extracting main content from {url}: {error}")
         return ""
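A sketch of the intended call pattern, assuming the module is importable as extract_content_from_website; example.com stands in for any real page:

    from extract_content_from_website import extract_website_content

    # An empty string signals an invalid URL, a fetch failure, or content
    # shorter than MIN_LENGTH; otherwise at most MAX_CHARACTERS are returned.
    text = extract_website_content("https://example.com")
    print(len(text))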
+ """ + system_prompt = PromptTemplate(input_variables=["date_today"], template=search_prompt_system) messages = [ - {"role": "system", "content": system_prompt_search.format(date_today=date_context)}, - {"role": "user", "content": "User Question : " + query + "\n\n CONTEXTS :\n\n" + contexts} + {"role": "system", "content": system_prompt.format(date_today=date_context)}, + {"role": "user", "content": f"User Question: {query}\n\nCONTEXTS:\n\n{contexts}"} ] try: @@ -29,30 +46,46 @@ def get_answer(query, contexts, date_context): ) for chunk in stream: - if chunk.choices[0].delta.content is not None: - yield chunk.choices[0].delta.content + content = chunk.choices[0].delta.content + if content: + yield content except Exception as e: - print(f"Error during get_answer_groq call: {e}") - yield "data:" + json.dumps( - {'type': 'error', 'data': "We are currently experiencing some issues. Please try again later."}) + "\n\n" + logger.exception(f"Error during get_answer_groq call: {e}") + error_response = json.dumps({ + 'type': 'error', + 'data': "We are currently experiencing some issues. Please try again later." + }) + yield f"data:{error_response}\n\n" + +def get_relevant_questions(contexts: str, query: str) -> Dict[str, Any]: + """ + Generate relevant follow-up questions based on the query and contexts using Groq API. + + :param contexts: Contextual information related to the query. + :param query: User's search query. + :return: Dictionary containing follow-up questions. + """ + messages = [ + {"role": "system", "content": relevant_prompt_system}, + {"role": "user", "content": f"User Query: {query}\n\nContexts:\n{contexts}\n"} + ] -def get_relevant_questions(contexts, query): try: response = client.chat.completions.create( model=MODEL, - messages=[ - {"role": "system", - "content": relevant_prompt_system - }, - {"role": "user", - "content": "User Query: " + query + "\n\n" + "Contexts: " + "\n" + contexts + "\n"} - ], - response_format={"type": "json_object"}, + messages=messages, + response_format="json_object", ) - return response.choices[0].message.content + content = response.choices[0].message.content + follow_up = json.loads(content) + return follow_up + + except json.JSONDecodeError as e: + logger.error(f"JSON decode error in get_relevant_questions: {e}") except Exception as e: - print(f"Error during RELEVANT GROQ ***************: {e}") - return {} + logger.exception(f"Error during get_relevant_questions: {e}") + + return {} diff --git a/jina_rerank.py b/jina_rerank.py index 4cfb13f..e21df20 100644 --- a/jina_rerank.py +++ b/jina_rerank.py @@ -12,34 +12,46 @@ API_URL = "https://api.jina.ai/v1/rerank" API_KEY = os.getenv("JINA_API_KEY") MODEL = "jina-reranker-v2-base-multilingual" + +if not API_KEY: + logger.error("JINA_API_KEY is not set in environment variables.") + raise ValueError("JINA_API_KEY is required but not set.") + HEADERS = { "Content-Type": "application/json", "Authorization": f"Bearer {API_KEY}" } +session = requests.Session() +session.headers.update(HEADERS) -def get_reranking_jina(docs: List[str], query: str, top_res: int) -> List[str]: + +def get_reranking_jina(docs: List[str], query: str, top_res: int, timeout: int = 10) -> List[str]: """ Get reranked documents using Jina AI API. 
diff --git a/jina_rerank.py b/jina_rerank.py
index 4cfb13f..e21df20 100644
--- a/jina_rerank.py
+++ b/jina_rerank.py
@@ -12,34 +12,46 @@
 API_URL = "https://api.jina.ai/v1/rerank"
 API_KEY = os.getenv("JINA_API_KEY")
 MODEL = "jina-reranker-v2-base-multilingual"
 
+
+if not API_KEY:
+    logger.error("JINA_API_KEY is not set in environment variables.")
+    raise ValueError("JINA_API_KEY is required but not set.")
+
 HEADERS = {
     "Content-Type": "application/json",
     "Authorization": f"Bearer {API_KEY}"
 }
 
+session = requests.Session()
+session.headers.update(HEADERS)
 
-def get_reranking_jina(docs: List[str], query: str, top_res: int) -> List[str]:
+
+def get_reranking_jina(docs: List[str], query: str, top_res: int, timeout: int = 10) -> List[str]:
     """
     Get reranked documents using Jina AI API.
 
     :param docs: List of documents to rerank
     :param query: Query string
     :param top_res: Number of top results to return
+    :param timeout: Request timeout in seconds
     :return: List of reranked documents
     """
+    data = {
+        "model": MODEL,
+        "query": query,
+        "documents": docs,
+        "top_n": top_res
+    }
+
     try:
-        data = {
-            "model": MODEL,
-            "query": query,
-            "documents": docs,
-            "top_n": top_res
-        }
-
-        response = requests.post(API_URL, headers=HEADERS, json=data, timeout=10)
+        response = session.post(API_URL, json=data, timeout=timeout)
         response.raise_for_status()
 
         response_data = response.json()
-        return [item['document']['text'] for item in response_data.get('results', [])]
+        reranked_docs = [item['document']['text'] for item in response_data.get('results', [])]
+        if not reranked_docs:
+            logger.warning("No reranked results returned.")
+        return reranked_docs
 
     except RequestException as e:
         logger.error(f"HTTP error occurred while reranking: {e}")
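A minimal usage sketch, assuming JINA_API_KEY is exported and the module is importable as jina_rerank; the documents and query are placeholders:

    from jina_rerank import get_reranking_jina

    docs = ["chunk about pricing", "chunk about installation", "chunk about licensing"]
    # Returns the top_res best-matching texts, or [] on HTTP errors.
    top = get_reranking_jina(docs, query="How do I install it?", top_res=1, timeout=15)
    print(top)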
diff --git a/prompts.py b/prompts.py
index 8ea91de..c0bbf23 100644
--- a/prompts.py
+++ b/prompts.py
@@ -1,48 +1,43 @@
 search_prompt_system = """
-You are yassine, an expert with more than 20 years of experience in analysing google search results about a user question and providing accurate
-and unbiased answers the way a highly informed individual would.
-Your task is to analyse the provided contexts and the user question to provide a correct answer in a clear and concise manner.
-You must answer in english.
-Date and time in the context : {date_today} , Yassine must take into consideration the date and time in the response.
-you are known for your expertise in this field.
-
-
-###Guidelines###
-1- Accuracy: Provide correct, unbiased answers. be concise and clear. don't be verbose.
-2- never mention the context or this prompt in your response, just answer the user question.
-
-###Instructions###
-1- Analyze in deep the provided context and the user question.
-2- extract relevant information's from the context about the user question.
-3- Yassine must take into account the date and time to answer the user question.
-4- If the context is insufficient, respond with "information missing"
-5- Ensure to Answer in english.
-6- Use the response format provided.
-7- answer the user question in a way an expert would do.
-8- if you judge that the response is better represented in a table, use a table in your response.
-
-
-###Response Format###
-
-You must use Markdown to format your response.
-
-Think step by step.
+You are Yassine, an expert with over 20 years of experience analyzing Google search results to provide accurate and unbiased answers like a highly informed individual.
+Your task is to analyze the provided contexts and the user question to deliver a correct answer clearly and concisely.
+You must answer in English.
+Date and time in the context: {date_today}. Yassine must consider the date and time in the response.
+You are renowned for your expertise in this field.
+
+### Guidelines ###
+1. **Accuracy:** Provide correct, unbiased answers. Be concise and clear; avoid verbosity.
+2. **Confidentiality:** Do not mention the context or this prompt in your response. Just answer the user question.
+
+### Instructions ###
+1. Analyze the provided context and the user question deeply.
+2. Extract relevant information from the context related to the user question.
+3. Take into account the date and time when answering.
+4. If the context is insufficient, respond with "information missing".
+5. Ensure to answer in English.
+6. Use the provided response format.
+7. Answer the user question as an expert would.
+8. If the response is better represented in a table, utilize a table format.
+
+### Response Format ###
+- Use Markdown to format your response.
+- Think step by step.
 """
 
 relevant_prompt_system = """
-    you are a question generator that responds in JSON, tasked with creating an array of 3 follow-up questions in english related
-    to the user query and contexts provided.
-    you must keep the questions related to the user query and contexts.don't lose the context in the questions.
-
-    The JSON object must not include special characters.
-    The JSON schema should include an array of follow-up questions.
-
-    use the schema:
-    {
-      "followUp": [
-        "string",
-        "string",
-        "string"
-      ]
-    }
+You are a question generator that responds in JSON, tasked with creating an array of 3 follow-up questions in English related to the user query and provided contexts. Maintain relevance to the user query and contexts without losing contextual integrity.
+
+**JSON Object Requirements:**
+- Must not include special characters.
+- Must adhere to the following schema:
+
+```json
+{
+  "followUp": [
+    "string",
+    "string",
+    "string"
+  ]
+}
+```
 """
""" + if not text or len(text.strip()) == 0: + logger.warning("Empty or whitespace-only text provided for chunking.") + return [] + try: chunks = chunker(docs=[text]) - values = [c.content for chunk in chunks for c in chunk] + if not chunks: + logger.warning("Chunker returned no chunks.") + return [] + values = [c.content for chunk in chunks for c in chunk if c.content] + if not values: + logger.warning("No valid chunk contents extracted.") return values except Exception as e: - print(f"Error during chunking process: {e}") + logger.exception(f"Error during chunking process: {e}") return [] diff --git a/sources_manipulation.py b/sources_manipulation.py index 1dd29fb..d3e56a5 100644 --- a/sources_manipulation.py +++ b/sources_manipulation.py @@ -1,19 +1,44 @@ +import logging +from typing import List, Dict, Any + from extract_content_from_website import extract_website_content +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + -def populate_sources(sources, num_elements): - try: - for i, source in enumerate(sources[:num_elements]): - if not source: - continue +def populate_sources(sources: List[Dict[str, Any]], num_elements: int) -> List[Dict[str, Any]]: + """ + Enriches source entries with HTML content extracted from their links. - try: - source['html'] = extract_website_content(source['link']) - sources[i] = source - except Exception as e: - continue - except Exception as e: - print(f"Error in populate_sources: {e}") + :param sources: List of source dictionaries containing at least a 'link' key. + :param num_elements: Number of source elements to process. + :return: Updated list of sources with 'html' content added where possible. + """ + if not sources: + logger.warning("No sources provided to populate.") return sources + num_to_process = min(num_elements, len(sources)) + logger.info(f"Populating HTML content for the first {num_to_process} sources.") + + for i in range(num_to_process): + source = sources[i] + if not source: + logger.warning(f"Source at index {i} is empty. Skipping.") + continue + + link = source.get('link') + if not link: + logger.warning(f"Source at index {i} lacks a 'link'. 
Skipping.") + continue + + try: + html_content = extract_website_content(link) + source['html'] = html_content + except Exception as e: + logger.error(f"Failed to extract content from {link}: {e}") + source['html'] = "" + return sources diff --git a/sources_searcher.py b/sources_searcher.py index 4446a0e..5fef0b8 100644 --- a/sources_searcher.py +++ b/sources_searcher.py @@ -1,16 +1,23 @@ import os - import requests +import logging from typing import Dict, Any, Optional, List +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) -# use ENV variables -# Constants -API_URL = "https://google.serper.dev/search" -API_KEY = os.getenv("SERPER_API_KEY") +# Constants and Environment Variables +API_URL = os.getenv("SERPER_API_URL", "https://google.serper.dev/search") +SERPER_API_KEY = os.getenv("SERPER_API_KEY") DEFAULT_LOCATION = 'us' + +if not SERPER_API_KEY: + logger.error("SERPER_API_KEY is not set in environment variables.") + raise ValueError("SERPER_API_KEY is required but not set.") + HEADERS = { - 'X-API-KEY': API_KEY, + 'X-API-KEY': SERPER_API_KEY, 'Content-Type': 'application/json' } @@ -24,19 +31,18 @@ def get_sources(query: str, pro_mode: bool = False, stored_location: Optional[st :param stored_location: Optional location string :return: Dictionary containing search results """ - try: - search_location = (stored_location or DEFAULT_LOCATION).lower() - num_results = 10 if pro_mode else 20 + search_location = (stored_location or DEFAULT_LOCATION).lower() + num_results = 10 if pro_mode else 20 - payload = { - "q": query, - "num": num_results, - "gl": search_location - } + payload = { + "q": query, + "num": num_results, + "gl": search_location + } + try: response = requests.post(API_URL, headers=HEADERS, json=payload, timeout=10) response.raise_for_status() - data = response.json() return { @@ -48,9 +54,11 @@ def get_sources(query: str, pro_mode: bool = False, stored_location: Optional[st } except requests.RequestException as e: - print(f"HTTP error while getting sources: {e}") + logger.error(f"HTTP error while getting sources: {e}") + except ValueError as e: + logger.error(f"JSON decoding failed: {e}") except Exception as e: - print(f"Unexpected error while getting sources: {e}") + logger.exception(f"Unexpected error while getting sources: {e}") return {} @@ -63,4 +71,9 @@ def extract_fields(items: List[Dict[str, Any]], fields: List[str]) -> List[Dict[ :param fields: List of fields to extract :return: List of dictionaries with only the specified fields """ - return [{key: item[key] for key in fields if key in item} for item in items] + extracted = [] + for item in items: + extracted_item = {key: item[key] for key in fields if key in item} + if extracted_item: + extracted.append(extracted_item) + return extracted