Keunyoung Yoon

Data Engineer

Data Scientist

Data Analyst

Blog Post

AI Chatbot Using Solar LLM

09/26/2024 Portfolio

Solar LLM is a model specialized in East Asian languages, so the chatbot is set to respond in Korean, which gives the best results. An example response, translated into English, is shown below:

(Answer in English) “Through a two-step process of the hash function, an infinite number of keys can be mapped to a finite number of indices. In the first step, a hash code is used to compress the value of the key into a fixed-size number. This hash code uniquely identifies the key. In the second step, a compression function is used to map the hash code to a finite range of indices. This compression function reduces the hash code to a finite set of indices. This two-step process allows an infinite number of keys to be mapped to a finite number of indices, enabling an efficient implementation of the hash table.”

Project Overview

This project aims to develop a chatbot utilizing Solar LLM with Retrieval-Augmented Generation (RAG) for efficient document-based Q&A. By leveraging Kafka for distributed processing, the system handles PDF uploads, embedding, and real-time interactions. Key steps include document indexing, splitting, embedding in a vector store, and distributed querying through Kafka to ensure scalable, high-performance responses.

GitHub Repository

https://github.com/youngyoony/rag_llm_project

Pre-project planning

Document AI – GPT struggles to identify tables, images, and charts in uploaded PDFs, so a model that handles OCR and document recognition properly is needed.

Solar LLM – A model focused on East Asian languages.
Solar LLM features for the chatbot:
• Chat: Natural language interaction.
• Embedding: Converts documents into embeddings that the language model can process, which supports retrieval for user questions.
• Translation: Translates text between languages.
• Groundedness Check: Verifies, as a secondary check, that a result is supported by its sources.
• Text-to-SQL: Converts natural language into SQL queries.

The goal is to build a RAG-based chatbot using both the Chat API and the Embedding API. The Chat API implementation is complete. Embedding is critical because a model does not work on raw text directly: text is broken into tokens and converted into numerical embeddings that the model can process.

Embedding Types:
Solar Embedding Large Passage: Handles large PDF or text files (up to 4000 tokens), converting them into embeddings.
Solar Embedding Large Query: Embeds user questions.

System Overview:
• Documents are converted into embeddings and stored in a vector store. When a user asks a question, the stored chunks most similar to the question are retrieved and used to generate an answer.
• RAG Chatbot: Accepts document uploads, retrieves relevant content, and uses the LLM to deliver a response.
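
The similarity lookup described above can be illustrated with a minimal sketch. It uses hand-written three-dimensional vectors in place of real Solar embeddings, and the `cosine_similarity` and `top_k` helpers are hypothetical names introduced only for this example; a vector store such as Chroma performs this kind of search internally.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def top_k(query_vec, store, k=2):
    """Return the k chunk texts whose vectors are most similar to the query."""
    ranked = sorted(
        store,
        key=lambda item: cosine_similarity(query_vec, item[1]),
        reverse=True,
    )
    return [text for text, _ in ranked[:k]]

# Toy "vector store": (chunk text, made-up embedding) pairs.
store = [
    ("Hash tables map keys to indices.", [0.9, 0.1, 0.0]),
    ("Kafka is a distributed log.", [0.0, 0.2, 0.9]),
    ("A hash code compresses a key.", [0.8, 0.3, 0.1]),
]
query_vec = [1.0, 0.2, 0.0]  # made-up embedding of a question about hashing
results = top_k(query_vec, store, k=2)
print(results)
```

With these toy vectors, the two hashing chunks rank above the Kafka chunk, which mirrors how the retriever picks the passages closest to the question.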

RAG (Retrieval-Augmented Generation) Process:
Indexing: A preprocessing phase in which context-rich documents are uploaded and prepared for retrieval; no model training is involved.
Loading: Uses a loader suited to the document type (e.g., PDFs with tables, Korean documents) for efficient loading.
Splitting: Large files are divided into smaller chunks, which improves search efficiency and keeps each chunk within the model’s input limits.
Storing: The chunks are converted into embeddings, numerical values that the computer can compare, and uploaded to a database. The Upstage API will be used for this, because different embedding APIs (OpenAI API, HuggingFace API) vary in how they represent text, which influences the AI’s search capabilities.
Uploading to Database: Different databases have their own algorithms and methods for handling search queries. Once the embedding values are uploaded, retrieval quality depends on the search algorithm within the DB.
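
The splitting step can be sketched as follows. This is a simplified character-based splitter written for illustration, not the algorithm LangChain uses; `split_text`, `chunk_size`, and `overlap` are hypothetical names, and the overlap is an assumption added so that sentences cut at a boundary still appear whole in one chunk.

```python
def split_text(text, chunk_size=40, overlap=10):
    """Split text into chunks of at most chunk_size characters.

    Each chunk shares `overlap` characters with the previous one.
    """
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once the current chunk reaches the end of the text.
        if start + chunk_size >= len(text):
            break
    return chunks

document = "abcdefghij" * 10  # 100 characters standing in for a loaded page
chunks = split_text(document, chunk_size=40, overlap=10)
for i, chunk in enumerate(chunks):
    print(i, len(chunk), chunk)
```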

Retrieval and Generation Stages:
Retrieve: Search through the DB based on the user’s query.
Generate: The LLM generates an understandable response from the retrieved information.
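
The hand-off from retrieval to generation can be sketched as a prompt-building step. This is an illustration under stated assumptions: `build_prompt` and the English instruction text are hypothetical, and the project itself relies on LangChain's chain helpers rather than a hand-written function like this.

```python
def build_prompt(question, retrieved_chunks):
    """Combine retrieved chunks and the user's question into one prompt string."""
    context = "\n\n".join(
        f"[{i + 1}] {chunk}" for i, chunk in enumerate(retrieved_chunks)
    )
    return (
        "Use the context below to answer the question. "
        "If the answer is not in the context, say you do not know.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {question}"
    )

prompt_text = build_prompt(
    "What does a hash code do?",
    ["A hash code compresses a key.", "Hash tables map keys to indices."],
)
print(prompt_text)
```

The LLM then receives this single string, so its answer can draw only on the passages that the retrieve step selected.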

Once the PDF file is uploaded, the system goes through preprocessing → PDF Preview → Answer generation through the chatbot interface.

Without Kafka (chatbot.py)

import os
from dotenv import load_dotenv
load_dotenv()
import streamlit as st
import time
import base64
import uuid
import tempfile
from langchain_upstage import UpstageEmbeddings
from langchain_chroma import Chroma
from langchain_community.document_loaders import PyPDFLoader

if "id" not in st.session_state:
    st.session_state.id = uuid.uuid4()
    st.session_state.file_cache = {} # Save the cache of the uploaded PDF file here so that it can remember.

session_id = st.session_state.id
client = None

def reset_chat():
    st.session_state.messages = []
    st.session_state.context = None

def display_pdf(file):
    ## Read the uploaded file object (not a file path)

    st.markdown("### PDF Preview")
    file.seek(0)  # Rewind in case the file was already read on an earlier run.
    base64_pdf = base64.b64encode(file.read()).decode("utf-8")

    ## Embedding PDF in HTML
    pdf_display = f"""<iframe src="data:application/pdf;base64,{base64_pdf}" width="400" height="100%" type="application/pdf"
                        style="height:100vh; width:100%"
                    >
                    </iframe>"""

    ## Displaying File
    st.markdown(pdf_display, unsafe_allow_html=True)


with st.sidebar:
    # "st" is Streamlit, the package used to build the web UI.
    # It is used here to configure the sidebar.

    st.header("Add your documents!")

    # The file uploader widget appears in the sidebar.
    uploaded_file = st.file_uploader("Choose your `.pdf` file", type="pdf")
    # When a file is uploaded, it is stored in the uploaded_file variable.


    if uploaded_file:
        print(uploaded_file)
        try:
            file_key = f"{session_id}-{uploaded_file.name}"
            # Create a file_key to save the file_cache.

            with tempfile.TemporaryDirectory() as temp_dir:
                file_path = os.path.join(temp_dir, uploaded_file.name)
                print("file path:", file_path)
                
                # Save the uploaded file to a temporary path so the loader can read it for indexing.
                with open(file_path, "wb") as f:
                    f.write(uploaded_file.getvalue())

                st.write("Indexing your document...")
                # Informing the user that indexing is in progress.

                if file_key not in st.session_state.get('file_cache', {}):

                    # Check the saved file itself rather than only its directory.
                    if os.path.exists(file_path):
                        print("temp_dir:", temp_dir)
                        # PyPDFLoader, one of the most commonly used loaders in LangChain, reads the file saved in the temporary directory.
                        # It handles Korean text well and is efficient.
                        # It produces one chunk per page, so a 10-page document yields pages 0, 1, 2, ..., 9.
                        loader = PyPDFLoader(file_path)
                    else:
                        st.error('Could not find the file you uploaded, please check again...')
                        st.stop()
                    
                    pages = loader.load_and_split()
                    # load_and_split loads the PDF and splits it into chunks, which are stored in pages.
                    # The document used here has only one page, so there's no need to split it further.

                    vectorstore = Chroma.from_documents(pages, UpstageEmbeddings(model="solar-embedding-1-large"))
                    # The chunks in pages are converted into embeddings with the Upstage model
                    # and stored in Chroma, a vector database.

                    retriever = vectorstore.as_retriever(search_kwargs={"k": 2})
                    # as_retriever wraps the vector store as a retriever.
                    # k must be passed through search_kwargs; k=2 retrieves the 2 most relevant chunks for a user's query.
                    # Once the retriever is ready, preprocessing is complete.

                    # Next, an LLM (Solar LLM) takes the retrieved content and generates human-understandable answers.

                    from langchain_upstage import ChatUpstage
                    from langchain_core.messages import HumanMessage, SystemMessage

                    chat = ChatUpstage(upstage_api_key=os.getenv("UPSTAGE_API_KEY"))

                    # The variable chat now holds the Solar LLM chat model. Setup complete.

                    ## 1) The first step to give the chatbot 'memory'

                    ## Use the previous messages and the latest user question to rephrase the question
                    ## so that it makes sense on its own, without needing the conversation context.
                    from langchain.chains import create_history_aware_retriever
                    from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

                    contextualize_q_system_prompt = """이전 대화 내용과 최신 사용자 질문이 있을 때, 이 질문이 이전 대화 내용과 관련이 있을 수 있습니다. 
                    이런 경우, 대화 내용을 알 필요 없이 독립적으로 이해할 수 있는 질문으로 바꾸세요. 
                    질문에 답할 필요는 없고, 필요하다면 그저 다시 구성하거나 그대로 두세요."""

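                    # English translation of the Korean prompt above:
                    # "Given the previous conversation and the latest user question, the question may relate to the previous conversation.
                    # In that case, rewrite it as a standalone question that can be understood without the conversation.
                    # Do not answer the question; just reformulate it if needed, or leave it as is."
                    # MessagesPlaceholder: Uses the 'chat_history' input key to include previous message records in the prompt.
                    # The prompt consists of the system prompt, message history (context information), and the user's question.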
                    contextualize_q_prompt = ChatPromptTemplate.from_messages(
                        [
                            ("system", contextualize_q_system_prompt),
                            MessagesPlaceholder("chat_history"),
                            ("human", "{input}"),
                        ]
                    )

                    # This is the prompt that rephrases the user's question to improve search results.
                    # For example, suppose a user has been discussing the benefits of apple juice and then asks, "How much should I drink in a day?"
                    # Passing this question directly to the database might return irrelevant results, so the question is rephrased using the previous conversation.
                    # It could become something like, "Given the benefits of apple juice are abcd, how much apple juice should I drink in a day?"
                    # This prompt is stored in the variable 'contextualize_q_prompt', which helps the chatbot keep the conversation context.

                    # Based on this, create a retriever that takes the message history into account.
                    # create_history_aware_retriever is a chain constructor provided by LangChain, so the pieces only need to be put together.
                    history_aware_retriever = create_history_aware_retriever(
                        chat, retriever, contextualize_q_prompt
                    )
                    # Everything from "from langchain.chains import create_history_aware_retriever" to here is the 1st chain.
                    # A 2nd chain is now needed to decide how to answer the user's actual question.

                    ## 2) The second step is to create a retrieval chain that fetches documents using the chain created above.
                    from langchain.chains import create_retrieval_chain
                    from langchain.chains.combine_documents import create_stuff_documents_chain

                    # English translation of the Korean prompt below:
                    # "You are an assistant for question-answering tasks. Use the retrieved content to answer the question.
                    # If you do not know the answer, say that you do not know. Keep the answer concise, within three sentences."
                    qa_system_prompt = """질문-답변 업무를 돕는 보조원입니다. 
                    질문에 답하기 위해 검색된 내용을 사용하세요. 
                    답을 모르면 모른다고 말하세요. 
                    답변은 세 문장 이내로 간결하게 유지하세요.
                    {context}"""
                    qa_prompt = ChatPromptTemplate.from_messages(
                        [
                            ("system", qa_system_prompt),
                            MessagesPlaceholder("chat_history"),
                            ("human", "{input}"),
                        ]
                    )

                    question_answer_chain = create_stuff_documents_chain(chat, qa_prompt)
                    # Up to here is the second chain.
                    # The best thing about LangChain is that you can create multiple chains and link them together.

                    # Link the two chains into the final chain that performs RAG (Retrieval-Augmented Generation).
                    # It is called every time a question is asked, and its output includes input, chat_history, context, and answer.
                    rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)

                    # Cache the chain so later reruns in this session reuse it instead of re-indexing the PDF.
                    st.session_state.file_cache[file_key] = rag_chain
                else:
                    rag_chain = st.session_state.file_cache[file_key]

                st.success("Ready to Chat!")
                display_pdf(uploaded_file)
        except Exception as e:
            st.error(f"An error occurred: {e}")
            st.stop()     

## Name of the Website
st.title("Solar LLM Chatbot")

if "openai_model" not in st.session_state:
    st.session_state["openai_model"] = "gpt-3.5-turbo"

if "messages" not in st.session_state:
    st.session_state.messages = []

## Redraw the recorded conversation
## Streamlit reruns the script on every interaction, so earlier messages disappear unless they are redrawn from session state.
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])
        
## To prevent excessive prompt costs
MAX_MESSAGES_BEFORE_DELETION = 4

## Receive user input on the website
# and execute the AI agent created above to get a response.
if prompt := st.chat_input("Ask a question!"):

    # For a question from the user, show the user icon and the question.
    # If 4 or more conversation records are stored, drop the oldest ones.
    if len(st.session_state.messages) >= MAX_MESSAGES_BEFORE_DELETION:
        ## Remove the first two messages
        del st.session_state.messages[0]
        del st.session_state.messages[0]

    st.session_state.messages.append({"role": "user", "content": prompt})
    with st.chat_message("user"):
        st.markdown(prompt)

    # For a response from the AI, show the AI icon, execute the LLM, get the response, and stream it.
    with st.chat_message("assistant"):
        message_placeholder = st.empty()
        full_response = ""

        # Call rag_chain with the user's question as "input" and the conversation records as
        # "chat_history" so the chain can use the conversation context.
        result = rag_chain.invoke({"input": prompt, "chat_history": st.session_state.messages})

        ## Show the retrieved passages that the answer is based on.
        with st.expander("Evidence context"):
            st.write(result["context"])

        # Simulate streaming by revealing the answer word by word with a cursor.
        for chunk in result["answer"].split(" "):
            full_response += chunk + " "
            time.sleep(0.2)
            message_placeholder.markdown(full_response + "▌")
        # Show the final text without the cursor once the loop finishes.
        message_placeholder.markdown(full_response)

    st.session_state.messages.append({"role": "assistant", "content": full_response})

print("_______________________")
print(st.session_state.messages)
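
The history-trimming rule in the listing above (drop the oldest two messages once four are stored) can be isolated as a small function. This is a sketch for illustration; `trim_history` is a hypothetical helper that does not appear in the project code.

```python
MAX_MESSAGES_BEFORE_DELETION = 4

def trim_history(messages, limit=MAX_MESSAGES_BEFORE_DELETION):
    """Drop the oldest user/assistant pair once `limit` messages are stored."""
    if len(messages) >= limit:
        return messages[2:]
    return messages

history = []
for turn in range(1, 4):
    # Trim before adding the new turn, as the chatbot does before each question.
    history = trim_history(history)
    history.append({"role": "user", "content": f"question {turn}"})
    history.append({"role": "assistant", "content": f"answer {turn}"})

print([m["content"] for m in history])
```

After three turns only the last two question-and-answer pairs remain, which bounds the size of the prompt sent to the model.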

With Kafka / Version 1 (chatbot_mono.py)

chatbot_mono.py

1. Kafka Producer and Consumer Setup:
• A KafkaProducer and KafkaConsumer are initialized using the kafka-python library. The producer sends the conversation messages to a Kafka topic, and the consumer listens to this topic to read the messages.
• The producer is configured with a value_serializer to encode the message as JSON.
• The consumer deserializes messages and prints them out as they are received.

2. produce_message(role, content) Function:
• This function is responsible for sending (producing) messages to Kafka. It creates a message with the role (user or assistant) and content, then sends this message to the Kafka topic using the producer.send method.

3. run_consumer Function:
• The run_consumer function continuously listens for incoming messages from the Kafka topic. When a new message is received, it prints the content.

4. Threading for Kafka Consumer:
• The Kafka consumer runs in a separate thread using Python’s threading module, allowing the main application to continue running while listening for Kafka messages in the background.

5. Integration with the Chat Application:
• When the user sends a message or when the AI generates a response, both are passed through the produce_message function to send the message to Kafka.
• The AI’s response is produced after the question is processed by the RAG chain.
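
The serializer and deserializer described above can be checked without a running broker. The sketch below reuses the same JSON-over-UTF-8 conversion as the producer and consumer settings; the function names `serialize` and `deserialize` and the sample message are assumptions made for this example.

```python
import json

def serialize(value):
    """Same conversion as the producer's value_serializer: dict -> UTF-8 JSON bytes."""
    return json.dumps(value).encode("utf-8")

def deserialize(raw):
    """Same conversion as the consumer's value_deserializer: UTF-8 JSON bytes -> dict."""
    return json.loads(raw.decode("utf-8"))

message = {"role": "user", "content": "해시 함수란 무엇인가요?"}
wire_bytes = serialize(message)   # what the producer would hand to Kafka
restored = deserialize(wire_bytes)  # what the consumer would get back
print(restored)
```

Because both sides agree on the format, Korean text survives the round trip unchanged.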

import os
import sys
from dotenv import load_dotenv
load_dotenv()
import streamlit as st
import time
import base64
import uuid
import tempfile
from langchain_upstage import UpstageEmbeddings
from langchain_chroma import Chroma
from langchain_community.document_loaders import PyPDFLoader

from kafka import KafkaProducer, KafkaConsumer
import json

bootstrap_server = ['localhost:9092']
topic = 'message'
producer = KafkaProducer(
    bootstrap_servers=bootstrap_server,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
) # Producer = Local PC


def produce_message(role, content):
    print(f"producing message: {content}", file=sys.stderr)
    message = {
        "role": role,
        "content": content
    }
    producer.send(topic, message)
    producer.flush()


def run_consumer():
    consumer = KafkaConsumer(
        topic,
        group_id='test',
        bootstrap_servers=bootstrap_server,
        value_deserializer=lambda v: json.loads(v.decode('utf-8')),
        # auto_offset_reset='earliest'
    ) # Producing puts messages into Kafka, but we also need to verify them, and that requires a consumer.

    print("kafka test consumer is running")
    for message in consumer:
        print(f"Received kafka message: {message.value}")
        time.sleep(0.1)

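import threading

# Streamlit reruns this script on every interaction, so start the consumer thread
# only once per process instead of once per rerun.
@st.cache_resource
def start_consumer_thread():
    thread = threading.Thread(target=run_consumer, daemon=True)
    thread.start()
    return thread

start_consumer_thread()
# Send the conversation to the Kafka server (broker) and check the messages that come back.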

if "id" not in st.session_state:
    st.session_state.id = uuid.uuid4()
    st.session_state.file_cache = {} # Save the cache of the uploaded PDF file here so that it can remember.

session_id = st.session_state.id
client = None


def display_pdf(file):
    st.markdown("### PDF Preview")
    base64_pdf = base64.b64encode(file.read()).decode("utf-8")
    pdf_display = f"""<iframe src="data:application/pdf;base64,{base64_pdf}" width="400" height="100%" type="application/pdf"
                        style="height:100vh; width:100%"> </iframe>"""
    st.markdown(pdf_display, unsafe_allow_html=True)


with st.sidebar:
    st.header(f"Add your documents!")

    uploaded_file = st.file_uploader("Choose your `.pdf` file", type="pdf")

    if uploaded_file:
        print(uploaded_file)
        try:
            # Create a file_key to save the file_cache.
            file_key = f"{session_id}-{uploaded_file.name}"

            with tempfile.TemporaryDirectory() as temp_dir:
                file_path = os.path.join(temp_dir, uploaded_file.name)
                print("file path:", file_path)
                
                with open(file_path, "wb") as f:
                    f.write(uploaded_file.getvalue()) # save the "uploaded file" to a temporary file, open it, and enable indexing.
                
                file_key = f"{session_id}-{uploaded_file.name}"
                st.write("Indexing your document...")

                if file_key not in st.session_state.get('file_cache', {}):

                    # Check the saved file itself rather than only its directory.
                    if os.path.exists(file_path):
                        print("temp_dir:", temp_dir)
                        loader = PyPDFLoader(file_path)
                    else:
                        st.error('Could not find the file you uploaded, please check again...')
                        st.stop()
                    
                    pages = loader.load_and_split()

                    vectorstore = Chroma.from_documents(pages, UpstageEmbeddings(model="solar-embedding-1-large"))

                    retriever = vectorstore.as_retriever(search_kwargs={"k": 2})

                    from langchain_upstage import ChatUpstage
                    from langchain_core.messages import HumanMessage, SystemMessage

                    chat = ChatUpstage(upstage_api_key=os.getenv("UPSTAGE_API_KEY"))

                    from langchain.chains import create_history_aware_retriever
                    from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

                    contextualize_q_system_prompt = """이전 대화 내용과 최신 사용자 질문이 있을 때, 이 질문이 이전 대화 내용과 관련이 있을 수 있습니다. 
                    이런 경우, 대화 내용을 알 필요 없이 독립적으로 이해할 수 있는 질문으로 바꾸세요. 
                    질문에 답할 필요는 없고, 필요하다면 그저 다시 구성하거나 그대로 두세요."""

                    contextualize_q_prompt = ChatPromptTemplate.from_messages(
                        [
                            ("system", contextualize_q_system_prompt),
                            MessagesPlaceholder("chat_history"),
                            ("human", "{input}"),
                        ]
                    )

                    history_aware_retriever = create_history_aware_retriever(
                        chat, retriever, contextualize_q_prompt
                    )
                    
                    from langchain.chains import create_retrieval_chain
                    from langchain.chains.combine_documents import create_stuff_documents_chain

                    qa_system_prompt = """질문-답변 업무를 돕는 보조원입니다. 
                    질문에 답하기 위해 검색된 내용을 사용하세요. 
                    답을 모르면 모른다고 말하세요. 
                    답변은 세 문장 이내로 간결하게 유지하세요.
                    {context}"""
                    qa_prompt = ChatPromptTemplate.from_messages(
                        [
                            ("system", qa_system_prompt),
                            MessagesPlaceholder("chat_history"),
                            ("human", "{input}"),
                        ]
                    )

                    question_answer_chain = create_stuff_documents_chain(chat, qa_prompt)
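                    rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)
                    # Cache the chain so later reruns in this session reuse it instead of re-indexing the PDF.
                    st.session_state.file_cache[file_key] = rag_chain
                else:
                    rag_chain = st.session_state.file_cache[file_key]

                st.success("Ready to Chat!")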
                display_pdf(uploaded_file)
        except Exception as e:
            st.error(f"An error occurred: {e}")
            st.stop()     

st.title("Solar LLM Chatbot")

if "openai_model" not in st.session_state:
    st.session_state["openai_model"] = "gpt-3.5-turbo"

if "messages" not in st.session_state:
    st.session_state.messages = []


## Redraw the recorded conversation
## Streamlit reruns the script on every interaction, so earlier messages disappear unless they are redrawn from session state.
for message in st.session_state.messages:

    with st.chat_message(message["role"]):
        st.markdown(message["content"])
        
MAX_MESSAGES_BEFORE_DELETION = 4

if prompt := st.chat_input("Ask a question!"):
    
    # For a question from the user, show the user icon and the question.
    # If 4 or more conversation records are stored, drop the oldest ones.
    if len(st.session_state.messages) >= MAX_MESSAGES_BEFORE_DELETION:
        ## Remove the first two messages
        del st.session_state.messages[0]
        del st.session_state.messages[0]
   
    st.session_state.messages.append({"role": "user", "content": prompt})
    with st.chat_message("user"):
        st.markdown(prompt)

    produce_message("user", prompt)

    # If it's a response from the AI, show the AI icon, execute the LLM, get the response, and stream it.
    with st.chat_message("assistant"):
        message_placeholder = st.empty()
        full_response = ""

        result = rag_chain.invoke({"input": prompt, "chat_history": st.session_state.messages})
        # Called rag_chain here and put the user's question into the prompt "input".
        # Also, put the previous conversation records into the "chat_history" to proceed while remembering the conversation context.
        
        ## Show the proof.
        with st.expander("Evidence context"):
            st.write(result["context"])

        for chunk in result["answer"].split(" "):
            full_response += chunk + " "
            time.sleep(0.2)
            message_placeholder.markdown(full_response + "▌")
        # Show the final text without the cursor once the loop finishes.
        message_placeholder.markdown(full_response)
            
    st.session_state.messages.append({"role": "assistant", "content": full_response})

    produce_message("assistant", full_response)

print("_______________________")
print(st.session_state.messages)

With Kafka / Version 2 (chatbot_logging.py)

chatbot_logging.py

chatbot_logging.py is primarily designed for logging chat interactions between the user and the AI chatbot into a local log file. This backs up messages locally in the event of any Kafka message failures, helping to prevent data loss. The focus is on ensuring the persistence of chat logs.

1. Logging Chat Messages:
• This version adds logging with Python’s logging module so that every conversation turn (user input and AI response) is written to a local log file (log/message.log).
• Each user query (prompt) and AI-generated response (full_response) is logged by calling logging.info() with a dictionary holding the role and content. Each log line is a timestamp followed by the string form of that dictionary, which is structured enough for log_parser.py to convert into JSON later.

2. Backup Purpose:
• The log file (log/message.log) serves as a backup of the chat history. In case the system crashes or you need to audit the conversation, the chat logs will be stored locally.

3. How It’s Used:
• Each time a message is sent or received, it’s written to the log file, and in the event of message loss (e.g., Kafka failure), you can still access the conversation history locally.
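
A variation on this idea is to serialize each message with json.dumps before logging, so that every line can be parsed back with json.loads. The sketch below is an assumption-laden illustration rather than the project's code: it writes to a temporary file instead of log/message.log, and `make_chat_logger` and `log_message` are hypothetical helpers.

```python
import json
import logging
import os
import tempfile

def make_chat_logger(path):
    """Return a logger (and its handler) that appends timestamped lines to `path`."""
    logger = logging.getLogger("chat_log_sketch")
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep these lines out of the root logger
    handler = logging.FileHandler(path, encoding="utf-8")
    handler.setFormatter(logging.Formatter("%(asctime)s - %(message)s"))
    logger.addHandler(handler)
    return logger, handler

def log_message(logger, role, content):
    """Write the message as a JSON object so it can be read back with json.loads."""
    logger.info(json.dumps({"role": role, "content": content}, ensure_ascii=False))

log_path = os.path.join(tempfile.mkdtemp(), "message.log")
logger, handler = make_chat_logger(log_path)
log_message(logger, "user", "What is a hash code?")
log_message(logger, "assistant", "It's a fixed-size number derived from a key.")
logger.removeHandler(handler)
handler.close()

# Read the file back: everything from the first "{" on each line is JSON.
with open(log_path, encoding="utf-8") as f:
    lines = f.read().splitlines()
parsed = [json.loads(line[line.index("{"):]) for line in lines]
print(parsed)
```

Writing real JSON keeps apostrophes and quotation marks in the content from confusing whatever reads the log later.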

import os
from dotenv import load_dotenv
load_dotenv()
import streamlit as st
import time
import base64
import uuid
import tempfile
from langchain_upstage import UpstageEmbeddings
from langchain_chroma import Chroma
from langchain_community.document_loaders import PyPDFLoader

import logging

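log_filename = "log/message.log"
os.makedirs(os.path.dirname(log_filename), exist_ok=True)  # Create the log directory if it is missing.
logging.basicConfig(filename=log_filename, level=logging.INFO, format='%(asctime)s - %(message)s')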


if "id" not in st.session_state:
    st.session_state.id = uuid.uuid4()
    st.session_state.file_cache = {}

session_id = st.session_state.id
client = None


def display_pdf(file):
    st.markdown("### PDF Preview")
    base64_pdf = base64.b64encode(file.read()).decode("utf-8")
    pdf_display = f"""<iframe src="data:application/pdf;base64,{base64_pdf}" width="400" height="100%" type="application/pdf"
                        style="height:100vh; width:100%"> </iframe>"""
    st.markdown(pdf_display, unsafe_allow_html=True)


with st.sidebar:
    st.header(f"Add your documents!")

    uploaded_file = st.file_uploader("Choose your `.pdf` file", type="pdf")

    if uploaded_file:
        try:
            file_key = f"{session_id}-{uploaded_file.name}"

            with tempfile.TemporaryDirectory() as temp_dir:
                file_path = os.path.join(temp_dir, uploaded_file.name)
                
                with open(file_path, "wb") as f:
                    f.write(uploaded_file.getvalue())
                
                file_key = f"{session_id}-{uploaded_file.name}"
                st.write("Indexing your document...")

                if file_key not in st.session_state.get('file_cache', {}):
                    # Check the saved file itself rather than only its directory.
                    if os.path.exists(file_path):
                        loader = PyPDFLoader(file_path)
                    else:
                        st.error('Could not find the file you uploaded, please check again...')
                        st.stop()
                    
                    pages = loader.load_and_split()

                    vectorstore = Chroma.from_documents(pages, UpstageEmbeddings(model="solar-embedding-1-large"))

                    retriever = vectorstore.as_retriever(search_kwargs={"k": 2})

                    from langchain_upstage import ChatUpstage
                    from langchain_core.messages import HumanMessage, SystemMessage

                    chat = ChatUpstage(upstage_api_key=os.getenv("UPSTAGE_API_KEY"))

                    from langchain.chains import create_history_aware_retriever
                    from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

                    contextualize_q_system_prompt = """이전 대화 내용과 최신 사용자 질문이 있을 때, 이 질문이 이전 대화 내용과 관련이 있을 수 있습니다. 
                    이런 경우, 대화 내용을 알 필요 없이 독립적으로 이해할 수 있는 질문으로 바꾸세요. 
                    질문에 답할 필요는 없고, 필요하다면 그저 다시 구성하거나 그대로 두세요."""

                    contextualize_q_prompt = ChatPromptTemplate.from_messages(
                        [
                            ("system", contextualize_q_system_prompt),
                            MessagesPlaceholder("chat_history"),
                            ("human", "{input}"),
                        ]
                    )

                    history_aware_retriever = create_history_aware_retriever(
                        chat, retriever, contextualize_q_prompt
                    )
                    
                    from langchain.chains import create_retrieval_chain
                    from langchain.chains.combine_documents import create_stuff_documents_chain

                    qa_system_prompt = """질문-답변 업무를 돕는 보조원입니다. 
                    질문에 답하기 위해 검색된 내용을 사용하세요. 
                    답을 모르면 모른다고 말하세요. 
                    답변은 세 문장 이내로 간결하게 유지하세요.
                    {context}"""
                    qa_prompt = ChatPromptTemplate.from_messages(
                        [
                            ("system", qa_system_prompt),
                            MessagesPlaceholder("chat_history"),
                            ("human", "{input}"),
                        ]
                    )

                    question_answer_chain = create_stuff_documents_chain(chat, qa_prompt)
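                    rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)
                    # Cache the chain so later reruns in this session reuse it instead of re-indexing the PDF.
                    st.session_state.file_cache[file_key] = rag_chain
                else:
                    rag_chain = st.session_state.file_cache[file_key]

                st.success("Ready to Chat!")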
                display_pdf(uploaded_file)
        except Exception as e:
            st.error(f"An error occurred: {e}")
            st.stop()

st.title("Solar LLM Chatbot")

if "openai_model" not in st.session_state:
    st.session_state["openai_model"] = "gpt-3.5-turbo"

if "messages" not in st.session_state:
    st.session_state.messages = []


for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])

MAX_MESSAGES_BEFORE_DELETION = 4

if prompt := st.chat_input("Ask a question!"):

    if len(st.session_state.messages) >= MAX_MESSAGES_BEFORE_DELETION:
        del st.session_state.messages[0]
        del st.session_state.messages[0]  
   
    st.session_state.messages.append({"role": "user", "content": prompt})
    with st.chat_message("user"):
        st.markdown(prompt)

    logging.info({"role": "user", "content": prompt})
    with st.chat_message("assistant"):
        message_placeholder = st.empty()
        full_response = ""

        result = rag_chain.invoke({"input": prompt, "chat_history": st.session_state.messages})
        
        with st.expander("Evidence context"):
            st.write(result["context"])

        for chunk in result["answer"].split(" "):
            full_response += chunk + " "
            time.sleep(0.2)
            message_placeholder.markdown(full_response + "▌")
        # Show the final text without the cursor once the loop finishes.
        message_placeholder.markdown(full_response)
            
    st.session_state.messages.append({"role": "assistant", "content": full_response})
    logging.info({"role": "assistant", "content": full_response})

print("_______________________")
print(st.session_state.messages)

With Kafka / Version 3 (log_parser.py)

log_parser.py

log_parser.py uses Kafka as the message broker and adds the ability to read the local log file and push its contents to Kafka. It sends locally stored logs to Kafka and consumes messages from Kafka in real time, so even logs kept on the local system can reach Kafka for centralized processing.

1. Kafka Producer and Consumer Setup:
• KafkaProducer sends messages to the Kafka topic (message), and KafkaConsumer listens for messages from this topic.
• The producer reads lines from the log file and sends them to the Kafka broker.

2. Log File Processing (Producer):
• The process_log_file() function continuously reads the local log file (log/message.log), looking for new log entries (user and assistant messages).
• When a new log entry is found, it parses the line into a JSON object and sends it to the Kafka topic (message) using the producer.
• This ensures that any local logs can be sent to Kafka for further processing, even if the system needs to recover from a crash.

3. Kafka Consumer:
• The consume_messages() function is a KafkaConsumer that listens to the message topic and prints out any messages it receives.
• The consumer runs in a background thread, continuously listening to the Kafka topic for incoming messages.
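
The parsing step can be sketched on its own, without Kafka or a live log file. The hypothetical `parse_log_line` helper below is an assumption for illustration: it accepts a line whose payload is either JSON or a Python dictionary written as text, and the sample line is made up.

```python
import ast
import json

def parse_log_line(line):
    """Extract the {...} payload from a log line and return it as a dict, or None."""
    if "{" not in line or "}" not in line:
        return None
    payload = line[line.index("{"):line.rindex("}") + 1]
    try:
        return json.loads(payload)  # the payload was written as JSON
    except json.JSONDecodeError:
        pass
    try:
        value = ast.literal_eval(payload)  # the payload is a Python dict literal
    except (ValueError, SyntaxError):
        return None
    return value if isinstance(value, dict) else None

line = "2024-09-26 10:00:00,000 - {'role': 'user', 'content': \"What's a hash code?\"}"
record = parse_log_line(line)
print(record)
```

Using ast.literal_eval for the dictionary form avoids hand-written quote replacement, which tends to break when the message itself contains quotes.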

import json
import time
from kafka import KafkaProducer, KafkaConsumer
import threading

LOG_FILE_PATH = 'log/message.log'
KAFKA_TOPIC = 'message'
KAFKA_BROKER = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def consume_messages():
    consumer = KafkaConsumer(
        KAFKA_TOPIC,
        bootstrap_servers=KAFKA_BROKER,
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        group_id='log_debugger',
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )
    
    for message in consumer:
        print(f"Consumed message: {message.value}")

consumer_thread = threading.Thread(target=consume_messages, daemon=True)
consumer_thread.start()

import ast

def process_log_file():
    with open(LOG_FILE_PATH, 'r') as file:
        # Start at the end of the file so only new entries are sent.
        file.seek(0, 2)
        # file.seek(0, 0)  # Use this instead to replay the whole file.
        while True:
            line = file.readline()
            if line:
                if '{' in line and '}' in line:
                    payload = line[line.index('{'):line.rindex('}')+1]
                    try:
                        # The log stores each message as the text of a Python dict (single quotes),
                        # so parse it with ast.literal_eval instead of rewriting quotes by hand.
                        json_object = ast.literal_eval(payload)
                        producer.send(KAFKA_TOPIC, json_object)
                        print(f"Produced: {json_object}")
                    except (ValueError, SyntaxError) as e:
                        print(f"Failed to parse log entry from line: {line}")
                        print(f"payload: {payload}")
                        print(e)
            else:
                # No new line yet; wait before checking again.
                time.sleep(1)

if __name__ == "__main__":
    process_log_file()

Conclusion

This project integrates Solar LLM, RAG, and Kafka into a document-based Q&A chatbot. Documents and queries are embedded and matched in a vector store, so the chatbot gives context-driven responses grounded in the uploaded PDF. Kafka carries the chat messages, either directly from the app or from a local log file, which keeps the conversation stream available for distributed, real-time processing as the system scales to larger document sets.
