# InfinateCodeGenerator - Ultimate Merged Edition (v1.0.1) - full script
"""
Consolidated, hardened, and production-ready version (patched call_model & retries).
"""
import os
import sys
import time
import json
import traceback
import uuid
import re
import subprocess
import shutil
import logging
from pathlib import Path
from typing import Optional, Dict, Any, List, Tuple, Generator
from datetime import datetime
import gradio as gr
from huggingface_hub import InferenceClient
# Added missing imports
import tempfile
import zipfile
# ---------- Config ----------
# Fully-qualified repo IDs (org/name) so the HF Inference API can resolve them
PYTHON_MODEL = "NTQAI/Nxcode-CQ-7B-orpo"
OTHER_MODEL = "Qwen/Qwen2.5-Coder-32B-Instruct"
FALLBACK_MODELS = [
"Qwen/Qwen2.5-Coder-32B-Instruct",
"NTQAI/Nxcode-CQ-7B-orpo",
"m-a-p/OpenCodeInterpreter-DS-33B"
]
DEFAULT_TEMPERATURE = 0.5
DEFAULT_TOP_P = 0.9
DEFAULT_MAX_TOKENS = 4096
DEFAULT_MAX_ITERS = 5
COMMAND_TIMEOUT = 60 # Timeout (seconds) for subprocess commands (linting, tests, etc.)
ERROR_LOG_FILE = "/tmp/infgen_error.log"
# Enhanced evaluation weights
EVAL_WEIGHTS = {
"style": 0.20,
"security": 0.20,
"tests": 0.40,
"maintainability": 0.20
}
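# Illustrative example (not executed): with these weights, a run where style and
# security pass (100 each), tests fail (20), and maintainability is 100 would score
# 0.20*100 + 0.20*100 + 0.40*20 + 0.20*100 = 68.0 -- run_evaluators() below computes
# exactly this weighted sum.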
# Setup structured logging
logging.basicConfig(
level=os.getenv("LOG_LEVEL", "INFO").upper(),
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler(ERROR_LOG_FILE),
logging.StreamHandler(sys.stdout)
]
)
# ---------- Helpers ----------
def sanitize_log_message(message: str) -> str:
"""Redacts sensitive information like API tokens from logs."""
# Regex for typical Hugging Face tokens (hf_...) and other patterns
token_pattern = re.compile(r"hf_[a-zA-Z0-9]{30,}")
return token_pattern.sub("[REDACTED_TOKEN]", message)
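# Illustrative example (the token below is a fake placeholder of 30+ alphanumerics):
#   sanitize_log_message("auth: hf_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") -> "auth: [REDACTED_TOKEN]"
# Only substrings matching hf_ followed by 30 or more alphanumeric characters are redacted.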
def write_error_log(exc: Exception, prefix: str = ""):
"""Writes a sanitized error log."""
tb = traceback.format_exc()
sanitized_tb = sanitize_log_message(tb)
sanitized_exc = sanitize_log_message(str(exc))
logging.error(f"{prefix} | Exception: {sanitized_exc}\nTraceback:\n{sanitized_tb}")
def get_token_from_env_or_manual(manual_token: Optional[str]) -> Optional[str]:
"""Retrieves HF token securely from manual input or environment variables."""
if manual_token and manual_token.strip():
return manual_token.strip()
return os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_TOKEN")
def detect_language(goal: str, code: str) -> bool:
"""Detects whether the project is primarily Python.
Returns True for Python, False otherwise.
"""
combined = (goal + " " + (code or "")).lower()
python_kw = ["python", "django", "flask", "fastapi", "pytest", "def ", "import ", "pip"]
return any(kw in combined for kw in python_kw)
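# Illustrative examples: detect_language("Build a FastAPI service", "") returns True
# because "fastapi" is in the keyword list, while detect_language("Build a React SPA", "")
# returns False, so OTHER_MODEL is selected for non-Python goals.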
def run_cmd(cmd: List[str], cwd: Optional[str] = None, timeout: int = COMMAND_TIMEOUT) -> Tuple[int, str]:
"""Runs a command in a subprocess with a timeout and captures output."""
try:
env = os.environ.copy()
# If the command runs pytest, hint that network access should be avoided. Note that
# this only sets an environment variable tests may honor; it does not enforce sandboxing.
if any("pytest" in str(part) for part in cmd):
env["ALLOW_NETWORK"] = "0"
proc = subprocess.run(
cmd,
cwd=cwd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
timeout=timeout,
text=True,
check=False,
env=env
)
return proc.returncode, proc.stdout or ""
except subprocess.TimeoutExpired:
return 1, f"TIMEOUT: Command '{' '.join(cmd)}' exceeded {timeout} seconds."
except FileNotFoundError:
return 1, f"COMMAND NOT FOUND: {cmd[0]}"
except Exception as e:
write_error_log(e, "run_cmd failed")
return 1, f"ERROR: {e}"
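# Minimal usage sketch (illustrative; assumes the interpreter at sys.executable is usable):
#   rc, output = run_cmd([sys.executable, "--version"])
# rc == 0 on success; on timeout, a missing executable, or any other error the function
# returns rc == 1 with a descriptive message instead of raising.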
def write_files(workdir: Path, files: Dict[str, str]) -> None:
"""Safely writes a dictionary of files to a specified directory, preventing path traversal."""
workdir_resolved = workdir.resolve()
for filename, content in files.items():
try:
# Prevent path traversal attacks
if ".." in filename or filename.startswith("/"):
logging.warning(f"Blocked malicious path attempt: {filename}")
continue
target_path = (workdir_resolved / filename).resolve()
# Final check to ensure the path is within the workdir
if workdir_resolved not in target_path.parents and target_path != workdir_resolved:
raise ValueError(f"Path traversal attempt blocked: {filename}")
target_path.parent.mkdir(parents=True, exist_ok=True)
target_path.write_text(content, encoding="utf-8")
except Exception as e:
write_error_log(e, f"Failed to write file {filename}")
def make_zip(dirpath: Path) -> Optional[str]:
"""Creates a zip archive of a directory."""
try:
base_path = str(dirpath.parent / dirpath.name)
return shutil.make_archive(base_path, 'zip', root_dir=dirpath)
except Exception as e:
write_error_log(e, "ZIP creation failed")
return None
# ---------- Model calls ----------
def extract_chunk_content(chunk: Any) -> Optional[str]:
"""Extracts content from various possible streaming chunk formats."""
try:
if isinstance(chunk, dict) and (choices := chunk.get("choices")):
# typical OpenAI-like streaming chunk shape
delta = choices[0].get("delta", {})
return delta.get("content") or delta.get("text")
# HF newer shapes may use 'generations' inside chunk
if isinstance(chunk, dict) and "generations" in chunk:
gens = chunk.get("generations") or []
parts = []
for g in gens:
if isinstance(g, dict) and "text" in g:
parts.append(g["text"])
return "".join(parts) if parts else None
# some streaming yields objects with .delta or .content attributes
if hasattr(chunk, 'delta') and hasattr(chunk.delta, 'content'):
return chunk.delta.content
if isinstance(chunk, str):
return chunk
except Exception:
return None
return None
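# Chunk shapes this helper understands (illustrative examples, not exhaustive):
#   {"choices": [{"delta": {"content": "def "}}]}        -> "def "   (OpenAI-style delta)
#   {"generations": [{"text": "foo"}, {"text": "bar"}]}  -> "foobar" (HF generations list)
# Plain strings are passed through unchanged; anything unrecognised yields None.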
def call_model(client: InferenceClient, system: str, user: str, is_python: bool, **settings) -> str:
"""Calls the appropriate LLM with retry logic and multiple fallbacks.
Tries non-streaming first (more reliable), falls back to streaming.
"""
if client is None:
return "<<ERROR: No inference client provided>>"
primary_model = PYTHON_MODEL if is_python else OTHER_MODEL
models_to_try = [primary_model] + [m for m in FALLBACK_MODELS if m != primary_model]
logging.info(f"Calling model for {'Python' if is_python else 'Other'} project. Primary: {primary_model}")
logging.debug(f"Raw settings: {settings}")
messages = [{"role": "system", "content": system}, {"role": "user", "content": user}]
# Build robust settings: include both keys some API variants accept
cleaned = {}
cleaned["temperature"] = settings.get("temperature", DEFAULT_TEMPERATURE)
cleaned["top_p"] = settings.get("top_p", DEFAULT_TOP_P)
max_new = settings.get("max_new_tokens", settings.get("max_tokens", DEFAULT_MAX_TOKENS))
try:
max_new = int(max_new)
except Exception:
max_new = DEFAULT_MAX_TOKENS
cleaned["max_new_tokens"] = max_new
# also include max_tokens for API variants
cleaned["max_tokens"] = max_new
logging.info(f"Using cleaned settings: temperature={cleaned['temperature']}, top_p={cleaned['top_p']}, max_new_tokens={cleaned['max_new_tokens']}")
last_exception = None
for model_name in models_to_try:
attempt = 0
# try a couple of times per model with decreasing tokens if necessary
while attempt < 3:
attempt += 1
try:
logging.info(f"Attempting non-streaming call to {model_name} (attempt {attempt})")
# Try named-argument style first (most robust)
try:
resp = client.chat_completion(messages=messages, model=model_name, stream=False, **cleaned)
except TypeError as te:
# Some client versions reject extra keys (e.g. max_new_tokens) - retry once without them;
# any other exception propagates to the outer retry handling below.
logging.debug(f"TypeError calling chat_completion: {te}")
alt_settings = {k: v for k, v in cleaned.items() if k != "max_new_tokens"}
resp = client.chat_completion(messages=messages, model=model_name, stream=False, **alt_settings)
response_text = ""
# Parse many possible shapes
try:
if isinstance(resp, dict):
# common HF shapes
if "generated_text" in resp and isinstance(resp["generated_text"], str):
response_text = resp["generated_text"]
elif "text" in resp and isinstance(resp["text"], str):
response_text = resp["text"]
elif "choices" in resp and resp["choices"]:
choice = resp["choices"][0]
if isinstance(choice, dict):
if "message" in choice and isinstance(choice["message"], dict):
response_text = choice["message"].get("content") or choice["message"].get("text", "") or ""
else:
response_text = choice.get("text") or choice.get("message") or ""
else:
response_text = str(choice)
elif "generations" in resp and resp["generations"]:
gens = resp["generations"]
parts = []
for g in gens:
if isinstance(g, dict) and "text" in g:
parts.append(g.get("text", ""))
elif hasattr(g, "text"):
parts.append(getattr(g, "text"))
response_text = "".join(parts)
else:
# fallback: inspect nested keys
if "data" in resp and isinstance(resp["data"], list) and resp["data"]:
# e.g., {'data':[{'text': '...'}]}
first = resp["data"][0]
if isinstance(first, dict) and "text" in first:
response_text = first["text"]
elif isinstance(resp, (list, tuple)):
# maybe list of generation dicts
parts = []
for item in resp:
if isinstance(item, dict) and "text" in item:
parts.append(item["text"])
else:
parts.append(str(item))
response_text = "".join(parts)
elif isinstance(resp, str):
response_text = resp
else:
# last resort: str()
response_text = str(resp)
except Exception as e:
write_error_log(e, f"Non-stream parsing failed for model {model_name}")
response_text = ""
if response_text and response_text.strip():
logging.info(f"✓ Successfully got response from {model_name} ({len(response_text)} chars)")
return response_text
else:
logging.warning(f"Non-streaming returned empty response from {model_name}, attempt {attempt}.")
# fall through to streaming fallback below
except Exception as e:
last_exception = e
write_error_log(e, f"Non-stream model {model_name} failed on attempt {attempt}")
logging.error(f"Non-stream error for {model_name}: {str(e)[:200]}")
# Streaming fallback
try:
logging.info(f"Attempting streaming call to {model_name} (attempt {attempt})")
# streaming - some versions yield objects, some strings
try:
stream_iter = client.chat_completion(messages=messages, model=model_name, stream=True, **cleaned)
except TypeError:
# Try alternate call-signature
stream_iter = client.chat_completion(messages=messages, model=model_name, stream=True)
collected = []
try:
for chunk in stream_iter:
piece = extract_chunk_content(chunk)
if piece:
collected.append(piece)
response = "".join(collected).strip()
except Exception as e:
# some streaming iterables need to be exhausted differently; safely cast to string
write_error_log(e, "Streaming parsing failed")
response = ""
if response:
logging.info(f"✓ Successfully got streaming response from {model_name} ({len(response)} chars)")
return response
else:
logging.warning(f"Streaming returned empty response from {model_name} (attempt {attempt})")
except Exception as e:
last_exception = e
write_error_log(e, f"Streaming model {model_name} failed on attempt {attempt}")
logging.error(f"Streaming error for {model_name}: {str(e)[:200]}")
# reduce tokens and retry
time.sleep(1 + attempt * 0.5)
# reduce token budget to try avoid model refusing or failing
cleaned["max_new_tokens"] = max(256, int(cleaned["max_new_tokens"] * 0.5))
cleaned["max_tokens"] = cleaned["max_new_tokens"]
logging.info(f"Reduced max_new_tokens to {cleaned['max_new_tokens']} and retrying")
continue
# if reached here (no response), reduce tokens and retry
cleaned["max_new_tokens"] = max(256, int(cleaned["max_new_tokens"] * 0.6))
cleaned["max_tokens"] = cleaned["max_new_tokens"]
logging.info(f"No response; reduced max_new_tokens to {cleaned['max_new_tokens']} and will retry (attempt {attempt})")
time.sleep(0.8 + attempt * 0.3)
logging.error(f"❌ ALL MODELS FAILED. Last error: {last_exception}")
return f"<<ERROR: All models failed. Last error: {sanitize_log_message(str(last_exception))}>>"
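# Typical call (illustrative; the prompt text is a made-up example):
#   reply = call_model(client, "You are a coder.", "Write a hello-world script.",
#                      is_python=True, temperature=0.3, max_new_tokens=1024)
# Callers in this file check for the "<<ERROR" marker in the returned string rather
# than catching exceptions.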
# ---------- Robust parsing ----------
def validate_files_dict(files: Dict[str, str]) -> bool:
"""Validates that the generated files dictionary is well-formed."""
if not isinstance(files, dict) or not files:
return False
return all(isinstance(k, str) and isinstance(v, str) for k, v in files.items())
def parse_meta(text: str) -> Optional[Dict[str, Any]]:
"""Parses model output to extract code files, trying structured JSON first, then falling back to heuristics."""
if not text or not isinstance(text, str):
return None
# Strict JSON/META block parsing
for pattern in [r"```json\s*(.*?)```", r"```meta\s*(.*?)```", r"```META\s*(.*?)```", r"<META>(.*?)</META>"]:
match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
if match:
try:
content = match.group(1).strip()
parsed = json.loads(content)
if "files" in parsed and validate_files_dict(parsed["files"]):
logging.info(f"Successfully parsed META JSON with {len(parsed['files'])} files")
return parsed
except (json.JSONDecodeError, TypeError) as e:
logging.warning(f"JSON parse failed: {e}")
continue
# Also try to detect a top-level JSON blob
try:
parsed_full = json.loads(text.strip())
if isinstance(parsed_full, dict) and "files" in parsed_full and validate_files_dict(parsed_full["files"]):
logging.info("Parsed raw JSON response as META")
return parsed_full
except Exception:
pass
# Fallback to heuristic parsing of code blocks
files = {}
# Try to find filename markers before code blocks
filename_patterns = [
r'#\s*[Ff]ile:\s*([\w/._-]+\.[\w]+)',
r'##\s*([\w/._-]+\.[\w]+)',
r'\*\*\s*([\w/._-]+\.[\w]+)\s*\*\*',
r'^\s*([\w\-/_.]+?\.(?:py|txt|md|json|yaml|yml))\s*:\s*$', # e.g., "main.py:" on its own line
]
all_filenames = []
for pattern in filename_patterns:
all_filenames.extend(re.findall(pattern, text, flags=re.MULTILINE))
# Grab all fenced code blocks
code_blocks = re.findall(r"```(?:[\w+-]+)?\s*([\s\S]*?)```", text, re.DOTALL)
# Also capture indented/code-block-like sections (fallback)
if not code_blocks:
# naive: split by two or more newlines and keep blocks that look like code
chunks = [c for c in re.split(r"\n{2,}", text) if len(c.splitlines()) > 1]
code_blocks = chunks[:6] # limit
if not code_blocks:
logging.warning("No code blocks found in model response")
return None
# Match filenames with code blocks
for i, block in enumerate(code_blocks):
block_content = block.strip()
if not block_content:
continue
if i < len(all_filenames):
filename = all_filenames[i]
else:
# Guess filename based on content
if "def test_" in block_content or "import pytest" in block_content:
filename = "tests/test_main.py"  # looks like a pytest file
elif "requirements" in text.lower() and i == 0:
filename = "requirements.txt"
elif "# README" in block_content or block_content.startswith("# ") or block_content.lower().strip().startswith("readme"):
filename = "README.md"
else:
filename = "main.py" if i == 0 else f"file_{i}.py"
# note: relative-path safety is enforced later by write_files()
files[filename] = block_content
if validate_files_dict(files) and files:
logging.info(f"Heuristic parsing extracted {len(files)} files: {list(files.keys())}")
return {"files": files, "changelog": "Extracted files via heuristic parsing."}
# As a last resort, if the whole output looks like a single file, place it into main.py
if text.strip():
files = {"main.py": text.strip()}
if validate_files_dict(files):
logging.info("Parsed whole response into main.py as last resort")
return {"files": files, "changelog": "Fallback single-file parse."}
logging.error("Failed to extract any valid files from model response")
return None
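# The "META JSON" contract the prompts ask the model for (illustrative example):
#   ```json
#   {"files": {"main.py": "print('hi')", "tests/test_main.py": "def test(): pass"},
#    "changelog": "initial version"}
#   ```
# parse_meta() first looks for such a fenced block, then for a bare top-level JSON
# object, and only then falls back to the filename/code-block heuristics above.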
# ---------- Enhanced evaluators ----------
def run_evaluators(workdir: Path) -> Dict[str, Any]:
out = {}
rc, txt = run_cmd([sys.executable, "-m", "flake8", ".", "--count", "--max-line-length=100"], cwd=str(workdir))
out["flake8_pass"] = rc == 0
out["flake8_out"] = txt
rc, txt = run_cmd([sys.executable, "-m", "bandit", "-r", ".", "-f", "txt"], cwd=str(workdir))
out["bandit_pass"] = rc == 0 or "No issues" in txt
out["bandit_out"] = txt
test_files = list(workdir.glob("**/test_*.py")) + list(workdir.glob("**/*_test.py"))
if test_files:
rc, txt = run_cmd([sys.executable, "-m", "pytest", "--maxfail=1", "--tb=short"], cwd=str(workdir))
out["pytest_pass"] = rc == 0
else:
out["pytest_pass"] = False
out["pytest_out"] = txt if test_files else "No tests"
rc, txt = run_cmd([sys.executable, "-m", "black", "--check", "."], cwd=str(workdir))
out["black_pass"] = rc == 0
complexity = 5.0
rc, txt = run_cmd([sys.executable, "-m", "radon", "cc", ".", "-s", "-a"], cwd=str(workdir))
if rc == 0:
m = re.search(r"Average complexity.*?([0-9.]+)", txt)
if m:
try:
complexity = float(m.group(1))
except ValueError:
pass
out["complexity"] = complexity
# Calculate weighted score
style = 100.0 if (out["flake8_pass"] and out["black_pass"]) else 50.0
security = 100.0 if out["bandit_pass"] else 30.0
tests = 100.0 if out["pytest_pass"] else 20.0
maintainability = max(0.0, 100.0 - (complexity - 5.0) * 10.0) if complexity > 5 else 100.0
w = EVAL_WEIGHTS
score = w["style"] * style + w["security"] * security + w["tests"] * tests + w["maintainability"] * maintainability
out["quality_score"] = round(max(0.0, min(100.0, score)), 1)
out["breakdown"] = {
"style": round(style, 1),
"security": round(security, 1),
"tests": round(tests, 1),
"maintainability": round(maintainability, 1)
}
return out
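# Result shape (illustrative values; the raw *_out text fields are omitted here).
# If flake8/bandit/black are not installed, the "python -m <tool>" invocation exits
# non-zero and the corresponding *_pass flag ends up False; a missing radon simply
# leaves complexity at its default of 5.0:
#   {"flake8_pass": True, "bandit_pass": True, "pytest_pass": False, "black_pass": True,
#    "complexity": 4.2, "quality_score": 68.0,
#    "breakdown": {"style": 100.0, "security": 100.0, "tests": 20.0, "maintainability": 100.0}}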
# ---------- AI features ----------
def generate_code_review(client: Optional[InferenceClient], token: str, files: Dict[str, str], eval_results: Dict, is_python: bool) -> str:
preview = "\n".join([f"# {n}\n{c[:300]}..." for n, c in list(files.items())[:2]])
prompt = f"""Review this code:
{preview}
Quality: Flake8={'Pass' if eval_results.get('flake8_pass') else 'Fail'}, Tests={'Pass' if eval_results.get('pytest_pass') else 'Fail'}
Give 2-3 specific, actionable improvements:"""
review = call_model(client, "You are a senior code reviewer.", prompt, is_python, max_new_tokens=400, temperature=0.2, top_p=0.8)
return review if review and "<<ERROR" not in review else "No review"
def generate_readme(client: Optional[InferenceClient], token: str, goal: str, files: Dict[str, str], is_python: bool) -> str:
summary = "\n".join([f"- {n}: {len(c.splitlines())} lines" for n, c in files.items()])
prompt = f"""Create README.md:
Goal: {goal}
Files:
{summary}
Include: description, installation, usage."""
readme = call_model(client, "You are a technical writer.", prompt, is_python, max_new_tokens=600, temperature=0.2, top_p=0.9)
return readme if readme and "<<ERROR" not in readme else "# Project\n\nGenerated code."
def create_initial_scaffold(client: Optional[InferenceClient], token: str, goal: str, is_python: bool) -> Optional[Dict[str, Any]]:
system = """You are a Principal Software Architect. Create a professional initial project scaffold with working code, requirements.txt, and tests."""
prompt = f"""Project: {goal}
Create Version 0.1 scaffold:
1. Choose appropriate libraries (requirements.txt)
2. Working minimal code
3. Basic tests in tests/
Return as META JSON with files mapping."""
try:
response = call_model(client, system, prompt, is_python, max_new_tokens=3072, temperature=0.4)
if response and "<<ERROR" not in response:
meta = parse_meta(response)
if meta and meta.get("files") and validate_files_dict(meta["files"]):
return meta
else:
# Save raw scaffold response for debugging
with open("/tmp/failed_scaffold_response.txt", "w", encoding="utf-8") as f:
f.write(response)
except Exception as e:
write_error_log(e, "Scaffold failed")
return None
def import_project(zip_file) -> Dict[str, str]:
if not zip_file:
return {}
try:
files = {}
# Newer Gradio versions pass a filepath string; older versions pass a file-like object
zip_path = zip_file if isinstance(zip_file, str) else zip_file.name
with zipfile.ZipFile(zip_path, 'r') as zf:
for filename in zf.namelist():
if filename.endswith(('.py', '.txt', '.md', '.json', '.yaml', '.yml')):
try:
content = zf.read(filename).decode('utf-8')
files[filename] = content
except Exception:
# skip entries that cannot be decoded as UTF-8 text
pass
return files
except Exception as e:
write_error_log(e, "Import failed")
return {}
# ---------- Controller ----------
class CodeGenController:
def __init__(self, token: str, goal: str, instructions: str, settings: Dict, max_iters: int, infinite_mode: bool, is_python: bool):
self.token = token
try:
self.client = InferenceClient(token=token)
logging.info("✓ InferenceClient initialized successfully")
except Exception as e:
logging.error(f"Failed to initialize InferenceClient: {e}")
raise
self.goal = goal
self.instructions = instructions
self.settings = settings
self.max_iters = max_iters
self.infinite_mode = infinite_mode
self.is_python = is_python
self.model_name = PYTHON_MODEL if is_python else OTHER_MODEL
logging.info(f"Controller initialized for {'Python' if is_python else 'Other'} with model: {self.model_name}")
self.history: List[Dict] = []
self.current_files: Dict[str, str] = {}
self.current_code: str = ""
self.best_score: float = 0.0
self.best_eval: Dict = {}
self.best_files: Dict[str, str] = {}
self.best_workspace: str = ""
self.best_zip: Optional[str] = None
self.best_review: str = ""
self.best_readme: str = ""
self.stop_flag = Path(tempfile.gettempdir()) / f"stop_{uuid.uuid4().hex[:8]}"
def cleanup_workdir(self, workdir: Path):
try:
if workdir.exists():
shutil.rmtree(workdir)
except Exception as e:
write_error_log(e, f"Failed to cleanup workdir {workdir}")
def start_scaffolding(self) -> bool:
scaffold = create_initial_scaffold(self.client, self.token, self.goal, self.is_python)
if scaffold and scaffold.get("files"):
self.current_files = scaffold["files"]
self.current_code = "\n\n".join(f"# {n}\n{c}" for n, c in self.current_files.items())
self.best_files = dict(self.current_files)
# Ensure we have at least requirements.txt
if "requirements.txt" not in self.best_files:
self.best_files["requirements.txt"] = "# Add your requirements here"
return True
# Better defaults if scaffolding fails
self.current_files = {
"main.py": "# New project\n\ndef main():\n print('Hello, World!')\n\nif __name__ == '__main__':\n main()",
"requirements.txt": "# Add your requirements here"
}
self.current_code = "\n\n".join(f"# {n}\n{c}" for n, c in self.current_files.items())
self.best_files = dict(self.current_files)
return False
def perform_iteration(self, iteration: int) -> Dict[str, Any]:
# Create the working directory directly with mkdtemp so cleanup_workdir() removes
# everything that was created (a separate parent directory would otherwise be leaked).
workdir = Path(tempfile.mkdtemp(prefix=f"infgen_iter_{iteration}_"))
try:
system = """You are a Level 5 Principal Software Engineer specializing in production-ready code.
Follow Defensive Programming, TDD, and best practices.
Output MUST be flawless, well-tested, and industry-standard.
ALWAYS return complete code in META JSON format."""
feedback = ""
if self.best_eval:
score = self.best_eval.get("quality_score", 0)
feedback = f"\n\nPREVIOUS SCORE: {score}/100"
if not self.best_eval.get("pytest_pass"):
feedback += "\nCRITICAL: TESTS FAILING. Fix logic to pass all tests."
if not self.best_eval.get("flake8_pass"):
feedback += "\nFIX: Code quality issues (flake8)."
prompt = f"""PROJECT: {self.goal}
CURRENT CODE (BUILD UPON THIS):
{self.current_code}
INSTRUCTIONS: {self.instructions}{feedback}
CRITICAL RULES:
1. Every function needs docstrings and type hints.
2. Write comprehensive pytest tests.
3. Complete implementations - NO placeholders.
4. Return all files in META JSON format.
Return the perfected code in META format."""
# Attempt the model call, with extra retry attempts and reduced token fallback
response = call_model(self.client, system, prompt, self.is_python, **self.settings)
if not response or "<<ERROR" in response:
logging.error(f"Model returned error or empty: {(response or '')[:200]}")
# Save response for debugging if available
with open(f"/tmp/failed_response_{iteration}.txt", "w", encoding="utf-8") as f:
f.write(response or "<<EMPTY RESPONSE>>")
# Try one conservative retry with reduced token budget before failing
logging.info("Attempting a conservative retry with reduced tokens...")
conservative_settings = dict(self.settings)
conservative_settings["max_new_tokens"] = min(1024, int(conservative_settings.get("max_new_tokens", 1024)))
conservative_settings["temperature"] = min(0.3, float(conservative_settings.get("temperature", 0.3)))
response_retry = call_model(self.client, system, prompt, self.is_python, **conservative_settings)
if response_retry and "<<ERROR" not in response_retry:
response = response_retry
else:
# Still failed — produce a safe fallback scaffold so iteration does not fail
logging.warning(f"Conservative retry failed for iteration {iteration}; producing fallback scaffold.")
fallback_main = self.current_files.get("main.py", "# New project\n\ndef main():\n print('Hello, World!')\n\nif __name__ == '__main__':\n main()")
fallback_main += "\n\n# NOTE: Model failed to generate new code for this iteration. Fallback scaffold inserted."
fallback_test = (
"import pytest\n\n"
"def test_placeholder():\n"
" \"\"\"Placeholder test created because model failed to produce output.\"\"\"\n"
" assert True\n"
)
fallback_requirements = self.current_files.get("requirements.txt", "# Add your requirements here")
files = {
"main.py": fallback_main,
"tests/test_main.py": fallback_test,
"requirements.txt": fallback_requirements
}
# Write files and run evaluators so the pipeline can continue
write_files(workdir, files)
eval_results = run_evaluators(workdir)
eval_results["fallback_used"] = True
eval_results["fallback_info"] = f"Model failed; fallback scaffold used for iteration {iteration}."
review = f"Fallback scaffold inserted because model failed to return usable output. See /tmp/failed_response_{iteration}.txt for raw model response."
readme = f"# Fallback Project\n\nThis scaffold was inserted automatically because model generation failed on iteration {iteration}."
files["README.md"] = readme
write_files(workdir, {"README.md": readme})
zip_path = make_zip(workdir)
return {
"success": True,
"eval": eval_results,
"zip": zip_path,
"workdir": str(workdir),
"files": files,
"review": review,
"readme": readme
}
meta = parse_meta(response)
if not meta or not meta.get("files"):
logging.error(f"Parse failed. Response preview: {response[:1000]}")
# Save failed response for debugging
with open(f"/tmp/failed_response_{iteration}.txt", "w", encoding="utf-8") as f:
f.write(response)
return {"success": False, "warning": f"Parse failed - see /tmp/failed_response_{iteration}.txt"}
files = meta["files"]
write_files(workdir, files)
eval_results = run_evaluators(workdir)
review = generate_code_review(self.client, self.token, files, eval_results, self.is_python)
readme = generate_readme(self.client, self.token, self.goal, files, self.is_python)
files["README.md"] = readme
write_files(workdir, {"README.md": readme})
zip_path = make_zip(workdir)
return {
"success": True, "eval": eval_results, "zip": zip_path, "workdir": str(workdir),
"files": files, "review": review, "readme": readme
}
except Exception as e:
write_error_log(e, "Iteration exception")
return {"success": False, "warning": f"Exception: {str(e)}"}
def run_loop(self) -> Generator:
iteration = 1
max_iterations = 999999 if self.infinite_mode else self.max_iters
if not self.current_files:
self.start_scaffolding()
initial_state = {"stop_flag_path": str(self.stop_flag)}
yield self.format_output(f"Starting with {self.model_name}...", iteration, max_iterations, initial_state)
while iteration <= max_iterations:
if self.stop_flag.exists():
try:
self.stop_flag.unlink()
logging.info("Stop flag detected - stopping generation")
except OSError:
pass
yield self.format_output("⛔ Stopped by user", iteration, max_iterations)
break
yield self.format_output(f"🔄 Iteration {iteration}/{max_iterations} running...", iteration, max_iterations)
result = self.perform_iteration(iteration)
if not result.get("success"):
warning_msg = result.get("warning", "Unknown iteration error")
logging.warning(f"Iteration {iteration} failed: {warning_msg}")
# CRITICAL: Still yield output so UI updates and shows the previous best code
yield self.format_output(f"⚠️ Iteration {iteration} failed: {warning_msg}", iteration, max_iterations)
time.sleep(2) # Give more time before retry
iteration += 1
continue
eval_res = result.get("eval", {})
score = eval_res.get("quality_score", 0)
self.history.append({"iteration": iteration, "eval": eval_res})
self.current_files = result["files"]
self.current_code = "\n\n".join(f"# {n}\n{c}" for n, c in self.current_files.items())
if score > self.best_score:
if self.best_workspace:
self.cleanup_workdir(Path(self.best_workspace))
self.best_score = score
self.best_eval = eval_res
self.best_files = dict(result["files"])
self.best_workspace = result.get("workdir", "")
self.best_zip = result.get("zip")
self.best_review = result.get("review", "")
self.best_readme = result.get("readme", "")
logging.info(f"New best score: {score}/100")
else:
# Even if score didn't improve, still update current_files for next iteration
logging.info(f"Score {score}/100 - keeping best: {self.best_score}/100")
if result.get("workdir") and result.get("workdir") != self.best_workspace:
self.cleanup_workdir(Path(result["workdir"]))
yield self.format_output(f"Iteration {iteration} complete: {score}/100", iteration, max_iterations)
iteration += 1
time.sleep(0.3)
yield self.format_output(f"Complete! Best: {self.best_score}/100", iteration - 1, max_iterations)
def format_output(self, log_msg: str, iteration: int, max_iters: int, state: Optional[Dict] = None):
progress = f"Iteration {iteration}/{max_iters if max_iters < 999999 else 'INF'}" if iteration <= max_iters else "Complete"
main = self.best_files.get("main.py", "# Generating code...")
test = next((v for k, v in self.best_files.items() if 'test' in k and k.endswith('.py')), "# No tests yet...")
req = self.best_files.get("requirements.txt", "# No requirements yet...")
readme = self.best_files.get("README.md", "# Generating README...")
other = {k: v for k, v in self.best_files.items() if k not in [
"main.py", next((k for k in self.best_files if 'test' in k and k.endswith('.py')), None),
"requirements.txt", "README.md"
]}
return (
f"[{time.strftime('%X')}] {sanitize_log_message(log_msg)}", self.model_name, progress,
generate_metrics_html(self.history), self.best_eval, main, test, req, readme, other,
self.best_review, self.best_zip, self.best_workspace, state or {}
)
# ---------- UI Helpers ----------
def generate_metrics_html(history: List[Dict]) -> str:
if not history:
return "<div style='padding:12px'>No metrics yet</div>"
html_parts = ["<div style='font-family:sans-serif'><h4>Quality Trend</h4><div style='background:#f8f9fa;padding:12px;border-radius:8px'>"]
for h in history[-10:]:
score = h.get("eval", {}).get("quality_score", 0)
width = int(score * 2.5)
color = "#10b981" if score >= 80 else "#f59e0b" if score >= 60 else "#ef4444"
html_parts.append(f"<div style='margin:4px 0'>#{h.get('iteration')}: <div style='display:inline-block;width:{width}px;height:20px;background:{color};border-radius:4px'></div> {score}/100</div>")
scores = [h.get("eval", {}).get("quality_score", 0) for h in history]
avg = sum(scores) / len(scores) if scores else 0
best = max(scores) if scores else 0
html_parts.append(f"<div style='margin-top:12px'><strong>Avg:</strong> {avg:.1f} | <strong>Best:</strong> {best:.1f}</div></div></div>")
return "".join(html_parts)
# ---------- UI ----------
def create_ui():
with gr.Blocks(title="InfinateCodeGenerator Ultimate", theme=gr.themes.Soft()) as demo:
gr.Markdown("# InfinateCodeGenerator - Ultimate Merged Edition\n*Controller architecture • Smart models • Multi-file UI • Never stops early*")
controller_state = gr.State({})
with gr.Row():
with gr.Column(scale=2):
project_goal = gr.Textbox(label="Project Goal", lines=4, placeholder="E.g., Create a FastAPI endpoint for user authentication with MongoDB")
with gr.Tabs():
with gr.TabItem("New Project"):
initial_code = gr.Textbox(label="Starting Code (optional)", lines=8)
with gr.TabItem("Import Project"):
import_zip = gr.File(label="Upload ZIP", file_types=[".zip"])
import_btn = gr.Button("Import Files")
import_status = gr.Textbox(label="Status", interactive=False)
improve_instructions = gr.Textbox(label="Instructions", lines=3, value="Write comprehensive tests, add type hints, and improve error handling.")
hf_token_manual = gr.Textbox(label="HF Token (optional, overrides env)", type="password")
with gr.Row():
start_btn = gr.Button("Start Generation", variant="primary", size="lg")
stop_btn = gr.Button("STOP", variant="stop", size="lg")
with gr.Accordion("Settings", open=False):
infinite_mode = gr.Checkbox(label="Infinite Mode", value=False)
max_iters = gr.Slider(1, 15, value=5, step=1, label="Max Iterations")
temperature = gr.Slider(0.1, 1.0, value=0.5, step=0.05, label="Temperature")
top_p = gr.Slider(0.1, 1.0, value=0.9, step=0.05, label="Top P")
max_tokens = gr.Slider(512, 4096, value=4096, step=512, label="Max Tokens")
with gr.Column(scale=3):
with gr.Tabs():
with gr.TabItem("Dashboard"):
model_display = gr.Textbox(label="Active Model", interactive=False)
progress_display = gr.Textbox(label="Progress", interactive=False)
metrics_html = gr.HTML()
run_log = gr.Textbox(label="Log", lines=10, interactive=False)
with gr.TabItem("Generated Files"):
with gr.Tabs():
with gr.TabItem("main.py"):
main_file = gr.Code(language="python", lines=20)
with gr.TabItem("tests/test_main.py"):
test_file = gr.Code(language="python", lines=15)
with gr.TabItem("requirements.txt"):
req_file = gr.Textbox(lines=10, interactive=False, show_label=False)
with gr.TabItem("README.md"):
readme_file = gr.Code(language="markdown", lines=15)
with gr.TabItem("Other Files"):
other_files = gr.JSON()
with gr.TabItem("Code Review"):
review_display = gr.Textbox(label="AI Review", lines=10, interactive=False)
with gr.TabItem("Evaluation"):
eval_json = gr.JSON()
with gr.TabItem("Download"):
workspace_path = gr.Textbox(label="Best Version Path", interactive=False)
download_zip = gr.File(label="Download ZIP")
def import_project_files(zip_file):
files = import_project(zip_file)
if files:
combined = "\n\n".join([f"# {name}\n{content}" for name, content in files.items()])
return f"Imported {len(files)} files!", combined
return "Import failed", ""
import_btn.click(fn=import_project_files, inputs=[import_zip], outputs=[import_status, initial_code])
def start_gen(goal, init_code, instructions, hf_tok, inf_mode, max_it, temp, top, max_tok):
token = get_token_from_env_or_manual(hf_tok)
if not token:
error_msg = "ERROR: No HF token found. Please provide a Hugging Face token."
logging.error(error_msg)
yield (error_msg, "", "", "", {}, "", "", "", "", {}, "", None, "", {})
return
logging.info(f"Starting generation with HF token (length: {len(token)})")
settings = {"temperature": temp, "top_p": top, "max_new_tokens": max_tok}
is_python_project = detect_language(goal, init_code)
logging.info(f"Detected project type: {'Python' if is_python_project else 'Other'}")
controller = CodeGenController(token, goal, instructions, settings, int(max_it), inf_mode, is_python_project)
if init_code and init_code.strip():
controller.current_files = {"main.py": init_code}
controller.current_code = init_code
logging.info("Using provided initial code")
yield from controller.run_loop()
def set_stop(controller_state_val):
if controller_state_val and (stop_path_str := controller_state_val.get("stop_flag_path")):
stop_path = Path(stop_path_str)
try:
stop_path.touch()
logging.info(f"Stop flag created at {stop_path}")
return "⛔ Stop signal sent! Will stop after current iteration completes..."
except Exception as e:
logging.error(f"Failed to create stop flag: {e}")
return f"❌ Failed to stop: {e}"
return "❌ Could not stop: No active process found. Start generation first."
outputs = [
run_log, model_display, progress_display, metrics_html, eval_json,
main_file, test_file, req_file, readme_file, other_files,
review_display, download_zip, workspace_path, controller_state
]
start_btn.click(
fn=start_gen,
inputs=[project_goal, initial_code, improve_instructions, hf_token_manual, infinite_mode, max_iters, temperature, top_p, max_tokens],
outputs=outputs
)
stop_btn.click(fn=set_stop, inputs=[controller_state], outputs=[run_log])
return demo
if __name__ == "__main__":
try:
demo = create_ui()
demo.queue().launch(server_name="0.0.0.0", server_port=7860)
except Exception as e:
print(f"Failed to launch Gradio app: {e}", file=sys.stderr)
sys.exit(1)