import os
import json
import torch
import random
import glob
from datetime import datetime
from tqdm import tqdm
from transformers import (
AutoTokenizer,
TrainingArguments,
Trainer,
TrainerCallback,
TrainingArguments,
TrainerState,
TrainerControl,
DataCollatorForLanguageModeling,
AutoModelForCausalLM,
BitsAndBytesConfig
)
import numpy as np
from peft import (
TaskType,
get_peft_model,
prepare_model_for_kbit_training,
LoraConfig,
)
from datasets import Dataset
def load_qa_data(results_dir):
    """
    Collect prompt/completion training examples from processed result files.

    Every ``*.json`` file under ``results_dir`` (searched recursively) is
    parsed; its ``context`` value and each entry of ``qa_pairs`` that has
    both a non-empty ``question`` and ``answer`` become one example. A file
    that fails to load or process is reported and skipped.

    Args:
        results_dir (str): Directory containing JSON files with Q&A pairs

    Returns:
        list: Dicts with a "prompt" and a "completion" key
    """
    pattern = os.path.join(results_dir, "**/*.json")
    json_files = glob.glob(pattern, recursive=True)

    training_data = []
    for json_file in tqdm(json_files, desc="Loading training data"):
        try:
            with open(json_file, 'r', encoding='utf-8') as handle:
                payload = json.load(handle)

            context = payload.get("context", "")
            for pair in payload.get("qa_pairs", []):
                question = pair.get("question", "")
                answer = pair.get("answer", "")
                # Incomplete pairs carry no training signal; skip them.
                if not (question and answer):
                    continue

                # Instruction-style prompt; the answer is the completion.
                prompt = f"""
Answer the following question based on the provided context.
CONTEXT:
{context}
QUESTION:
{question}
"""
                training_data.append({"prompt": prompt, "completion": answer})
        except Exception as e:
            # Best effort: one bad file must not abort the whole load.
            print(f"Error processing {json_file}: {e}")

    print(f"Loaded {len(training_data)} training examples")
    return training_data
def prepare_dataset_for_training(training_data, tokenizer, max_length=2048):
    """
    Tokenize prompt/completion pairs and build loss labels for causal LM training.

    Each example is tokenized as ``prompt + completion + EOS``, padded or
    truncated to ``max_length``. In ``labels``, prompt tokens and padding
    positions are set to -100 so only completion tokens (and the closing
    EOS) contribute to the loss. Examples left with no supervised token
    (the prompt alone fills the window) are dropped, because a batch whose
    labels are all -100 has no token to average the loss over.

    Side effect: if the tokenizer has no pad token, its EOS token is
    assigned as the pad token.

    Args:
        training_data (list): Dicts with "prompt" and "completion" keys
        tokenizer: Hugging Face tokenizer matching the model to fine-tune
        max_length (int): Maximum sequence length (also the padded length)

    Returns:
        Dataset: HuggingFace dataset with input_ids, attention_mask and labels
    """
    ignore_index = -100

    # Decide the pad token once instead of re-checking it on every batch.
    if not tokenizer.pad_token:
        tokenizer.pad_token = tokenizer.eos_token
    # Without an explicit EOS the model never sees where an answer ends.
    eos_text = tokenizer.eos_token or ""

    def tokenize_function(examples):
        prompts = examples["prompt"]
        full_texts = [
            prompt + completion + eos_text
            for prompt, completion in zip(prompts, examples["completion"])
        ]

        # Plain Python lists are enough: Dataset.map stores the result as
        # Arrow columns either way, so tensors are not needed here.
        tokenized = tokenizer(
            full_texts,
            padding="max_length",
            truncation=True,
            max_length=max_length,
        )

        labels = []
        for prompt, input_ids, attention_mask in zip(
            prompts, tokenized["input_ids"], tokenized["attention_mask"]
        ):
            prompt_length = len(tokenizer.encode(prompt))
            # Real tokens start after any left padding, so anchor the
            # prompt span on the first attended position rather than index 0.
            first_real = attention_mask.index(1) if 1 in attention_mask else len(attention_mask)
            completion_start = first_real + prompt_length
            labels.append([
                token_id if attended == 1 and position >= completion_start else ignore_index
                for position, (token_id, attended) in enumerate(zip(input_ids, attention_mask))
            ])

        tokenized["labels"] = labels
        return tokenized

    # Convert to a HuggingFace Dataset
    dataset = Dataset.from_dict({
        "prompt": [item["prompt"] for item in training_data],
        "completion": [item["completion"] for item in training_data],
    })

    # Apply tokenization
    tokenized_dataset = dataset.map(
        tokenize_function,
        batched=True,
        remove_columns=["prompt", "completion"]
    )

    # Drop examples with nothing to learn from (all labels ignored).
    if "labels" in tokenized_dataset.column_names:
        before = len(tokenized_dataset)
        tokenized_dataset = tokenized_dataset.filter(
            lambda example: any(label != ignore_index for label in example["labels"])
        )
        dropped = before - len(tokenized_dataset)
        if dropped:
            print(f"Dropped {dropped} examples whose prompt left no room for the completion")

    return tokenized_dataset
