# InfinateCodeGenerator - Ultimate Merged Edition (v1.0.1) - full script """ Consolidated, hardened, and production-ready version (patched call_model & retries). """ import os import sys import time import json import traceback import uuid import re import subprocess import shutil import logging from pathlib import Path from typing import Optional, Dict, Any, List, Tuple, Generator from datetime import datetime import gradio as gr from huggingface_hub import InferenceClient # Added missing imports import tempfile import zipfile # ---------- Config ---------- PYTHON_MODEL = "Nxcode-CQ-7B-orpo" OTHER_MODEL = "Qwen/Qwen2.5-Coder-32B-Instruct" FALLBACK_MODELS = [ "Qwen/Qwen2.5-Coder-32B-Instruct", "Nxcode-CQ-7B-orpo", "OpenCodeInterpreter-DS-33B" ] DEFAULT_TEMPERATURE = 0.5 DEFAULT_TOP_P = 0.9 DEFAULT_MAX_TOKENS = 4096 DEFAULT_MAX_ITERS = 5 COMMAND_TIMEOUT = 60 # Increased timeout for sandboxed execution ERROR_LOG_FILE = "/tmp/infgen_error.log" # Enhanced evaluation weights EVAL_WEIGHTS = { "style": 0.20, "security": 0.20, "tests": 0.40, "maintainability": 0.20 } # Setup structured logging logging.basicConfig( level=os.getenv("LOG_LEVEL", "INFO").upper(), format="%(asctime)s - %(levelname)s - %(message)s", handlers=[ logging.FileHandler(ERROR_LOG_FILE), logging.StreamHandler(sys.stdout) ] ) # ---------- Helpers ---------- def sanitize_log_message(message: str) -> str: """Redacts sensitive information like API tokens from logs.""" # Regex for typical Hugging Face tokens (hf_...) and other patterns token_pattern = re.compile(r"hf_[a-zA-Z0-9]{30,}") return token_pattern.sub("[REDACTED_TOKEN]", message) def write_error_log(exc: Exception, prefix: str = ""): """Writes a sanitized error log.""" tb = traceback.format_exc() sanitized_tb = sanitize_log_message(tb) sanitized_exc = sanitize_log_message(str(exc)) logging.error(f"{prefix} | Exception: {sanitized_exc}\nTraceback:\n{sanitized_tb}") def get_token_from_env_or_manual(manual_token: Optional[str]) -> Optional[str]: """Retrieves HF token securely from manual input or environment variables.""" if manual_token and manual_token.strip(): return manual_token.strip() return os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_TOKEN") def detect_language(goal: str, code: str) -> bool: """Detects whether the project is primarily Python. Returns True for Python, False otherwise. """ combined = (goal + " " + (code or "")).lower() python_kw = ["python", "django", "flask", "fastapi", "pytest", "def ", "import ", "pip"] return any(kw in combined for kw in python_kw) def run_cmd(cmd: List[str], cwd: Optional[str] = None, timeout: int = COMMAND_TIMEOUT) -> Tuple[int, str]: """Runs a command in a subprocess with a timeout and captures output.""" try: # Sandboxed execution: disable network for pytest env = os.environ.copy() # If command references pytest explicitly, disable network for safety if any("pytest" in str(part) for part in cmd): env["ALLOW_NETWORK"] = "0" proc = subprocess.run( cmd, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, timeout=timeout, text=True, check=False, env=env ) return proc.returncode, proc.stdout or "" except subprocess.TimeoutExpired: return 1, f"TIMEOUT: Command '{' '.join(cmd)}' exceeded {timeout} seconds." except FileNotFoundError: return 1, f"COMMAND NOT FOUND: {cmd[0]}" except Exception as e: write_error_log(e, "run_cmd failed") return 1, f"ERROR: {e}" def write_files(workdir: Path, files: Dict[str, str]) -> None: """Safely writes a dictionary of files to a specified directory, preventing path traversal.""" workdir_resolved = workdir.resolve() for filename, content in files.items(): try: # Prevent path traversal attacks if ".." in filename or filename.startswith("/"): logging.warning(f"Blocked malicious path attempt: {filename}") continue target_path = (workdir_resolved / filename).resolve() # Final check to ensure the path is within the workdir if workdir_resolved not in target_path.parents and target_path != workdir_resolved: raise ValueError(f"Path traversal attempt blocked: {filename}") target_path.parent.mkdir(parents=True, exist_ok=True) target_path.write_text(content, encoding="utf-8") except Exception as e: write_error_log(e, f"Failed to write file {filename}") def make_zip(dirpath: Path) -> Optional[str]: """Creates a zip archive of a directory.""" try: base_path = str(dirpath.parent / dirpath.name) return shutil.make_archive(base_path, 'zip', root_dir=dirpath) except Exception as e: write_error_log(e, "ZIP creation failed") return None # ---------- Model calls ---------- def extract_chunk_content(chunk: Any) -> Optional[str]: """Extracts content from various possible streaming chunk formats.""" try: if isinstance(chunk, dict) and (choices := chunk.get("choices")): # typical OpenAI-like streaming chunk shape delta = choices[0].get("delta", {}) return delta.get("content") or delta.get("text") # HF newer shapes may use 'generations' inside chunk if isinstance(chunk, dict) and "generations" in chunk: gens = chunk.get("generations") or [] parts = [] for g in gens: if isinstance(g, dict) and "text" in g: parts.append(g["text"]) return "".join(parts) if parts else None # some streaming yields objects with .delta or .content attributes if hasattr(chunk, 'delta') and hasattr(chunk.delta, 'content'): return chunk.delta.content if isinstance(chunk, str): return chunk except Exception: return None return None def call_model(client: InferenceClient, system: str, user: str, is_python: bool, **settings) -> str: """Calls the appropriate LLM with retry logic and multiple fallbacks. Tries non-streaming first (more reliable), falls back to streaming. """ if client is None: return "<>" primary_model = PYTHON_MODEL if is_python else OTHER_MODEL models_to_try = [primary_model] + [m for m in FALLBACK_MODELS if m != primary_model] logging.info(f"Calling model for {'Python' if is_python else 'Other'} project. Primary: {primary_model}") logging.debug(f"Raw settings: {settings}") messages = [{"role": "system", "content": system}, {"role": "user", "content": user}] # Build robust settings: include both keys some API variants accept cleaned = {} cleaned["temperature"] = settings.get("temperature", DEFAULT_TEMPERATURE) cleaned["top_p"] = settings.get("top_p", DEFAULT_TOP_P) max_new = settings.get("max_new_tokens", settings.get("max_tokens", DEFAULT_MAX_TOKENS)) try: max_new = int(max_new) except Exception: max_new = DEFAULT_MAX_TOKENS cleaned["max_new_tokens"] = max_new # also include max_tokens for API variants cleaned["max_tokens"] = max_new logging.info(f"Using cleaned settings: temperature={cleaned['temperature']}, top_p={cleaned['top_p']}, max_new_tokens={cleaned['max_new_tokens']}") last_exception = None for model_name in models_to_try: attempt = 0 # try a couple of times per model with decreasing tokens if necessary while attempt < 3: attempt += 1 try: logging.info(f"Attempting non-streaming call to {model_name} (attempt {attempt})") # Try named-argument style first (most robust) try: resp = client.chat_completion(messages=messages, model=model_name, stream=False, **cleaned) except TypeError as te: # Some client versions expect different parameter names - try a second shape logging.debug(f"TypeError calling chat_completion: {te}") try: resp = client.chat_completion(messages=messages, model=model_name, **cleaned) except Exception as e: raise except Exception as e: # bubble up to outer exception handling raise response_text = "" # Parse many possible shapes try: if isinstance(resp, dict): # common HF shapes if "generated_text" in resp and isinstance(resp["generated_text"], str): response_text = resp["generated_text"] elif "text" in resp and isinstance(resp["text"], str): response_text = resp["text"] elif "choices" in resp and resp["choices"]: choice = resp["choices"][0] if isinstance(choice, dict): if "message" in choice and isinstance(choice["message"], dict): response_text = choice["message"].get("content") or choice["message"].get("text", "") or "" else: response_text = choice.get("text") or choice.get("message") or "" else: response_text = str(choice) elif "generations" in resp and resp["generations"]: gens = resp["generations"] parts = [] for g in gens: if isinstance(g, dict) and "text" in g: parts.append(g.get("text", "")) elif hasattr(g, "text"): parts.append(getattr(g, "text")) response_text = "".join(parts) else: # fallback: inspect nested keys if "data" in resp and isinstance(resp["data"], list) and resp["data"]: # e.g., {'data':[{'text': '...'}]} first = resp["data"][0] if isinstance(first, dict) and "text" in first: response_text = first["text"] elif isinstance(resp, (list, tuple)): # maybe list of generation dicts parts = [] for item in resp: if isinstance(item, dict) and "text" in item: parts.append(item["text"]) else: parts.append(str(item)) response_text = "".join(parts) elif isinstance(resp, str): response_text = resp else: # last resort: str() response_text = str(resp) except Exception as e: write_error_log(e, f"Non-stream parsing failed for model {model_name}") response_text = "" if response_text and response_text.strip(): logging.info(f"✓ Successfully got response from {model_name} ({len(response_text)} chars)") return response_text else: logging.warning(f"Non-streaming returned empty response from {model_name}, attempt {attempt}.") # fall through to streaming fallback below except Exception as e: last_exception = e write_error_log(e, f"Non-stream model {model_name} failed on attempt {attempt}") logging.error(f"Non-stream error for {model_name}: {str(e)[:200]}") # Streaming fallback try: logging.info(f"Attempting streaming call to {model_name} (attempt {attempt})") # streaming - some versions yield objects, some strings try: stream_iter = client.chat_completion(messages=messages, model=model_name, stream=True, **cleaned) except TypeError: # Try alternate call-signature stream_iter = client.chat_completion(messages=messages, model=model_name, stream=True) except Exception as e: raise collected = [] try: for chunk in stream_iter: piece = extract_chunk_content(chunk) if piece: collected.append(piece) response = "".join(collected).strip() except Exception as e: # some streaming iterables need to be exhausted differently; safely cast to string write_error_log(e, "Streaming parsing failed") response = "" if response: logging.info(f"✓ Successfully got streaming response from {model_name} ({len(response)} chars)") return response else: logging.warning(f"Streaming returned empty response from {model_name} (attempt {attempt})") except Exception as e: last_exception = e write_error_log(e, f"Streaming model {model_name} failed on attempt {attempt}") logging.error(f"Streaming error for {model_name}: {str(e)[:200]}") # reduce tokens and retry time.sleep(1 + attempt * 0.5) # reduce token budget to try avoid model refusing or failing cleaned["max_new_tokens"] = max(256, int(cleaned["max_new_tokens"] * 0.5)) cleaned["max_tokens"] = cleaned["max_new_tokens"] logging.info(f"Reduced max_new_tokens to {cleaned['max_new_tokens']} and retrying") continue # if reached here (no response), reduce tokens and retry cleaned["max_new_tokens"] = max(256, int(cleaned["max_new_tokens"] * 0.6)) cleaned["max_tokens"] = cleaned["max_new_tokens"] logging.info(f"No response; reduced max_new_tokens to {cleaned['max_new_tokens']} and will retry (attempt {attempt})") time.sleep(0.8 + attempt * 0.3) logging.error(f"❌ ALL MODELS FAILED. Last error: {last_exception}") return f"<>" # ---------- Robust parsing ---------- def validate_files_dict(files: Dict[str, str]) -> bool: """Validates that the generated files dictionary is well-formed.""" if not isinstance(files, dict) or not files: return False return all(isinstance(k, str) and isinstance(v, str) for k, v in files.items()) def parse_meta(text: str) -> Optional[Dict[str, Any]]: """Parses model output to extract code files, trying structured JSON first, then falling back to heuristics.""" if not text or not isinstance(text, str): return None # Strict JSON/META block parsing for pattern in [r"```json\s*(.*?)```", r"```meta\s*(.*?)```", r"```META\s*(.*?)```", r"(.*?)"]: match = re.search(pattern, text, re.DOTALL | re.IGNORECASE) if match: try: content = match.group(1).strip() parsed = json.loads(content) if "files" in parsed and validate_files_dict(parsed["files"]): logging.info(f"Successfully parsed META JSON with {len(parsed['files'])} files") return parsed except (json.JSONDecodeError, TypeError) as e: logging.warning(f"JSON parse failed: {e}") continue # Also try to detect a top-level JSON blob try: parsed_full = json.loads(text.strip()) if isinstance(parsed_full, dict) and "files" in parsed_full and validate_files_dict(parsed_full["files"]): logging.info("Parsed raw JSON response as META") return parsed_full except Exception: pass # Fallback to heuristic parsing of code blocks files = {} # Try to find filename markers before code blocks filename_patterns = [ r'#\s*[Ff]ile:\s*([\w/._-]+\.[\w]+)', r'##\s*([\w/._-]+\.[\w]+)', r'\*\*\s*([\w/._-]+\.[\w]+)\s*\*\*', r'^\s*([\w\-/_.]+?\.(?:py|txt|md|json|yaml|yml))\s*:\s*$', # e.g., "main.py:" on its own line ] all_filenames = [] for pattern in filename_patterns: all_filenames.extend(re.findall(pattern, text, flags=re.MULTILINE)) # Grab all fenced code blocks code_blocks = re.findall(r"```(?:[\w+-]+)?\s*([\s\S]*?)```", text, re.DOTALL) # Also capture indented/code-block-like sections (fallback) if not code_blocks: # naive: split by two or more newlines and keep blocks that look like code chunks = [c for c in re.split(r"\n{2,}", text) if len(c.splitlines()) > 1] code_blocks = chunks[:6] # limit if not code_blocks: logging.warning("No code blocks found in model response") return None # Match filenames with code blocks for i, block in enumerate(code_blocks): block_content = block.strip() if not block_content: continue if i < len(all_filenames): filename = all_filenames[i] else: # Guess filename based on content if "def test_" in block_content or "import pytest" in block_content: filename = f"tests/test_main.py" if not block_content.startswith("test_") else f"{block_content.splitlines()[0][:50]}.py" elif "requirements" in text.lower() and i == 0: filename = "requirements.txt" elif "# README" in block_content or block_content.startswith("# ") or block_content.lower().strip().startswith("readme"): filename = "README.md" else: filename = f"main.py" if i == 0 else f"file_{i}.py" # ensure relative path safe files[filename] = block_content if validate_files_dict(files) and files: logging.info(f"Heuristic parsing extracted {len(files)} files: {list(files.keys())}") return {"files": files, "changelog": "Extracted files via heuristic parsing."} # As a last resort, if the whole output looks like a single file, place it into main.py if text.strip(): files = {"main.py": text.strip()} if validate_files_dict(files): logging.info("Parsed whole response into main.py as last resort") return {"files": files, "changelog": "Fallback single-file parse."} logging.error("Failed to extract any valid files from model response") return None # ---------- Enhanced evaluators ---------- def run_evaluators(workdir: Path) -> Dict[str, Any]: out = {} rc, txt = run_cmd([sys.executable, "-m", "flake8", ".", "--count", "--max-line-length=100"], cwd=str(workdir)) out["flake8_pass"] = rc == 0 out["flake8_out"] = txt rc, txt = run_cmd([sys.executable, "-m", "bandit", "-r", ".", "-f", "txt"], cwd=str(workdir)) out["bandit_pass"] = rc == 0 or "No issues" in txt out["bandit_out"] = txt test_files = list(workdir.glob("**/test_*.py")) + list(workdir.glob("**/*_test.py")) if test_files: rc, txt = run_cmd([sys.executable, "-m", "pytest", "--maxfail=1", "--tb=short"], cwd=str(workdir)) out["pytest_pass"] = rc == 0 else: out["pytest_pass"] = False out["pytest_out"] = txt if test_files else "No tests" rc, txt = run_cmd([sys.executable, "-m", "black", "--check", "."], cwd=str(workdir)) out["black_pass"] = rc == 0 complexity = 5.0 rc, txt = run_cmd([sys.executable, "-m", "radon", "cc", ".", "-s", "-a"], cwd=str(workdir)) if rc == 0: m = re.search(r"Average complexity.*?([0-9.]+)", txt) if m: try: complexity = float(m.group(1)) except: pass out["complexity"] = complexity # Calculate weighted score style = 100.0 if (out["flake8_pass"] and out["black_pass"]) else 50.0 security = 100.0 if out["bandit_pass"] else 30.0 tests = 100.0 if out["pytest_pass"] else 20.0 maintainability = max(0.0, 100.0 - (complexity - 5.0) * 10.0) if complexity > 5 else 100.0 w = EVAL_WEIGHTS score = w["style"] * style + w["security"] * security + w["tests"] * tests + w["maintainability"] * maintainability out["quality_score"] = round(max(0.0, min(100.0, score)), 1) out["breakdown"] = { "style": round(style, 1), "security": round(security, 1), "tests": round(tests, 1), "maintainability": round(maintainability, 1) } return out # ---------- AI features ---------- def generate_code_review(client: Optional[InferenceClient], token: str, files: Dict[str, str], eval_results: Dict, is_python: bool) -> str: preview = "\n".join([f"# {n}\n{c[:300]}..." for n, c in list(files.items())[:2]]) prompt = f"""Review this code: {preview} Quality: Flake8={'Pass' if eval_results.get('flake8_pass') else 'Fail'}, Tests={'Pass' if eval_results.get('pytest_pass') else 'Fail'} Give 2-3 specific, actionable improvements:""" review = call_model(client, "You are a senior code reviewer.", prompt, is_python, max_new_tokens=400, temperature=0.2, top_p=0.8) return review if review and "< str: summary = "\n".join([f"- {n}: {len(c.splitlines())} lines" for n, c in files.items()]) prompt = f"""Create README.md: Goal: {goal} Files: {summary} Include: description, installation, usage.""" readme = call_model(client, "You are a technical writer.", prompt, is_python, max_new_tokens=600, temperature=0.2, top_p=0.9) return readme if readme and "< Optional[Dict[str, Any]]: system = """You are a Principal Software Architect. Create a professional initial project scaffold with working code, requirements.txt, and tests.""" prompt = f"""Project: {goal} Create Version 0.1 scaffold: 1. Choose appropriate libraries (requirements.txt) 2. Working minimal code 3. Basic tests in tests/ Return as META JSON with files mapping.""" try: response = call_model(client, system, prompt, is_python, max_new_tokens=3072, temperature=0.4) if response and "< Dict[str, str]: if not zip_file: return {} try: files = {} with zipfile.ZipFile(zip_file.name, 'r') as zf: for filename in zf.namelist(): if filename.endswith(('.py', '.txt', '.md', '.json', '.yaml', '.yml')): try: content = zf.read(filename).decode('utf-8') files[filename] = content except: pass return files except Exception as e: write_error_log(e, "Import failed") return {} # ---------- Controller ---------- class CodeGenController: def __init__(self, token: str, goal: str, instructions: str, settings: Dict, max_iters: int, infinite_mode: bool, is_python: bool): self.token = token try: self.client = InferenceClient(token=token) logging.info("✓ InferenceClient initialized successfully") except Exception as e: logging.error(f"Failed to initialize InferenceClient: {e}") raise self.goal = goal self.instructions = instructions self.settings = settings self.max_iters = max_iters self.infinite_mode = infinite_mode self.is_python = is_python self.model_name = PYTHON_MODEL if is_python else OTHER_MODEL logging.info(f"Controller initialized for {'Python' if is_python else 'Other'} with model: {self.model_name}") self.history: List[Dict] = [] self.current_files: Dict[str, str] = {} self.current_code: str = "" self.best_score: float = 0.0 self.best_eval: Dict = {} self.best_files: Dict[str, str] = {} self.best_workspace: str = "" self.best_zip: Optional[str] = None self.best_review: str = "" self.best_readme: str = "" self.stop_flag = Path(tempfile.gettempdir()) / f"stop_{uuid.uuid4().hex[:8]}" def cleanup_workdir(self, workdir: Path): try: if workdir.exists(): shutil.rmtree(workdir) except Exception as e: write_error_log(e, f"Failed to cleanup workdir {workdir}") def start_scaffolding(self) -> bool: scaffold = create_initial_scaffold(self.client, self.token, self.goal, self.is_python) if scaffold and scaffold.get("files"): self.current_files = scaffold["files"] self.current_code = "\n\n".join(f"# {n}\n{c}" for n, c in self.current_files.items()) self.best_files = dict(self.current_files) # Ensure we have at least requirements.txt if "requirements.txt" not in self.best_files: self.best_files["requirements.txt"] = "# Add your requirements here" return True # Better defaults if scaffolding fails self.current_files = { "main.py": "# New project\n\ndef main():\n print('Hello, World!')\n\nif __name__ == '__main__':\n main()", "requirements.txt": "# Add your requirements here" } self.current_code = "\n\n".join(f"# {n}\n{c}" for n, c in self.current_files.items()) self.best_files = dict(self.current_files) return False def perform_iteration(self, iteration: int) -> Dict[str, Any]: parent = Path(tempfile.mkdtemp(prefix="infgen_")) workdir = parent / f"iter_{iteration}_{uuid.uuid4().hex[:6]}" workdir.mkdir(parents=True, exist_ok=True) try: system = """You are a Level 5 Principal Software Engineer specializing in production-ready code. Follow Defensive Programming, TDD, and best practices. Output MUST be flawless, well-tested, and industry-standard. ALWAYS return complete code in META JSON format.""" feedback = "" if self.best_eval: score = self.best_eval.get("quality_score", 0) feedback = f"\n\nPREVIOUS SCORE: {score}/100" if not self.best_eval.get("pytest_pass"): feedback += "\nCRITICAL: TESTS FAILING. Fix logic to pass all tests." if not self.best_eval.get("flake8_pass"): feedback += "\nFIX: Code quality issues (flake8)." prompt = f"""PROJECT: {self.goal} CURRENT CODE (BUILD UPON THIS): {self.current_code} INSTRUCTIONS: {self.instructions}{feedback} CRITICAL RULES: 1. Every function needs docstrings and type hints. 2. Write comprehensive pytest tests. 3. Complete implementations - NO placeholders. 4. Return all files in META JSON format. Return the perfected code in META format.""" # Attempt the model call, with extra retry attempts and reduced token fallback response = call_model(self.client, system, prompt, self.is_python, **self.settings) if not response or "<>") # Try one conservative retry with reduced token budget before failing logging.info("Attempting a conservative retry with reduced tokens...") conservative_settings = dict(self.settings) conservative_settings["max_new_tokens"] = min(1024, int(conservative_settings.get("max_new_tokens", 1024))) conservative_settings["temperature"] = min(0.3, float(conservative_settings.get("temperature", 0.3))) response_retry = call_model(self.client, system, prompt, self.is_python, **conservative_settings) if response_retry and "< Generator: iteration = 1 max_iterations = 999999 if self.infinite_mode else self.max_iters if not self.current_files: self.start_scaffolding() initial_state = {"stop_flag_path": str(self.stop_flag)} yield self.format_output(f"Starting with {self.model_name}...", iteration, max_iterations, initial_state) while iteration <= max_iterations: if self.stop_flag.exists(): try: self.stop_flag.unlink() logging.info("Stop flag detected - stopping generation") except OSError: pass yield self.format_output("⛔ Stopped by user", iteration, max_iterations) break yield self.format_output(f"🔄 Iteration {iteration}/{max_iterations} running...", iteration, max_iterations) result = self.perform_iteration(iteration) if not result.get("success"): warning_msg = result.get("warning", "Unknown iteration error") logging.warning(f"Iteration {iteration} failed: {warning_msg}") # CRITICAL: Still yield output so UI updates and shows the previous best code yield self.format_output(f"⚠️ Iteration {iteration} failed: {warning_msg}", iteration, max_iterations) time.sleep(2) # Give more time before retry iteration += 1 continue eval_res = result.get("eval", {}) score = eval_res.get("quality_score", 0) self.history.append({"iteration": iteration, "eval": eval_res}) self.current_files = result["files"] self.current_code = "\n\n".join(f"# {n}\n{c}" for n, c in self.current_files.items()) if score > self.best_score: if self.best_workspace: self.cleanup_workdir(Path(self.best_workspace)) self.best_score = score self.best_eval = eval_res self.best_files = dict(result["files"]) self.best_workspace = result.get("workdir", "") self.best_zip = result.get("zip") self.best_review = result.get("review", "") self.best_readme = result.get("readme", "") logging.info(f"New best score: {score}/100") else: # Even if score didn't improve, still update current_files for next iteration logging.info(f"Score {score}/100 - keeping best: {self.best_score}/100") if result.get("workdir") and result.get("workdir") != self.best_workspace: self.cleanup_workdir(Path(result["workdir"])) yield self.format_output(f"Iteration {iteration} complete: {score}/100", iteration, max_iterations) iteration += 1 time.sleep(0.3) yield self.format_output(f"Complete! Best: {self.best_score}/100", iteration - 1, max_iterations) def format_output(self, log_msg: str, iteration: int, max_iters: int, state: Optional[Dict] = None): progress = f"Iteration {iteration}/{max_iters if max_iters < 999999 else 'INF'}" if iteration <= max_iters else "Complete" main = self.best_files.get("main.py", "# Generating code...") test = next((v for k, v in self.best_files.items() if 'test' in k and k.endswith('.py')), "# No tests yet...") req = self.best_files.get("requirements.txt", "# No requirements yet...") readme = self.best_files.get("README.md", "# Generating README...") other = {k: v for k, v in self.best_files.items() if k not in [ "main.py", next((k for k in self.best_files if 'test' in k and k.endswith('.py')), None), "requirements.txt", "README.md" ]} return ( f"[{time.strftime('%X')}] {sanitize_log_message(log_msg)}", self.model_name, progress, generate_metrics_html(self.history), self.best_eval, main, test, req, readme, other, self.best_review, self.best_zip, self.best_workspace, state or {} ) # ---------- UI Helpers ---------- def generate_metrics_html(history: List[Dict]) -> str: if not history: return "
No metrics yet
" html_parts = ["

