Skip to content

Api

rag(background_tasks, config, items) async

Executes a RAG (Retrieval-Augmented Generation) pipeline task asynchronously.

This function triggers a background task to run the RAG pipeline using the provided configuration, query, and prompt details. The results are stored in Redis.

Parameters:

Name Type Description Default
background_tasks BackgroundTasks

FastAPI's background task manager to handle the asynchronous execution of the RAG pipeline.

required
config AppConfig

Configuration object containing user, task, and analysis details.

required
items RagItems

Object containing the prompt and query for the RAG pipeline.

required

Returns:

Name Type Description
JSONResponse

A JSON response containing the Redis key associated with the RAG task results,

e.g., {"redis_key_response": }.

Source code in view/api/api.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
@api.post("/rag")
async def rag(background_tasks: BackgroundTasks, config: AppConfig, items: RagItems):
   """
    Executes a RAG (Retrieval-Augmented Generation) pipeline task asynchronously.

    This function triggers a background task to run the RAG pipeline using the provided
    configuration, query, and prompt details. The results are stored in Redis.

    Args:
        background_tasks (BackgroundTasks): FastAPI's background task manager to handle
            the asynchronous execution of the RAG pipeline.
        config (AppConfig): Configuration object containing user, task, and analysis details.
        items (RagItems): Object containing the prompt and query for the RAG pipeline.

    Returns:
        JSONResponse: A JSON response containing the Redis key associated with the RAG task results,
        e.g., {"redis_key_response": <redis_key>}.
    """
   redis_key_response = background_tasks.add_task(ct_rag.base_rag_redis_pipeline_controller,
        prompt=items.prompt,
        query=items.query,
        config=config,
        redis_client=REDIS_CLIENT,
        k=6,
        redis_url=REDIS_URL,
        chunk_size=8000,
        )
   return JSONResponse({"redis_key_response": redis_key_response})

read_root()

Root endpoint for the API.

This endpoint serves as a simple health check or welcome message for the API.

Returns:

Name Type Description
dict

A dictionary containing a greeting message, e.g., {"Hello": "World"}.

Source code in view/api/api.py
26
27
28
29
30
31
32
33
34
35
36
@api.get("/")
def read_root():
    """
    Root endpoint for the API.

    This endpoint serves as a simple health check or welcome message for the API.

    Returns:
        dict: A dictionary containing a greeting message, e.g., {"Hello": "World"}.
    """
    return {"Hello": "World"}

status_user(user) async

Retrieves the status of tasks for a given user.

This endpoint fetches the current status of all tasks associated with a specific user.

Parameters:

Name Type Description Default
user str

The identifier of the user whose task status is being queried.

required

Returns:

Name Type Description
dict

A dictionary containing the status of tasks for the user.

Source code in view/api/api.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@api.get("/status/{user}")
async def status_user(user: str):
    """
    Retrieves the status of tasks for a given user.

    This endpoint fetches the current status of all tasks associated with a specific user.

    Args:
        user (str): The identifier of the user whose task status is being queried.

    Returns:
        dict: A dictionary containing the status of tasks for the user.
    """
    status = ct_response.status_by_user(user=user)
    return status

upload(background_tasks, file=File(...), task_id='1234', type_of_analysis='medical', user='user') async

Handles file upload and processing via a POST endpoint.

This endpoint allows users to upload a file for analysis. The type of analysis can be specified as 'medical', 'document', or 'other'. The uploaded file is processed asynchronously in the background.

Parameters:

Name Type Description Default
background_tasks BackgroundTasks

FastAPI's background task manager to handle post-upload processing.

required
file UploadFile

The file to be uploaded. Must be provided in the request.

File(...)
task_id str

An identifier for the task. Defaults to '1234'.

'1234'
type_of_analysis str

Specifies the type of analysis for the uploaded file. Can be 'medical', 'document', or 'other'. Defaults to 'medical'.

'medical'
user str

The user initiating the upload. Defaults to 'user'.

'user'

Returns:

Name Type Description
dict dict

A dictionary containing the key for the uploaded file, e.g., {"key_file": uploaded}.

Source code in view/api/api.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
@api.post("/upload")
async def upload(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...), 
    task_id: str = '1234', 
    type_of_analysis: str = 'medical', 
    user: str = 'user',
    )->dict:
    """
        Handles file upload and processing via a POST endpoint.

        This endpoint allows users to upload a file for analysis. The type of analysis can
        be specified as 'medical', 'document', or 'other'. The uploaded file is processed 
        asynchronously in the background.

        Args:
            background_tasks (BackgroundTasks): FastAPI's background task manager to handle
                post-upload processing.
            file (UploadFile): The file to be uploaded. Must be provided in the request.
            task_id (str, optional): An identifier for the task. Defaults to '1234'.
            type_of_analysis (str, optional): Specifies the type of analysis for the uploaded file.
                Can be 'medical', 'document', or 'other'. Defaults to 'medical'.
            user (str, optional): The user initiating the upload. Defaults to 'user'.

        Returns:
            dict: A dictionary containing the key for the uploaded file, e.g., {"key_file": uploaded}.
    """

    file_bytes = await file.read()
    file_name = file.filename.split("/")[-1]

    config = AppConfig(
        user=user, 
        task_id=task_id,
        type_of_analysis=type_of_analysis,
        )

    uploaded = ct_upload.upload_controller(
        file_bytes=file_bytes,
        file_name=file_name,
        config=config,
    )

    return {"key_file": uploaded}