from fastapi import APIRouter, HTTPException, UploadFile, File, Form, BackgroundTasks
from fastapi import Query
from fastapi.responses import FileResponse, JSONResponse
from pydantic import BaseModel
from typing import Any, List, Dict, Literal, Optional
from ..Chatbot.chroma_cms import ChromaHandler
from ..models.models import (
    Chunk,
    TaskStatus,
    PromptsRequest,
    DeleteFileByIDRequest,
    UpdateChunkRequest,
    SimilaritySearchRequest,
    Document,
    FileMetadata,
    UpdateS3Request,
)
from dotenv import load_dotenv
import json
import uuid
import threading
from ..utils.utils import (
    download_from_s3,
    update_s3_object,
    upload_to_s3,
    generate_presigned_url,
    generate_multiple_presigned_urls,
)
from src.utils.logger import logger
from datetime import datetime, timedelta
import os
import traceback
import csv
from urllib.parse import urlparse

load_dotenv()

# Create temp folder in current working directory
base_dir = os.getcwd()
temp_folder = os.path.join(base_dir, "temp")
if not os.path.exists(temp_folder):
    os.makedirs(temp_folder, exist_ok=True)

cms_router = APIRouter()

# Initialize ChromaHandler
chroma_handler = ChromaHandler()


@cms_router.post("/update_kb/", response_model=bool)
def update_kb():
    try:
        logger.info("Updating KB...")
        s3_link = chroma_handler.backup_and_upload_database()
        logger.info(f"KB updated successfully: {s3_link}")
        return True
    except Exception as e:
        logger.error(f"Error updating KB: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


def update_kb_background_task():
    try:
        s3_link = chroma_handler.backup_and_upload_database()
        logger.info(f"KB updated successfully: {s3_link}")
    except Exception as e:
        logger.error(f"Error updating KB in background task: {str(e)}")


@cms_router.post("/chunks/upload/", response_model=TaskStatus)
async def upload_chunks(
    files: List[UploadFile] = File([]),
    urls: List[str] = Form([]),
    database: Literal["general", "pre_sales"] = Form("general"),
):
    """
    Upload PDF/XLSX files and/or URLs for ingestion.

    Any uploaded CSV file is treated as a list of URLs rather than a document.
    Returns a task with one entry per file/URL so the caller can track ingestion.
    """
    file_entries = []
    # Copy the urls list so the Form default is never mutated in place
    urls = list(urls)

    # Check for duplicate filenames and remove them
    seen_filenames = set()
    unique_files = []
    for file in files:
        if file.filename not in seen_filenames:
            seen_filenames.add(file.filename)
            unique_files.append(file)

    # Process CSV file if present
    csv_file = next((file for file in unique_files if file.filename.endswith('.csv')), None)
    if csv_file:
        csv_content = await csv_file.read()
        csv_lines = csv_content.decode('utf-8').splitlines()
        # Use csv.reader to extract URLs from the CSV
        csv_reader = csv.reader(csv_lines)
        for row in csv_reader:
            urls.extend(row)  # Add each row's cells to the urls list
        unique_files.remove(csv_file)  # Remove the CSV file from the files list

    # Create file entries for the remaining files, ignoring the CSV file
    for file in unique_files:
        file_id = chroma_handler.check_file_exist_by_file_name(file.filename, database)
        status = "exists" if file_id else "pending"
        file_entries.append({"file_id": str(uuid.uuid4()), "file_name": file.filename, "status": status})

    for url in urls:
        file_id = chroma_handler.check_file_exist_by_file_name(url, database)
        status = "exists" if file_id else "pending"
        file_entries.append({"file_id": str(uuid.uuid4()), "file_name": url, "status": status})

    task_id = chroma_handler.create_task(file_entries)
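    # The task record groups the per-file entries created above; clients can poll
    # GET /task/{task_id} for the aggregated status of these entries.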

    def run_background_task(func, *args):
        """Helper function to run a task in a separate thread."""
        thread = threading.Thread(target=func, args=args)
        thread.start()

    try:
        if unique_files:
            for file, file_entry in zip(unique_files, file_entries[:len(unique_files)]):
                if file_entry['status'] == "exists":
                    continue
                file_content = await file.read()
                file_name = file.filename
                file_extension = file_name.split(".")[-1]
                # Check for valid file types
                if file_extension not in ["pdf", "xlsx"]:
                    raise HTTPException(
                        status_code=400,
                        detail=f"Invalid file type: {file_extension}. Only 'pdf' and 'xlsx' are allowed.",
                    )
                local_file_path = os.path.join(temp_folder, file_name)
                with open(local_file_path, "wb") as f:
                    f.write(file_content)
                # Start the upload to S3 task in a separate thread
                run_background_task(chroma_handler.send_to_s3, local_file_path, file_entry['file_id'])
                if file_extension == "xlsx":
                    chunk = Chunk(content=file_content.decode('latin1'), type=file_extension, file_name=file_name, db_type="email")
                else:
                    chunk = Chunk(content=file_content.decode('latin1'), type=file_extension, file_name=file_name, db_type="general")
                # Start the add_chunk task in a separate thread
                run_background_task(chroma_handler.add_chunk, chunk.dict(), chunk.file_name, chunk.db_type, file_entry['file_id'], database)

        # Process URLs, ignoring the CSV file
        if urls:
            for url, file_entry in zip(urls, file_entries[len(unique_files):]):
                if file_entry['status'] == "exists":
                    continue
                chunk = Chunk(content="content", type="url", file_name=url, db_type="general")
                # Start the add_chunk task in a separate thread
                run_background_task(chroma_handler.add_chunk, chunk.dict(), chunk.file_name, chunk.db_type, file_entry['file_id'], database)
                chroma_handler.update_task_status(file_id=file_entry['file_id'], s3_link=url)

        # Capitalize the status of each file
        for file in file_entries:
            file['status'] = file['status'].capitalize()
        return {"task_id": task_id, "status": "Pending", "files": file_entries}
    except HTTPException:
        # Preserve explicit HTTP errors (e.g. the 400 for unsupported file types)
        # instead of re-wrapping them as 500s, but still mark the task as failed.
        chroma_handler.update_task_status(task_id=task_id, status="failed")
        raise
    except Exception as e:
        chroma_handler.update_task_status(task_id=task_id, status="failed")
        logger.error(f"Error uploading chunks: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
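

# Illustrative client call for the upload endpoint above (a sketch only: it assumes
# the router is mounted at the application root and that the `requests` library is
# available; the actual base URL depends on how cms_router is included):
#
#   requests.post(
#       "http://localhost:8000/chunks/upload/",
#       files=[("files", open("report.pdf", "rb"))],
#       data={"urls": ["https://example.com"], "database": "general"},
#   )
#
# The response mirrors TaskStatus: {"task_id": "...", "status": "Pending", "files": [...]}.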


# @cms_router.post("/chunks/ingest_json/", response_model=Dict[str, Any])
# async def ingest_json_documents(documents: List[Document], file_id: str, background_tasks: BackgroundTasks):
#     try:
#         background_tasks.add_task(chroma_handler.backup_and_upload_database)
#         # Retrieve the task ID associated with the file ID
#         task_id = chroma_handler.get_task_id_by_file_id(file_id)
#         print("Task ID: ", task_id)
#         if not task_id:
#             logger.error("No task found for the provided file ID.")
#             raise HTTPException(status_code=404, detail="No task found for the provided file ID.")
#         chroma_handler.update_task_status(file_id=file_id, status="ingesting")
#         s3_link = chroma_handler.get_s3_link(file_id)
#         if not s3_link:
#             logger.error("File not found.")
#             raise HTTPException(status_code=404, detail="File not found.")
#         # Save the documents to a local file
#         local_file_path = os.path.join(temp_folder, "temp.json")
#         with open(local_file_path, 'w') as file:
#             json.dump([doc.dict() for doc in documents], file, indent=4)
#         # Upload the local file to the S3 object
#         s3_url, error = update_s3_object(s3_link, local_file_path)
#         os.remove(local_file_path)
#         if error:
#             logger.error(f"Error uploading file to S3: {error}")
#             raise HTTPException(status_code=500, detail=error)
#         # Ingest the documents to the database
#         database = documents[0].metadata.get("database", "general")
#         db_type = documents[0].metadata.get("db_type", "general")
#         background_tasks.add_task(chroma_handler.ingest_documents, [doc.dict() for doc in documents], db_type, database, file_id)
#         return {"success": True, "task_id": task_id}
#     except Exception as e:
#         if task_id:
#             chroma_handler.update_task_status(task_id, "failed")
#         os.remove(os.path.join(temp_folder, "temp.json"))
#         logger.error(f"Error ingesting JSON documents: {str(e)}")
#         raise HTTPException(status_code=500, detail=str(e))


@cms_router.get("/task/{task_id}", response_model=TaskStatus)
def get_task_status(task_id: str):
    try:
        # Retrieve all file entries associated with the task
        files = chroma_handler.get_task_status(task_id)
        # Check the status of each file
        statuses = [file['status'] for file in files]
        if any(status == "Failed" for status in statuses):
            overall_status = "Failed"
        elif any(status == "Pending" for status in statuses):
            overall_status = "Pending"
        elif all(status == "Published" for status in statuses):
            overall_status = "Published"
        elif any(status == "Exists" for status in statuses):
            overall_status = "Exists"
        else:
            overall_status = "Unknown"  # Or handle unexpected status
        return {"task_id": task_id, "status": overall_status.capitalize(), "files": files}
    except Exception as e:
        logger.error(f"Error getting task status: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


@cms_router.post("/chunks/delete_all/", response_model=Dict[str, Any])
def delete_all_file_chunks(request: DeleteFileByIDRequest, background_tasks: BackgroundTasks):
    try:
        background_tasks.add_task(chroma_handler.backup_and_upload_database)
        success = chroma_handler.delete_all_file_chunks_by_file_id(request.file_id)
        if success:
            return {"success": True, "message": "All file chunks deleted successfully."}
        else:
            return {"success": False, "message": "No file chunks found for the provided file ID."}
    except Exception as e:
        logger.error(f"Error deleting all file chunks: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


# @cms_router.delete("/chunks/delete_single/", response_model=bool)
# def delete_single_chunk(request: DeleteFileByIDRequest, background_tasks: BackgroundTasks):
#     try:
#         background_tasks.add_task(chroma_handler.backup_and_upload_database)
#         chroma_handler.delete_single_chunk(request.insertion_id)
#         return True
#     except Exception as e:
#         logger.error(f"Error deleting single chunk: {str(e)}")
#         raise HTTPException(status_code=500, detail=str(e))


@cms_router.post("/chunks/similarity_search/", response_model=List[Dict])
def similarity_search(request: SimilaritySearchRequest):
    try:
        return chroma_handler.get_chunks(request.question, request.database)
    except Exception as e:
        logger.error(f"Error performing similarity search: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


# @cms_router.put("/chunks/update/", response_model=bool)
# def update_chunk(request: UpdateChunkRequest, background_tasks: BackgroundTasks, auth_request: Request):
#     try:
#         background_tasks.add_task(chroma_handler.backup_and_upload_database)
#         chroma_handler.update_chunk(request.insertion_id, request.new_content)
#         return True
#     except Exception as e:
#         logger.error(f"Error updating chunk: {str(e)}")
#         raise HTTPException(status_code=500, detail=str(e))


@cms_router.get("/s3resources/search/", response_model=Dict[str, Any])
async def search_s3_resources(
    query: str,
    limit: int = Query(10, ge=1),
    offset: int = Query(0, ge=0),
    db_type: Optional[Literal["general", "pre_sales"]] = Query(None),  # Optional db_type filter
):
    try:
        results = chroma_handler.search_s3_resources(query, limit, offset, db_type)  # Pass db_type to the handler
        new_results = generate_multiple_presigned_urls(results)
        return new_results
    except Exception as e:
        logger.error(f"Error searching S3 resources: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


@cms_router.get("/files/", response_model=Dict[str, Any])
async def get_all_files_with_urls(
    limit: int = Query(10, ge=1),  # Ensure limit is at least 1
    offset: int = Query(0, ge=0),  # Ensure offset is non-negative
    db_type: Literal["general", "pre_sales"] = Query(...),  # Required db_type filter
):
    try:
        if db_type not in ["general", "pre_sales"]:
            raise HTTPException(status_code=400, detail="Invalid database type provided.")
        result = chroma_handler.get_all_files_with_urls(limit=limit, offset=offset, db_type=db_type)
        new_results = generate_multiple_presigned_urls(result)
        return new_results
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting all files with URLs: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


# @cms_router.get("/get_json_from_s3/", response_model=List[Dict[str, Any]])
# async def download_s3_json(s3_object_url: str, auth_request: Request):
#     try:
#         # Ensure the temp folder exists
#         if not os.path.exists(temp_folder):
#             os.makedirs(temp_folder, exist_ok=True)
#         local_file_path = download_from_s3(s3_object_url, temp_folder)
#         # Ensure the file was downloaded
#         if not local_file_path or not os.path.exists(local_file_path):
#             logger.error("File was not downloaded successfully.")
#             raise HTTPException(status_code=500, detail="File was not downloaded successfully.")
#         with open(local_file_path, 'r') as file:
#             json_content = json.load(file)
#         if not isinstance(json_content, list):
#             raise HTTPException(status_code=500, detail="Downloaded content is not a valid JSON array.")
#         return json_content
#     except json.JSONDecodeError:
#         logger.info("Failed to decode JSON content.")
#         raise HTTPException(status_code=500, detail="Failed to decode JSON content.")
#     except Exception as e:
#         logger.error(f"Error downloading JSON file from S3: {str(e)}")
#         raise HTTPException(status_code=500, detail=str(e))
#     finally:
#         if local_file_path and os.path.exists(local_file_path):
#             os.remove(local_file_path)


# @cms_router.post("/prompts/insert/", response_model=bool)
# async def insert_prompts(prompts_request: PromptsRequest):
#     try:
#         # Call the insert_suggested_prompts method with the provided prompts
#         chroma_handler.insert_suggested_prompts(prompts_request.prompts)
#         return True
#     except Exception as e:
#         logger.info(f"Error inserting prompts: {str(e)}")
#         raise HTTPException(status_code=500, detail=str(e))


# @cms_router.post("/get_chunks_by_file_id/", response_model=List[Dict])
# async def get_chunks_by_file_id(file_id: str, limit: int = 10, offset: int = 0):
#     try:
#         return chroma_handler.get_file_chunks_by_file_id(file_id, limit, offset)
#     except Exception as e:
#         logger.error(f"Error getting chunks by file ID: {str(e)}")
#         raise HTTPException(status_code=500, detail=str(e))
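

# Illustrative payload shape for the endpoint below (a sketch only: the fields inside
# each Document besides "metadata" depend on the Document model in ..models.models):
#
#   {
#       "https://example.com/page": [
#           { ...Document fields..., "metadata": {"database": "general", "db_type": "general"} }
#       ]
#   }
#
# Each top-level key is a web URL; its documents are ingested under a freshly
# generated file_id, and the per-URL file_ids are returned to the caller.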
@cms_router.post("/chunks/ingest_json_with_weburl/", response_model=Dict[str, Any])
async def ingest_json_documents_with_weburl(documents: Dict[str, List[Document]], background_tasks: BackgroundTasks):
    try:
        task_id = str(uuid.uuid4())
        file_ids = {}
        web_url_entries = []
        for web_url, docs in documents.items():
            logger.info(f"Processing web URL: {web_url}")
            # Validate before reading docs[0] to avoid an IndexError on empty lists
            if not docs:
                raise HTTPException(status_code=400, detail="No documents provided for ingestion.")
            file_id = str(uuid.uuid4())
            file_ids[web_url] = file_id
            web_url_entries.append({"file_id": file_id, "file_name": web_url})
            database = docs[0].metadata.get("database", "general")
            db_type = docs[0].metadata.get("db_type", "general")
            logger.info(f"Ingesting documents for file_id: {file_id}, database: {database}, db_type: {db_type}")
            chroma_handler.ingest_documents([doc.dict() for doc in docs], db_type, database, file_id)
        chroma_handler.create_task_for_custom_mapping([], web_url_entries)
        return {"success": True, "task_id": task_id, "file_ids": file_ids}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error ingesting JSON documents with web URL: {str(e)}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(e))


def delete_file(file_path: str):
    """Delete the file from the filesystem."""
    if os.path.exists(file_path):
        os.remove(file_path)


@cms_router.get("/download/")
async def download_file(s3_link: str, background_tasks: BackgroundTasks):
    """
    Download a file from S3 and return it to the frontend.

    :param s3_link: The S3 link of the file to download.
    """
    temp_folder = os.path.join(os.getcwd(), "temp")
    if not os.path.exists(temp_folder):
        os.makedirs(temp_folder)
    local_file_path = download_from_s3(s3_link, temp_folder)
    if local_file_path is None:
        raise HTTPException(status_code=404, detail="File not found on S3.")
    # Remove the temporary copy once the response has been sent
    background_tasks.add_task(delete_file, local_file_path)
    return FileResponse(local_file_path, media_type='application/octet-stream', filename=os.path.basename(local_file_path))


@cms_router.post("/generate_presigned_url/")
async def generate_presigned_url_endpoint(s3_link: str):
    """
    Generate a pre-signed URL for an S3 object based on the provided S3 link.

    :param s3_link: The full S3 link of the file to download.
    :return: A JSON response containing the pre-signed download link and expiration time.
    """
    try:
        parsed_url = urlparse(s3_link)
        bucket_name = parsed_url.netloc.split('.')[0]
        s3_key = parsed_url.path.lstrip('/')
        # Extract file name and extension
        file_name_with_ext = os.path.basename(s3_key)
        file_name, ext = os.path.splitext(file_name_with_ext)
        # Remove the trailing timestamp from the file name
        file_name_cleaned = file_name.rsplit('_', 1)[0] + ext
        logger.info(f"Bucket name: {bucket_name}, S3 key: {s3_key}, cleaned file name: {file_name_cleaned}")
        download_url = generate_presigned_url(s3_key, file_name_cleaned)
        return JSONResponse(content={
            "download_link": download_url,
            "expires_at": (datetime.utcnow() + timedelta(minutes=15)).isoformat()
        })
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
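

# Illustrative request/response for the pre-signed URL endpoint above (a sketch only:
# it assumes the router is mounted at the application root; the actual prefix depends
# on how cms_router is included in the main app):
#
#   POST /generate_presigned_url/?s3_link=https://<bucket>.s3.amazonaws.com/docs/report_1699999999.pdf
#
#   {
#       "download_link": "<pre-signed S3 URL>",
#       "expires_at": "<UTC timestamp roughly 15 minutes from now>"
#   }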