Quality Trend

"] for h in history[-10:]: score = h.get("eval", {}).get("quality_score", 0) width = int(score * 2.5) color = "#10b981" if score >= 80 else "#f59e0b" if score >= 60 else "#ef4444" html_parts.append(f"
#{h.get('iteration')}:
{score}/100
") scores = [h.get("eval", {}).get("quality_score", 0) for h in history] avg = sum(scores) / len(scores) if scores else 0 best = max(scores) if scores else 0 html_parts.append(f"
Avg: {avg:.1f} | Best: {best:.1f}
") return "".join(html_parts) # ---------- UI ---------- def create_ui(): with gr.Blocks(title="InfinateCodeGenerator Ultimate", theme=gr.themes.Soft()) as demo: gr.Markdown("# InfinateCodeGenerator - Ultimate Merged Edition\n*Controller architecture • Smart models • Multi-file UI • Never stops early*") controller_state = gr.State({}) with gr.Row(): with gr.Column(scale=2): project_goal = gr.Textbox(label="Project Goal", lines=4, placeholder="E.g., Create a FastAPI endpoint for user authentication with MongoDB") with gr.Tabs(): with gr.TabItem("New Project"): initial_code = gr.Textbox(label="Starting Code (optional)", lines=8) with gr.TabItem("Import Project"): import_zip = gr.File(label="Upload ZIP", file_types=[".zip"]) import_btn = gr.Button("Import Files") import_status = gr.Textbox(label="Status", interactive=False) improve_instructions = gr.Textbox(label="Instructions", lines=3, value="Write comprehensive tests, add type hints, and improve error handling.") hf_token_manual = gr.Textbox(label="HF Token (optional, overrides env)", type="password") with gr.Row(): start_btn = gr.Button("Start Generation", variant="primary", size="lg") stop_btn = gr.Button("STOP", variant="stop", size="lg") with gr.Accordion("Settings", open=False): infinite_mode = gr.Checkbox(label="Infinite Mode", value=False) max_iters = gr.Slider(1, 15, value=5, step=1, label="Max Iterations") temperature = gr.Slider(0.1, 1.0, value=0.5, step=0.05, label="Temperature") top_p = gr.Slider(0.1, 1.0, value=0.9, step=0.05, label="Top P") max_tokens = gr.Slider(512, 4096, value=4096, step=512, label="Max Tokens") with gr.Column(scale=3): with gr.Tabs(): with gr.TabItem("Dashboard"): model_display = gr.Textbox(label="Active Model", interactive=False) progress_display = gr.Textbox(label="Progress", interactive=False) metrics_html = gr.HTML() run_log = gr.Textbox(label="Log", lines=10, interactive=False) with gr.TabItem("Generated Files"): with gr.Tabs(): with gr.TabItem("main.py"): main_file = gr.Code(language="python", lines=20) with gr.TabItem("tests/test_main.py"): test_file = gr.Code(language="python", lines=15) with gr.TabItem("requirements.txt"): req_file = gr.Textbox(lines=10, interactive=False, show_label=False) with gr.TabItem("README.md"): readme_file = gr.Code(language="markdown", lines=15) with gr.TabItem("Other Files"): other_files = gr.JSON() with gr.TabItem("Code Review"): review_display = gr.Textbox(label="AI Review", lines=10, interactive=False) with gr.TabItem("Evaluation"): eval_json = gr.JSON() with gr.TabItem("Download"): workspace_path = gr.Textbox(label="Best Version Path", interactive=False) download_zip = gr.File(label="Download ZIP") def import_project_files(zip_file): files = import_project(zip_file) if files: combined = "\n\n".join([f"# {name}\n{content}" for name, content in files.items()]) return f"Imported {len(files)} files!", combined return "Import failed", "" import_btn.click(fn=import_project_files, inputs=[import_zip], outputs=[import_status, initial_code]) def start_gen(goal, init_code, instructions, hf_tok, inf_mode, max_it, temp, top, max_tok): token = get_token_from_env_or_manual(hf_tok) if not token: error_msg = "ERROR: No HF token found. Please provide a Hugging Face token." logging.error(error_msg) yield (error_msg, "", "", "", {}, "", "", "", "", {}, "", None, "", {}) return logging.info(f"Starting generation with token: {token[:10]}... (length: {len(token)})") settings = {"temperature": temp, "top_p": top, "max_new_tokens": max_tok} is_python_project = detect_language(goal, init_code) logging.info(f"Detected project type: {'Python' if is_python_project else 'Other'}") controller = CodeGenController(token, goal, instructions, settings, int(max_it), inf_mode, is_python_project) if init_code and init_code.strip(): controller.current_files = {"main.py": init_code} controller.current_code = init_code logging.info("Using provided initial code") yield from controller.run_loop() def set_stop(controller_state_val): if controller_state_val and (stop_path_str := controller_state_val.get("stop_flag_path")): stop_path = Path(stop_path_str) try: stop_path.touch() logging.info(f"Stop flag created at {stop_path}") return "⛔ Stop signal sent! Will stop after current iteration completes..." except Exception as e: logging.error(f"Failed to create stop flag: {e}") return f"❌ Failed to stop: {e}" return "❌ Could not stop: No active process found. Start generation first." outputs = [ run_log, model_display, progress_display, metrics_html, eval_json, main_file, test_file, req_file, readme_file, other_files, review_display, download_zip, workspace_path, controller_state ] start_btn.click( fn=start_gen, inputs=[project_goal, initial_code, improve_instructions, hf_token_manual, infinite_mode, max_iters, temperature, top_p, max_tokens], outputs=outputs ) stop_btn.click(fn=set_stop, inputs=[controller_state], outputs=[run_log]) return demo if __name__ == "__main__": try: demo = create_ui() demo.queue().launch(server_name="0.0.0.0", server_port=7860) except Exception as e: print(f"Failed to launch Gradio app: {e}", file=sys.stderr) sys.exit(1)