def setup_lora_config():
    """
    Build the LoRA adapter configuration used for fine-tuning.

    Returns:
        LoraConfig: Rank-8 causal-LM LoRA settings covering the attention
        and MLP projection layers
    """
    attention_projections = ["q_proj", "k_proj", "v_proj", "o_proj"]
    mlp_projections = ["gate_proj", "up_proj", "down_proj"]

    settings = {
        "task_type": TaskType.CAUSAL_LM,
        "r": 8,                     # Rank of the low-rank update matrices
        "lora_alpha": 16,           # Scaling factor applied to the LoRA update
        "lora_dropout": 0.05,       # Dropout probability for LoRA layers
        "target_modules": attention_projections + mlp_projections,
        "bias": "none",             # Do not train bias parameters
        # Per the PEFT docs this is only True for layers that store weights
        # as (fan_in, fan_out), e.g. GPT-2's Conv1D.
        "fan_in_fan_out": False,
    }
    return LoraConfig(**settings)
class EarlyStoppingCallback(TrainerCallback):
    """
    Callback that stops training when the evaluation loss stops improving.

    An evaluation counts as an improvement only when its loss is at least
    ``min_delta`` below the best loss seen so far. After ``patience``
    consecutive evaluations without improvement, training is asked to stop.
    """

    def __init__(self, patience=3, min_delta=0.0):
        """
        Args:
            patience (int): Number of evaluations with no improvement after which training will be stopped.
            min_delta (float): Minimum change in the monitored quantity to qualify as an improvement.
        """
        self.patience = patience
        self.min_delta = min_delta
        self.best_score = None   # Lowest eval loss seen so far
        self.counter = 0         # Consecutive evaluations without improvement
        self.early_stop = False  # Set once the stop has been requested

    def on_evaluate(self, args: TrainingArguments, state: TrainerState, control: TrainerControl, **kwargs):
        """
        Update the patience counter from the latest evaluation loss.

        The loss is read from the ``metrics`` keyword argument when one is
        supplied, otherwise from the last entry of ``state.log_history``.
        If neither provides ``eval_loss`` the callback does nothing.

        Returns:
            TrainerControl: ``control``, with ``should_training_stop`` set
            once patience is exhausted
        """
        import math

        metrics = kwargs.get("metrics") or {}
        eval_loss = metrics.get("eval_loss")
        # Fall back to the log history, guarding against it being empty.
        if eval_loss is None and state.log_history:
            eval_loss = state.log_history[-1].get("eval_loss")
        if eval_loss is None:
            return control

        # A NaN/inf loss must never become the new best: every later
        # comparison against NaN is False and patience would never run out.
        no_improvement = not math.isfinite(eval_loss) or (
            self.best_score is not None
            and eval_loss > self.best_score - self.min_delta
        )

        if no_improvement:
            self.counter += 1
            print(f"EarlyStopping counter: {self.counter} out of {self.patience}")
            if self.counter >= self.patience:
                best = "n/a" if self.best_score is None else f"{self.best_score:.4f}"
                print(f"Early stopping triggered! Best loss: {best}")
                self.early_stop = True
                control.should_training_stop = True
        else:
            self.best_score = eval_loss
            self.counter = 0
        return control
def _collate_preserving_labels(features):
    """Stack equal-length tokenized examples into tensors, keeping their labels."""
    batch = {
        key: torch.tensor([feature[key] for feature in features])
        for key in features[0]
    }
    # Padding positions never contribute to the loss, whatever the dataset did.
    if "labels" in batch and "attention_mask" in batch:
        batch["labels"] = batch["labels"].masked_fill(batch["attention_mask"] == 0, -100)
    return batch


