# model_handler.py

import os
import time
import logging

import torch
from colorama import Fore, Style
from transformers import AutoModelForMaskedLM, AutoModelForCausalLM, AutoTokenizer


class CodeBERTPhiHandler:
    def __init__(self, model_codebert="microsoft/codebert-base",
                 model_phi="microsoft/phi-3.5-mini-instruct", timeout=300, debug=False):
        self.codebert_model_name = model_codebert
        self.phi_model_name = model_phi
        self.timeout = timeout
        self.debug = debug
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.cache_dir = os.path.expanduser('~/.cache/cpp-analyzer')
        os.makedirs(self.cache_dir, exist_ok=True)
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.DEBUG if debug else logging.INFO)

    def load_codebert_model(self):
        self.logger.info("Loading CodeBERT model...")
        self.codebert_tokenizer = AutoTokenizer.from_pretrained(self.codebert_model_name, cache_dir=self.cache_dir)
        self.codebert_model = AutoModelForMaskedLM.from_pretrained(self.codebert_model_name, cache_dir=self.cache_dir).to(self.device)
        self.logger.info("CodeBERT model loaded successfully")

    def load_phi_model(self):
        self.logger.info("Loading Phi model...")
        self.phi_tokenizer = AutoTokenizer.from_pretrained(self.phi_model_name, cache_dir=self.cache_dir)
        try:
            self.phi_model = AutoModelForCausalLM.from_pretrained(
                self.phi_model_name,
                cache_dir=self.cache_dir,
                torch_dtype=torch.float16,
                device_map="auto"
            )
        except RuntimeError as e:
            self.logger.warning(f"Failed to load Phi model on GPU: {e}")
            self.logger.info("Attempting to load Phi model on CPU...")
            self.phi_model = AutoModelForCausalLM.from_pretrained(
                self.phi_model_name,
                cache_dir=self.cache_dir,
                device_map="cpu"
            )
        self.logger.info("Phi model loaded successfully")

    def analyze_code_with_codebert(self, code):
        self.logger.debug("Analyzing code with CodeBERT")
        inputs = self.codebert_tokenizer(code, return_tensors="pt", truncation=True, max_length=512).to(self.device)
        with torch.no_grad():
            outputs = self.codebert_model(**inputs)
        # Mean-pool over the sequence dimension to obtain a single embedding vector.
        return outputs.logits.mean(dim=1)

    def generate_combined_response(self, question, code_embeddings, code_content, static_analysis):
        start_time = time.time()
        self.logger.debug(f"Generating response for question: {question}")
        try:
            self.logger.debug("Generating query embeddings")
            query_inputs = self.codebert_tokenizer(question, return_tensors="pt", truncation=True, max_length=128).to(self.device)
            with torch.no_grad():
                query_outputs = self.codebert_model(**query_inputs)
            query_embeddings = query_outputs.logits.mean(dim=1)

            self.logger.debug("Combining embeddings")
            combined_embeddings = torch.cat((query_embeddings, code_embeddings), dim=1)
            embedding_summary = f"Embedding shape: {combined_embeddings.shape}, Mean: {combined_embeddings.mean().item():.2f}, Max: {combined_embeddings.max().item():.2f}"

            code_snippet = (code_content[:500] + "...") if len(code_content) > 500 else code_content
            static_analysis_summary = "\n".join([f"{key}: {value}" for key, value in static_analysis.items() if key != 'ast_graph'])

            self.logger.debug("Preparing input for Phi model")
            phi_input = f"""Analyze the following C++ code and answer the question:
C++ Code Snippet:
{code_snippet}
Static Analysis Summary:
{static_analysis_summary}
Code Embedding Summary:
{embedding_summary}
Question: {question}
Provide a detailed and well-structured answer based on the C++ code and analysis provided. Include information about classes, functions, member variables, and any potential code issues or optimizations. Format your response clearly and consistently, using appropriate headers and bullet points where necessary.
Answer:
"""
            inputs = self.phi_tokenizer(phi_input, return_tensors="pt", truncation=True, max_length=2048)
            inputs = {k: v.to(self.phi_model.device) for k, v in inputs.items()}

            self.logger.debug("Generating response with Phi model")
            self.phi_model.eval()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            with torch.no_grad():
                outputs = self.phi_model.generate(
                    **inputs,
                    max_new_tokens=400,  # Increased for more detailed responses
                    temperature=0.7,
                    do_sample=True,
                    num_return_sequences=1,
                    no_repeat_ngram_size=3
                )

            self.logger.debug("Decoding response")
            response = self.phi_tokenizer.decode(outputs[0], skip_special_tokens=True)
            response = response.split("Answer:")[-1].strip()

            end_time = time.time()
            self.logger.debug(f"Response generation completed in {end_time - start_time:.2f} seconds")
            return self.format_response(response)
        except Exception as e:
            self.logger.error(f"Error in generate_combined_response: {str(e)}")
            self.logger.exception("Detailed error information:")
            raise

    def format_response(self, response):
        # Simple formatting to ensure consistent output
        formatted_lines = []
        current_section = ""
        for line in response.split('\n'):
            line = line.strip()
            if line.endswith(':'):
                current_section = line
                formatted_lines.append(f"\n{line}")
            elif line:
                if current_section:
                    formatted_lines.append(f"  - {line}")
                else:
                    formatted_lines.append(f"- {line}")
        return "\n".join(formatted_lines)