Enhancing Large-Scale Text Processing Efficiency with MapReduce and Small-Scale LLM: A Practical Case Study on a Question-Answering Task
At A Glance.
This article explores how to combine two technologies, the MapReduce programming model and a small-scale LLM, to create an efficient and high-performance text processing system.
Background and Idea
In the fields of data science and artificial intelligence, the rapid and effective processing of large-scale text data has consistently been a focus of attention. MapReduce, a distributed computing framework introduced by Google, allows the decomposition of large-scale data processing tasks into multiple independent sub-tasks, which can be executed in parallel across several machines to achieve efficient data processing. This technology finds widespread application in areas such as search engine indexing, log analysis, and big data processing, with commonly used frameworks including Apache Hadoop and Apache Spark.
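To make the map/reduce pattern concrete, here is a purely illustrative word-count sketch in PySpark (the framework used later in this article); the tiny in-memory input exists only for demonstration:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("WordCountExample").getOrCreate()

# A tiny in-memory input, purely for illustration
lines = spark.sparkContext.parallelize([
    "the quick brown fox",
    "the lazy dog",
])

counts = (lines
          .flatMap(lambda line: line.split())  # map: emit one record per word
          .map(lambda word: (word, 1))         # map: pair each word with a count of 1
          .reduceByKey(lambda a, b: a + b))    # reduce: sum the counts per word

print(counts.collect())
spark.stop()

Each word is processed independently in the map phase, and the framework handles grouping and summation in the reduce phase; the same split-then-combine pattern drives the question-answering pipeline below.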
Traditionally, these frameworks have been paired with conventional machine learning models, which are valued for their swift processing capabilities. However, these traditional models have notable limitations when it comes to handling complex natural language tasks. Methods based on Bag of Words or TF-IDF, for example, often rely on statistical features to process text data. While these approaches might perform adequately in simpler text classification tasks, they frequently fall short in more demanding tasks that require a deep semantic understanding. For instance, traditional models often struggle to capture the true meaning of text when dealing with synonyms or contextually related vocabulary. Moreover, these models perform poorly in tasks that involve lengthy texts or require the recognition of relationships across sentences—such as topic modeling, sentiment analysis, or entity extraction—due to their lack of deep contextual comprehension. This shortfall makes it challenging to meet the accuracy and efficiency standards demanded by modern applications.
In contrast, small-scale LLMs have the ability to utilize contextual information to more accurately understand and manage these complex tasks, showing exceptional performance particularly in areas like text comprehension, content extraction, and automatic tagging. For example, in text comprehension, small-scale LLMs can accurately extract key information through semantic analysis and context understanding. In content extraction, they can autonomously identify useful entities and relationships from unstructured data. When it comes to tagging tasks, LLMs can intelligently label large-scale text data based on context. In question-answering scenarios, LLMs can provide logically sound answers by integrating context with their reasoning capabilities. These impressive capabilities prompt us to explore how to combine the intelligent reasoning of LLMs with the parallel processing strength of MapReduce. By doing so, we aim to resolve the tension between efficiency and performance in large-scale text processing.
Practical Case Study: Combining PySpark with a Small-Scale LLM for Text-Based Question Answering on Harry Potter
Parallel Processing for QA Tasks
This example demonstrates how to use PySpark for performing Question-Answering (QA) tasks on the "Harry Potter" dataset. The complete code can be found here: LLM&PySpark.
Installation and Dataset Loading
# Install dependencies
import os
os.system('pip install pyspark numpy pandas together tiktoken transformers datasets')

# Load the Harry Potter dataset
from datasets import load_dataset

dataset_path = 'elricwan/HarryPotter'
dataset = load_dataset(dataset_path)
Text Chunking by Paragraph
A function, chunk_text_by_paragraph, is defined to split the dataset's content into smaller chunks based on token counts. It ensures that each chunk contains a manageable number of tokens, facilitating parallel processing by the QA model. The code below also includes a minimal placeholder for the is_valid_sentence helper, which filters out empty or malformed paragraphs.
import tiktoken
import re

def is_valid_sentence(paragraph):
    # Minimal placeholder filter: keep paragraphs that contain
    # at least one word character; adapt as needed.
    return bool(re.search(r'\w', paragraph))

def chunk_text_by_paragraph(text, min_tokens=500, max_tokens=1000, encoding_name='cl100k_base'):
    enc = tiktoken.get_encoding(encoding_name)
    paragraphs = text.split('\n\n')
    chunks = []
    current_chunk = []
    current_tokens = 0
    for paragraph in paragraphs:
        if not is_valid_sentence(paragraph):
            continue
        paragraph_tokens = len(enc.encode(paragraph))
        if current_tokens + paragraph_tokens > max_tokens:
            if current_tokens >= min_tokens:
                # Close the current chunk and start a new one
                chunks.append('\n\n'.join(current_chunk))
                current_chunk = [paragraph]
                current_tokens = paragraph_tokens
            else:
                # Chunk is still below min_tokens; keep growing it
                current_chunk.append(paragraph)
                current_tokens += paragraph_tokens
        else:
            current_chunk.append(paragraph)
            current_tokens += paragraph_tokens
    if current_chunk:
        chunks.append('\n\n'.join(current_chunk))
    return chunks
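Before moving on, here is a minimal sketch of how the chunker can be applied to the loaded dataset to build the documents list used in the PySpark step below. The 'train' split and the 'content' field name are assumptions about the dataset schema and may need adjusting:

# Build the list of chunks consumed by the PySpark step below.
# NOTE: the 'train' split and the 'content' field are assumptions
# about the dataset schema; adjust them to match the actual columns.
documents = []
for record in dataset['train']:
    documents.extend(chunk_text_by_paragraph(record['content']))

print(f"Produced {len(documents)} chunks")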
Defining the QA Function
The QA function, generate_response, is defined to generate answers based on the provided document and question. It uses the Together AI API to interact with Google's Gemma 2 9B model.
from getpass import getpass
from together import Together

api_key = getpass("Enter your Together AI API key: ")
client = Together(api_key=api_key)

question = ("This is from a Harry Potter book. Given the context, how many wizard fights "
            "appear in the context? Please summarize all the fights between wizards.")
def generate_response(document):
    prompt = f"""
---Role---
You are an advanced AI assistant tasked with creating comprehensive answers by integrating and expanding upon the partial insights provided by multiple analysts.
Each analyst has access to only a subset of the relevant documents or facts, resulting in incomplete individual conclusions.
---Guidelines---
1. Synthesize these partial answers, identifying common threads and complementary information.
2. Fill in gaps by inferring logical connections between the partial insights.
3. Provide a broader, more complete perspective that goes beyond simply summarizing the analysts' views.
4. Generate new insights or hypotheses based on the combined information.
5. Highlight any potential inconsistencies or areas of uncertainty in the integrated view.
6. Present a coherent, well-rounded answer that captures the full scope of the available information.
---Input---
Here are the partial insights provided by multiple analysts:
{document}
Here is the question:
{question}
Now please respond:
"""
    response = client.chat.completions.create(
        model="google/gemma-2-9b-it",
        messages=[
            {"role": "system", "content": prompt},
            {"role": "user", "content": "Now please respond:"},
        ],
        max_tokens=2000,
        temperature=0.1,
    )
    answer = response.choices[0].message.content
    return answer
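Before wiring the function into Spark, it is worth a quick smoke test on a single chunk; a minimal sketch:

# Sanity-check the QA function on one chunk before running in parallel
sample_answer = generate_response(documents[0])
print(sample_answer[:500])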
Parallel Processing with PySpark
The code uses PySpark to parallelize the QA task across multiple documents. A Spark session is initialized, and a user-defined function (UDF), ask_question, is applied to each document chunk. Note that you need to run the code on a local machine rather than Google Colab to achieve full parallelism, and the Together AI API rate limit may need to be raised.
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder \
    .appName("Parallel LLM Question Answering") \
    .getOrCreate()

@pandas_udf("string", PandasUDFType.SCALAR)
def ask_question(document_series):
    # Apply the QA function to every chunk in this batch
    return document_series.apply(generate_response)

# Convert the list of documents to a PySpark DataFrame
df = spark.createDataFrame([(text,) for text in documents], ["chunk"])

# Apply the ask_question UDF to every chunk in parallel
df_answers = df.withColumn("answer", ask_question(df["chunk"]))
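If raising the Together AI rate limit is not an option, a pragmatic workaround is to wrap the API call in a simple exponential-backoff retry. The sketch below is an illustration, not part of the original code; the retry count and delays are arbitrary:

import time

def generate_response_with_retry(document, max_retries=5):
    # Retry with exponential backoff to absorb transient rate-limit errors
    for attempt in range(max_retries):
        try:
            return generate_response(document)
        except Exception:
            time.sleep(2 ** attempt)  # back off: 1s, 2s, 4s, 8s, ...
    raise RuntimeError("Together AI request failed after retries")

To use it, call generate_response_with_retry instead of generate_response inside ask_question.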
Aggregating Results
The results are saved to disk, read back, and aggregated to generate a comprehensive answer. The aggregate function synthesizes partial insights from multiple documents into a complete answer. Note that aggregate issues a follow-up call with a CONTINUE_PROMPT so the model considers overlooked perspectives and implications.
# Save the answers to disk
df_answers.write.mode("overwrite").json("/content/df_answers")

# Read the answers back and collect all rows into a list
df_read = spark.read.json("/content/df_answers")
rows = df_read.collect()

# Keep only the answers where the model found relevant information
points = []
for row in rows:
    if "don't know" not in row['answer']:
        points.append(row['answer'])
def aggregate(document, question):
    prompt = f"""
---Role---
You are an advanced AI assistant tasked with creating comprehensive answers by integrating and expanding upon the partial insights provided by multiple analysts.
Each analyst has access to only a subset of the relevant documents or facts, resulting in incomplete individual conclusions.
---Guidelines---
1. Synthesize these partial answers, identifying common threads and complementary information.
2. Fill in gaps by inferring logical connections between the partial insights.
3. Provide a broader, more complete perspective that goes beyond simply summarizing the analysts' views.
4. Generate new insights or hypotheses based on the combined information.
5. Highlight any potential inconsistencies or areas of uncertainty in the integrated view.
6. Present a coherent, well-rounded answer that captures the full scope of the available information.
---Input---
Here are the partial insights provided by multiple analysts:
{document}
Here is the question:
{question}
Now please respond:
"""
    response = client.chat.completions.create(
        model="google/gemma-2-9b-it",
        messages=[
            {"role": "system", "content": prompt},
            {"role": "user", "content": "Now please respond:"},
        ],
        max_tokens=2000,
        temperature=0.1,
    )
    answer = response.choices[0].message.content

    CONTINUE_PROMPT = """
Expand your analysis. Consider overlooked perspectives and implications to provide a more comprehensive response:
"""
    # Feed the first answer back with the continuation prompt so the model
    # expands on overlooked perspectives and implications
    new_prompt = prompt + '\n' + answer + '\n' + CONTINUE_PROMPT
    response = client.chat.completions.create(
        model="google/gemma-2-9b-it",
        messages=[
            {"role": "system", "content": new_prompt},
            {"role": "user", "content": "Now please respond:"},
        ],
        max_tokens=2000,
        temperature=0.1,
    )
    new_answer = response.choices[0].message.content
    return new_answer

perspectives = ' '.join(points)
final_answer = aggregate(perspectives, question)
print(final_answer)
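Once the final answer is printed, it is good practice to release the cluster resources:

# Release Spark resources when the job is done
spark.stop()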
Summary.
In this case study, we successfully demonstrated the potential of combining MapReduce with a small-scale LLM. The entire system not only achieved efficient parallel processing for large-scale text-based question answering tasks but also generated high-quality answers. Moreover, the application of combining MapReduce with small-scale LLMs extends far beyond text-based question answering. This approach can be effectively applied in document filtering, document analysis, and any other domain where LLMs can be utilized. By replacing traditional machine learning models with LLMs, the quality of various tasks can be significantly improved.
By integrating LLMs into various tasks and decision-making processes, we are stepping into a new era of intelligence. In this era, artificial intelligence will become an indispensable tool, helping us to tackle challenges, seize opportunities, and drive industries forward.
A Final Thought.
Regarding the question-answering task, there's one final point I'd like to add: asking a good question is often the foundation for receiving a quality answer. People are not naturally skilled at asking questions; the art of questioning is a skill that requires continuous cultivation and practice. Therefore, when we build question-answering systems or conduct data analysis, we should not only focus on optimizing models and algorithms but also place great importance on the process of question design. By enhancing our ability to ask the right questions, we can more effectively unlock the value of data and obtain more insightful answers.