def train_with_lora(model_name, training_data, output_dir, batch_size=4, num_epochs=3, learning_rate=2e-4):
    """
    Fine-tune a 4-bit quantized causal language model with LoRA adapters.

    The model is loaded onto the current XPU device, wrapped with LoRA
    adapters, trained with periodic evaluation and early stopping on a
    90/10 train/eval split, and saved to ``<output_dir>/final_model``.

    Args:
        model_name (str): Hugging Face model name or path
        training_data (list): List of prompt-completion pairs
        output_dir (str): Directory to save checkpoints, logs and the final model
        batch_size (int): Per-device training and evaluation batch size
        num_epochs (int): Number of training epochs
        learning_rate (float): Learning rate

    Returns:
        tuple: ``(model, tokenizer)`` — the trained PEFT model and its tokenizer

    Raises:
        ValueError: If fewer than two training examples are supplied, since
            a train/eval split is then impossible.
    """
    # Fail before the expensive model load if there is nothing to split.
    if len(training_data) < 2:
        raise ValueError(
            "At least two training examples are required to build train and eval sets"
        )

    # Create output directory
    os.makedirs(output_dir, exist_ok=True)
    final_dir = f"{output_dir}/final_model"

    # Load model and tokenizer
    tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
    )
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        low_cpu_mem_usage=True,
        trust_remote_code=True,
        use_cache=False,  # The generation KV cache is not needed while training
        quantization_config=bnb_config,
        device_map={"": torch.xpu.current_device()},
        torch_dtype=torch.bfloat16,
    )

    # Prepare the model for LoRA fine-tuning
    # 1. Prepare for k-bit training since the base weights are quantized
    model = prepare_model_for_kbit_training(model)
    # 2. Add LoRA adapters
    model = get_peft_model(model, setup_lora_config())
    model.print_trainable_parameters()

    # Prepare dataset and split into train and evaluation sets
    tokenized_dataset = prepare_dataset_for_training(training_data, tokenizer)
    dataset_dict = tokenized_dataset.train_test_split(test_size=0.1)

    # Set up training arguments
    # NOTE(review): newer transformers releases rename `evaluation_strategy`
    # to `eval_strategy`; confirm against the installed version.
    training_args = TrainingArguments(
        output_dir=output_dir,
        num_train_epochs=num_epochs,
        per_device_train_batch_size=batch_size,
        per_device_eval_batch_size=batch_size,
        gradient_accumulation_steps=4,
        evaluation_strategy="steps",
        eval_steps=100,  # More frequent evaluation to check stopping criteria
        save_strategy="steps",
        save_steps=500,
        save_total_limit=3,
        learning_rate=learning_rate,
        warmup_steps=100,
        weight_decay=0.01,
        logging_dir=f"{output_dir}/logs",
        logging_steps=100,
        # Match the bfloat16 model/compute dtype instead of mixing in fp16.
        bf16=True,
        report_to="none",
        optim="adamw_torch",
        load_best_model_at_end=True,  # Load the best model when training ends
        metric_for_best_model="eval_loss"  # Use eval loss to determine the best model
    )

    early_stopping_callback = EarlyStoppingCallback(patience=3, min_delta=0.01)

    # DataCollatorForLanguageModeling(mlm=False) would rebuild `labels` from
    # `input_ids`, throwing away the prompt masking done during dataset
    # preparation, so use a collator that keeps the prepared labels.
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=dataset_dict["train"],
        eval_dataset=dataset_dict["test"],
        data_collator=_collate_preserving_labels,
        callbacks=[early_stopping_callback]
    )

    # Start training
    print("Starting LoRA fine-tuning...")
    trainer.train()
    model = trainer.model
    print("LoRA fine-tuning complete...")

    # Save the fine-tuned model
    print("Saving model...")
    model.save_pretrained(final_dir)
    tokenizer.save_pretrained(final_dir)
    print(f"Fine-tuned model saved to {output_dir}/final_model")
    return model, tokenizer
# Main function
def main():
    """
    Fine-tune the model on the processed Q&A data, then answer questions interactively.

    Loads the Q&A pairs, shuffles them, runs LoRA fine-tuning, reloads the
    saved model in 4-bit on the current XPU device and enters a
    read-question/print-answer loop until CTRL-C or end of input.
    """
    # Configuration
    results_dir = "../results"  # Directory with processed text files and QA pairs
    model_name = "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B"
    output_dir = "./fine_tuned"

    # Load training data from processed results
    training_data = load_qa_data(results_dir)
    if not training_data:
        print(f"No training data found under {results_dir}; nothing to do.")
        return

    # Shuffle training data
    random.shuffle(training_data)

    # Fine-tune the model
    train_with_lora(
        model_name=model_name,
        training_data=training_data,
        output_dir=output_dir,
        batch_size=1,  # Can't fit more than one on the B580
        num_epochs=10,
        learning_rate=2e-4
    )

    model_path = f"{output_dir}/final_model"

    # Load the fine-tuned model and tokenizer
    print("Loading fine-tuned model for evaluation...")
    tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
        tokenizer.pad_token_id = tokenizer.eos_token_id

    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
    )
    # device_map already places the model on the XPU. No `.to()` afterwards:
    # it is redundant, and bitsandbytes 4-bit models reject it on many
    # library versions.
    model = AutoModelForCausalLM.from_pretrained(
        model_path,
        low_cpu_mem_usage=True,
        trust_remote_code=True,
        use_cache=True,
        quantization_config=bnb_config,
        device_map={"": torch.xpu.current_device()})

    print("Loaded. Ask your question, CTRL-C to exit.")
    while True:
        try:
            question = input("> ").strip()
        except (KeyboardInterrupt, EOFError):
            # EOF (CTRL-D or piped input running out) ends the session too.
            print("\nExiting.")
            break
        if not question:
            continue

        prompt = f"""
You are an assistant providing resume details about James Ketrenos. Answer the following question based on your knowledge. If you don't know, say so. Be concise.
QUESTION:
{question}
"""
        # Tokenize and generate
        inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=1024,
                do_sample=True,
                temperature=0.7,
                top_p=0.9,
                pad_token_id=tokenizer.pad_token_id,
            )

        # generate() returns prompt + continuation; slice off the prompt
        # tokens so only the newly generated answer is printed.
        prompt_token_count = inputs["input_ids"].shape[-1]
        answer = tokenizer.decode(outputs[0][prompt_token_count:], skip_special_tokens=True)
        print(answer.strip())
# Run the fine-tuning pipeline only when executed as a script, not on import.
if __name__ == "__main__":
    main()