Skip to content

ct_rag

base_rag_redis_pipeline_controller(prompt, query, config, redis_client, redis_url, k=6, chunk_size=8000, service='azure')

Executes a Retrieval-Augmented Generation (RAG) pipeline with a list of files.

Parameters:

Name Type Description Default
prompt str

The system prompt for RAG.

required
query str

The user query for retrieving relevant information.

required
config AppConfig

The application configuration object with user, task, and analysis details.

required
redis_client Redis

The Redis client instance.

required
redis_url str

The Redis server URL.

required
k int

The number of top results to retrieve. Defaults to 6.

6
chunk_size int

The maximum chunk size for embeddings. Defaults to 8000. The text-embedding-3-large model accepts a maximum input of 8191 tokens.

8000
service str

The LLM service provider: "azure" or "openai". Defaults to "azure".

'azure'

Returns:

Name Type Description
str str

The Redis key where the RAG results are stored.

Raises:

Type Description
Exception

If an error occurs during the pipeline execution.

Source code in controller/ct_rag.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def base_rag_redis_pipeline_controller(
    prompt: str,
    query: str,
    config: AppConfig,
    redis_client: Redis,
    redis_url: str,
    k: int = 6,
    chunk_size: int = 8000,
    service: str = "azure",
) -> str:
    """
    Executes a Retrieval-Augmented Generation (RAG) pipeline with a list of files.

    For every file hash matching ``file:<user>:<task_id>:<type_of_analysis>*``,
    this creates embeddings from the file content, stores the tokenized text
    back on the file hash, and loads the vectors into a Redis vector store.
    It then runs the RAG query over the loaded vectors and persists the
    results to Redis.

    Args:
        prompt (str): The system prompt for RAG.
        query (str): The user query for retrieving relevant information.
        config (AppConfig): The application configuration object with user, task, and analysis details.
        redis_client (Redis): The Redis client instance.
        redis_url (str): The Redis server URL.
        k (int): The number of top results to retrieve. Defaults to 6.
        chunk_size (int): The maximum chunk size for embeddings. Defaults to 8000.
            The text-embedding-3-large model accepts at most 8191 input tokens.
        service (str): The LLM service provider, "azure" or "openai". Defaults to "azure".

    Returns:
        str: The Redis key where the RAG results are stored.

    Raises:
        Exception: Re-raised if any step of the pipeline fails; the error is
            logged via ``log_and_store`` before propagating.
    """
    suffix = f"{config.user}:{config.task_id}:{config.type_of_analysis}"
    file_key_pattern = f"file:{suffix}*"

    try:
        file_name_list = []

        # SCAN iterates the keyspace incrementally instead of blocking the
        # server the way KEYS does; it matches the same pattern.
        for redis_key_file in redis_client.scan_iter(match=file_key_pattern):
            # Creating embeddings
            log_and_store(f"Creating Embedding: {redis_key_file}", config)

            content = redis_client.hget(redis_key_file, 'file').decode('utf-8')
            file_name = redis_client.hget(redis_key_file, 'file_name').decode('utf-8')

            embedding = InspectorEmbeddings()
            embedding.create_embedding(
                content=content,
                file_name=file_name,
                chunk_size=chunk_size,
                service=service,
            )

            # Persist the tokenized content alongside the original file hash.
            tokenized_content = str(embedding.text_splitted_list)
            redis_client.hset(redis_key_file, mapping={'tokenized': tokenized_content})

            # Load the embeddings into the Redis vector store.
            embedding.prepare_data()
            vector_store = RedisVectorStore(redis_url=redis_url)
            vector_store.load_data(embedding, config)

            log_and_store("Embeddings loaded", config)
            file_name_list.append(file_name)

        # Run the RAG query over the vectors loaded above.
        rag_obj = RAGRedis(
            config=config,
            redis_url=redis_url,
            k=k,
            service=service,
        )
        rag_obj.rag(query, prompt)

        # Serialize every field to JSON so Redis stores plain strings.
        data_to_save = SaveRedisPydantic(
            response=json.dumps(rag_obj.response),
            context=json.dumps(rag_obj.context),
            usage=json.dumps(rag_obj.usage),
            response_json=json.dumps(rag_obj.response_json),
            messages=json.dumps(rag_obj.messages),
            type_of_analysis=json.dumps(config.type_of_analysis),
            technique=json.dumps("RAG"),
            evaluation=0,
            observation="",
            file_names=json.dumps(file_name_list),
        )

        redis_key = ct_response.save_response_to_redis(config, data_to_save)
        return redis_key

    except Exception as e:
        # Surface the failure in the task log before propagating, mirroring
        # the error handling in save_rag_redis.
        log_and_store(f"RAG pipeline error: {e}", config)
        raise

save_rag_redis(config, rag_obj, redis_client)

Saves the data from a RAGRedis object into Redis using a single hash key.

Parameters:

Name Type Description Default
config AppConfig

The application configuration object with user, task, and analysis details.

required
rag_obj RAGRedis

The RAGRedis object containing the response and associated data.

required
redis_client Redis

The Redis client instance.

required

Returns:

Name Type Description
str str

The Redis key under which the data is saved.

Raises:

Type Description
Exception

If an error occurs during the save operation.

Source code in controller/ct_rag.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def save_rag_redis(config: AppConfig, rag_obj: RAGRedis, redis_client: Redis) -> str:
    """
    Saves the data from a RAGRedis object into Redis using a single hash key.

    The key follows the pattern ``rag:<user>:<task_id>:<type_of_analysis>``;
    each field is JSON-serialized so the hash stores plain strings.

    Args:
        config (AppConfig): The application configuration object with user, task, and analysis details.
        rag_obj (RAGRedis): The RAGRedis object containing the response and associated data.
        redis_client (Redis): The Redis client instance.

    Returns:
        str: The Redis key under which the data is saved.

    Raises:
        Exception: Re-raised if the save fails; the error is logged via
            ``log_and_store`` before propagating.
    """
    try:
        redis_key = f"rag:{config.user}:{config.task_id}:{config.type_of_analysis}"

        # Serialize each field to JSON so the hash holds only strings.
        data_to_save = {
            'response': json.dumps(rag_obj.response),
            'context': json.dumps(rag_obj.context),
            'usage': json.dumps(rag_obj.usage),
            'response_json': json.dumps(rag_obj.response_json),
            'messages': json.dumps(rag_obj.messages),
            'type_of_analysis': json.dumps(config.type_of_analysis),
        }

        # Store all fields under a single hash key.
        redis_client.hset(redis_key, mapping=data_to_save)
        log_and_store("Concluded", config)
        return redis_key
    except Exception as e:
        log_and_store(f"RAG: Error to save on Redis: {e}", config)
        raise