# Self-supervised synthetic data via sequential generation
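#
# Pipeline overview: recursively walk a directory of text files and, for each file,
# ask DeepSeek-R1 to (1) generate questions about the content, (2) answer each
# question from the content only, and (3) validate each question-answer pair,
# then store everything as JSON for later use as fine-tuning data.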
import os
import re
import json
import torch
from pathlib import Path
from datetime import datetime
from tqdm import tqdm
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    BitsAndBytesConfig,
)


def load_deepseek_r1():
    """
    Loads the DeepSeek-R1 model and tokenizer.

    Returns:
        tuple: (model, tokenizer) for DeepSeek-R1
    """
    # Model to load
    model_name = "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B"

    # Load the tokenizer
    tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)

    # 4-bit NF4 quantization to keep memory usage low
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
    )

    # Load the quantized model onto the current Intel XPU device
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        low_cpu_mem_usage=True,
        trust_remote_code=True,
        use_cache=False,
        quantization_config=bnb_config,
        device_map={"": torch.xpu.current_device()},
        torch_dtype=torch.bfloat16,
    )
    if not model.config.pad_token_id:
        model.config.pad_token_id = model.config.eos_token_id

    return model, tokenizer
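
# Note: the device_map above pins the whole model to the current Intel XPU, which
# matches the torch.xpu cache handling in process_directory below. On CUDA hardware
# the analogous choice would be device_map={"": torch.cuda.current_device()} or
# device_map="auto"; treat this as an adaptation hint rather than part of the pipeline.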


# Function to validate question-answer pairs with the model
def validate_question_answer_from_model(context, question, answer, model, tokenizer):
    """
    Query the model to evaluate whether an answer is a good fit for a question.

    Args:
        context (str): The text file content
        question (str): The question
        answer (str): The answer
        model: The DeepSeek model
        tokenizer: The DeepSeek tokenizer

    Returns:
        str: The model's evaluation of the question-answer pair
    """
    # Set up the prompt for evaluating the question-answer pair
    prompt = f"""<task>
You are a quality assurance expert reviewing question-answer pairs for an AI training dataset. Your task is to evaluate whether each pair meets our quality standards and is suitable for training.

For each question-answer pair, evaluate:

1. ACCURACY: Does the answer contain ONLY information from the context, without fabrications?
2. COMPLETENESS: Does the answer fully address the question using all relevant information?
3. RELEVANCE: Is the question meaningful and relevant to the context?
4. NATURALNESS: Do both question and answer sound natural and conversational?
5. DIVERSITY: Does this pair add variety to our dataset (not redundant with others)?

Context:
{context}

Question:
{question}

Answer:
{answer}

Provide your assessment as follows:
- VERDICT: [ACCEPT/REJECT]
- REASONING: [Brief explanation of your decision]
- IMPROVEMENTS: [Suggestions if needed]
</task>
"""

    # Generate the evaluation
    raw_evaluation = generate_with_deepseek(prompt, model, tokenizer)

    # Parse the response to strip the model's thinking steps
    evaluation = parse_deepseek_response(raw_evaluation)

    return evaluation
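
# A small helper sketch (illustrative, not called by the pipeline above): the validation
# prompt asks for "- VERDICT: [ACCEPT/REJECT]", so accepted pairs could later be
# filtered from the stored results with a check along these lines.
def is_accepted(validation_text):
    match = re.search(r"VERDICT:\s*\[?\s*(ACCEPT|REJECT)", validation_text, flags=re.IGNORECASE)
    return bool(match) and match.group(1).upper() == "ACCEPT"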


def generate_with_deepseek(prompt, model, tokenizer, max_length=4096):
    """
    Generate text using the DeepSeek-R1 model with proper handling of the full output.

    Args:
        prompt (str): The input prompt
        model: The DeepSeek model
        tokenizer: The DeepSeek tokenizer
        max_length (int): Maximum number of new tokens to generate

    Returns:
        str: Generated text response
    """
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

    with torch.no_grad():
        # Get the full generated sequence
        generation_output = model.generate(
            **inputs,
            pad_token_id=tokenizer.eos_token_id,
            max_new_tokens=max_length,
            do_sample=True,
            temperature=0.7,
            top_p=0.9,
            return_dict_in_generate=True,
            output_scores=False,
        )

    # Get the full output token IDs
    output_token_ids = generation_output.sequences[0]

    # Decode the full generated text (including the prompt)
    full_output = tokenizer.decode(output_token_ids, skip_special_tokens=True)

    # Extract only the response part by removing the echoed prompt
    response = full_output[len(tokenizer.decode(inputs.input_ids[0], skip_special_tokens=True)):]

    return response


def parse_deepseek_response(response):
    """
    Parse the DeepSeek-R1 response.
    This handles DeepSeek's thinking steps and separates them from the response.

    Args:
        response (str): The raw response from the DeepSeek model

    Returns:
        str: The cleaned answer part of the response
    """
    # If the response contains thinking tags, drop everything up to and including </think>
    response = re.sub(r"^(<think>)?.*</think>", "", response, flags=re.DOTALL)

    # Return the remaining text (the whole response if no thinking tags were present)
    return response.strip()
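
# Illustrative example of the parsing above: DeepSeek-R1 typically emits its reasoning
# inside <think>...</think> before the final answer, e.g.
#   raw:     "<think>Let me re-read the resume...</think>\nThe candidate has five years of experience."
#   cleaned: "The candidate has five years of experience."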


# Function to recursively walk a directory and process text files
def process_directory(directory_path, output_path="results", file_extensions=(".txt", ".md"), batch_size=5):
    """
    Recursively walks a directory, processes text files, and stores results.

    Args:
        directory_path (str): Path to directory containing text files
        output_path (str): Path to store results
        file_extensions (tuple): File extensions to process
        batch_size (int): Number of files to process before clearing cache
    """
    # Load the DeepSeek-R1 model
    model, tokenizer = load_deepseek_r1()

    # Create output directory if it doesn't exist
    os.makedirs(output_path, exist_ok=True)

    # Collect all matching file paths up front so tqdm can report overall progress
    file_paths = []
    for root, _, files in os.walk(directory_path):
        for file in files:
            if file.lower().endswith(file_extensions):
                file_paths.append(os.path.join(root, file))

    # Process files with batch-based memory management
    for i, file_path in enumerate(tqdm(file_paths, desc="Processing files")):
        process_file(file_path, model, tokenizer, output_path)

        # Clear the device cache periodically to prevent memory issues
        if (i + 1) % batch_size == 0:
            if torch.xpu.is_available():
                torch.xpu.empty_cache()
            elif torch.cuda.is_available():
                torch.cuda.empty_cache()

    print(f"Processing complete. Results stored in {output_path}")


# Function to process a single text file
def process_file(file_path, model, tokenizer, output_path):
    """
    Process a single text file by querying the model for questions and answers.

    Args:
        file_path (str): Path to the text file
        model: The DeepSeek model
        tokenizer: The DeepSeek tokenizer
        output_path (str): Path to store results
    """
    # Read the file content
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()
    except Exception as e:
        print(f"Error reading {file_path}: {e}")
        return

    # Skip empty files
    if not content.strip():
        print(f"Skipping empty file: {file_path}")
        return

    # Generate a relative output path that maintains the directory structure
    relative_path = os.path.relpath(file_path, start=os.path.dirname(output_path))
    output_file = os.path.join(output_path, relative_path + ".json")
    os.makedirs(os.path.dirname(output_file), exist_ok=True)

    # Handle potential token length issues by truncating if necessary;
    # DeepSeek has a context window limit, so overly long files are cut short
    max_content_length = 100000  # Adjust based on the model's context length limit
    if len(content) > max_content_length:
        content = content[:max_content_length] + "... [Content truncated due to length]"

    # Query the model for questions
    questions = get_questions_from_model(content, model, tokenizer)
    print(f"{len(questions)} questions generated for {file_path}")

    # Get answers for each question
    results = {
        "file_path": file_path,
        "processed_at": datetime.now().isoformat(),
        "context": content[:1000] + "..." if len(content) > 1000 else content,  # Truncated context for JSON storage
        "qa_pairs": []
    }

    # Process each question
    for i, question in enumerate(questions):
        print(f"Generating answer for question {i+1}/{len(questions)}: {question}")
        answer = get_answer_from_model(content, question, model, tokenizer)
        print(f"Answer: {answer[:50] + '...' if len(answer) > 50 else answer}")
        print("Evaluating response...")
        validation = validate_question_answer_from_model(content, question, answer, model, tokenizer)
        print(f"Evaluation: {validation[:50] + '...' if len(validation) > 50 else validation}")
        results["qa_pairs"].append({
            "question": question,
            "answer": answer,
            "validation": validation,
        })

        # Save results after each question so partial output can be inspected early
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(results, f, ensure_ascii=False, indent=2)


# Function to get questions from the model
def get_questions_from_model(context, model, tokenizer):
    """
    Query the model to generate questions about the provided context.

    Args:
        context (str): The text file content
        model: The DeepSeek model
        tokenizer: The DeepSeek tokenizer

    Returns:
        list: List of questions about the context
    """
    # Set up the prompt for generating questions
    prompt = f"""<task>
You are an expert data scientist creating a training dataset. I'll provide context information about a person from their resume. Your task is to generate 10 diverse, realistic questions that someone might ask about this person.

IMPORTANT: DO NOT return JSON or any structured format. Respond with a simple numbered list of questions only, with no formatting, no JSON, and no additional text.

Generate questions that:
- Vary in complexity (simple factual questions, complex reasoning questions)
- Cover different aspects of the context (experience, skills, education, achievements)
- Include both specific and general inquiries
- Sound natural, as if asked by a real person
- Avoid asking for information not present in the context

Context:
{context}

Return ONLY a plain numbered list like:
1. First question?
2. Second question?
...and so on.

Do not include any explanations, JSON, or other formatting.
</task>
"""

    # Generate questions
    raw_response = generate_with_deepseek(prompt, model, tokenizer)

    # Parse the response to get the actual questions
    response = parse_deepseek_response(raw_response)

    lines = response.strip().split('\n')

    # Turn lines like "1. What is your name?" into "What is your name?"
    # (only keep lines that end with a question mark)
    questions = [re.sub(r'^\d+\.\s*', '', line.strip()) for line in lines if line.strip() and line.strip().endswith('?')]
    if len(questions) == 0:
        # Nothing could be parsed; log the raw response and return an empty list
        # instead of aborting the whole run
        print(f"Warning: no questions parsed from model response:\n{response}")
    return questions


# Function to get answers from the model
def get_answer_from_model(context, question, model, tokenizer):
    """
    Query the model to answer a question about the provided context.

    Args:
        context (str): The text file content
        question (str): The question to answer
        model: The DeepSeek model
        tokenizer: The DeepSeek tokenizer

    Returns:
        str: The model's answer to the question
    """
    # Set up the prompt for answering the question
    prompt = f"""<task>
You are an AI assistant being fine-tuned to accurately represent a specific person based on their resume. Below is a question about this person and the context from their resume.

Your task is to provide a comprehensive, accurate answer that:
- Only uses information explicitly stated in the context
- Doesn't fabricate or assume additional details
- Maintains a professional, helpful tone
- Clearly states if the question cannot be answered based on the given context
- Structures the response in a natural, conversational way

Context:
{context}

Question:
{question}

Answer:
</task>
"""

    # Generate answer
    raw_answer = generate_with_deepseek(prompt, model, tokenizer)

    # Parse the response to get the actual answer
    answer = parse_deepseek_response(raw_answer)

    return answer


# Example usage
if __name__ == "__main__":
    process_directory(
        directory_path="../doc/",  # Replace with your directory path
        output_path="../results",
        file_extensions=(".txt", ".md"),  # Process both txt and md files
        batch_size=5  # Clear cache after every 5 files
    )
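

# A minimal follow-up sketch (illustrative, not invoked by the script above) showing how
# the JSON files written by process_file could be loaded back, e.g. to assemble the
# validated question-answer pairs into a fine-tuning dataset. It relies only on the
# `results` structure saved above (file_path, processed_at, context, qa_pairs).
def load_generated_qa_pairs(results_dir="../results"):
    qa_pairs = []
    for json_path in Path(results_dir).rglob("*.json"):
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        qa_pairs.extend(data.get("qa_pairs", []))
    return qa_pairs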