# InfinateCodeGenerator - Ultimate Merged Edition (v1.0.1) - full script
"""
Consolidated, hardened, and production-ready version (patched call_model & retries).
"""
import os
import sys
import time
import json
import traceback
import uuid
import re
import subprocess
import shutil
import logging
import tempfile
import zipfile
from pathlib import Path
from typing import Optional, Dict, Any, List, Tuple, Generator
from datetime import datetime
import gradio as gr
from huggingface_hub import InferenceClient

# ---------- Config ----------
PYTHON_MODEL = "Nxcode-CQ-7B-orpo"
OTHER_MODEL = "Qwen/Qwen2.5-Coder-32B-Instruct"
FALLBACK_MODELS = [
    "Qwen/Qwen2.5-Coder-32B-Instruct",
    "Nxcode-CQ-7B-orpo",
    "OpenCodeInterpreter-DS-33B",
]
DEFAULT_TEMPERATURE = 0.5
DEFAULT_TOP_P = 0.9
DEFAULT_MAX_TOKENS = 4096
DEFAULT_MAX_ITERS = 5
COMMAND_TIMEOUT = 60  # Timeout (seconds) for commands run in the sandboxed workspace
ERROR_LOG_FILE = "/tmp/infgen_error.log"

# Evaluation weights used to combine the individual checks into one quality score
EVAL_WEIGHTS = {
    "style": 0.20,
    "security": 0.20,
    "tests": 0.40,
    "maintainability": 0.20,
}
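
# How the weights combine (mirrors run_evaluators below): each check is mapped to a
# 0-100 sub-score and the weighted sum becomes the overall quality score, i.e.
#   quality_score = 0.20 * style + 0.20 * security + 0.40 * tests + 0.20 * maintainability
# For example, style=100, security=100, tests=20 (failing), maintainability=100 gives 68.0.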

# Setup structured logging
logging.basicConfig(
    level=os.getenv("LOG_LEVEL", "INFO").upper(),
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler(ERROR_LOG_FILE),
        logging.StreamHandler(sys.stdout),
    ],
)

# ---------- Helpers ----------
def sanitize_log_message(message: str) -> str:
    """Redacts sensitive information such as API tokens from log messages."""
    # Matches typical Hugging Face tokens (hf_...)
    token_pattern = re.compile(r"hf_[a-zA-Z0-9]{30,}")
    return token_pattern.sub("[REDACTED_TOKEN]", message)


def write_error_log(exc: Exception, prefix: str = "") -> None:
    """Writes a sanitized error entry (message plus traceback) to the log."""
    tb = traceback.format_exc()
    sanitized_tb = sanitize_log_message(tb)
    sanitized_exc = sanitize_log_message(str(exc))
    logging.error(f"{prefix} | Exception: {sanitized_exc}\nTraceback:\n{sanitized_tb}")


def get_token_from_env_or_manual(manual_token: Optional[str]) -> Optional[str]:
    """Returns the HF token from manual input if provided, otherwise from the environment."""
    if manual_token and manual_token.strip():
        return manual_token.strip()
    return os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_TOKEN")


def detect_language(goal: str, code: str) -> bool:
    """Detects whether the project is primarily Python.

    Returns True for Python, False otherwise.
    """
    combined = (goal + " " + (code or "")).lower()
    python_kw = ["python", "django", "flask", "fastapi", "pytest", "def ", "import ", "pip"]
    return any(kw in combined for kw in python_kw)
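
# Illustrative behaviour (a keyword heuristic, not a full classifier):
#   detect_language("Build a FastAPI service", "") -> True
#   detect_language("Build a React dashboard", "") -> False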

def run_cmd(cmd: List[str], cwd: Optional[str] = None, timeout: int = COMMAND_TIMEOUT) -> Tuple[int, str]:
    """Runs a command in a subprocess with a timeout and captures combined stdout/stderr."""
    try:
        env = os.environ.copy()
        # If the command references pytest explicitly, flag network access as disabled.
        # Note: ALLOW_NETWORK=0 is only honored if the generated tests check this variable.
        if any("pytest" in str(part) for part in cmd):
            env["ALLOW_NETWORK"] = "0"
        proc = subprocess.run(
            cmd,
            cwd=cwd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            timeout=timeout,
            text=True,
            check=False,
            env=env,
        )
        return proc.returncode, proc.stdout or ""
    except subprocess.TimeoutExpired:
        return 1, f"TIMEOUT: Command '{' '.join(cmd)}' exceeded {timeout} seconds."
    except FileNotFoundError:
        return 1, f"COMMAND NOT FOUND: {cmd[0]}"
    except Exception as e:
        write_error_log(e, "run_cmd failed")
        return 1, f"ERROR: {e}"
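
# Example usage (illustrative):
#   rc, output = run_cmd([sys.executable, "--version"])
#   # rc == 0 and output starts with "Python" on a working interpreter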

def write_files(workdir: Path, files: Dict[str, str]) -> None:
    """Safely writes a dictionary of files to a directory, preventing path traversal."""
    workdir_resolved = workdir.resolve()
    for filename, content in files.items():
        try:
            # Reject obviously malicious paths before resolving
            if ".." in filename or filename.startswith("/"):
                logging.warning(f"Blocked malicious path attempt: {filename}")
                continue
            target_path = (workdir_resolved / filename).resolve()
            # Final check that the resolved path is still inside the workdir
            if workdir_resolved not in target_path.parents and target_path != workdir_resolved:
                raise ValueError(f"Path traversal attempt blocked: {filename}")
            target_path.parent.mkdir(parents=True, exist_ok=True)
            target_path.write_text(content, encoding="utf-8")
        except Exception as e:
            write_error_log(e, f"Failed to write file {filename}")


def make_zip(dirpath: Path) -> Optional[str]:
    """Creates a zip archive of a directory and returns the archive path."""
    try:
        base_path = str(dirpath.parent / dirpath.name)
        return shutil.make_archive(base_path, "zip", root_dir=dirpath)
    except Exception as e:
        write_error_log(e, "ZIP creation failed")
        return None
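
# Round-trip sketch (illustrative; paths are hypothetical):
#   workdir = Path(tempfile.mkdtemp(prefix="infgen_demo_"))
#   write_files(workdir, {"main.py": "print('hi')\n",
#                         "tests/test_main.py": "def test_ok():\n    assert True\n"})
#   archive = make_zip(workdir)  # e.g. "/tmp/infgen_demo_XXXXXX.zip"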

# ---------- Model calls ----------
def extract_chunk_content(chunk: Any) -> Optional[str]:
    """Extracts text content from the various streaming chunk formats seen in practice."""
    try:
        if isinstance(chunk, dict) and (choices := chunk.get("choices")):
            # Typical OpenAI-like streaming chunk shape
            delta = choices[0].get("delta", {})
            return delta.get("content") or delta.get("text")
        # Newer HF shapes may carry 'generations' inside the chunk
        if isinstance(chunk, dict) and "generations" in chunk:
            gens = chunk.get("generations") or []
            parts = []
            for g in gens:
                if isinstance(g, dict) and "text" in g:
                    parts.append(g["text"])
            return "".join(parts) if parts else None
        # huggingface_hub stream objects expose .choices[0].delta.content
        if hasattr(chunk, "choices") and chunk.choices:
            delta = getattr(chunk.choices[0], "delta", None)
            if delta is not None and getattr(delta, "content", None):
                return delta.content
        # Some streaming implementations yield objects with .delta/.content attributes
        if hasattr(chunk, "delta") and hasattr(chunk.delta, "content"):
            return chunk.delta.content
        if isinstance(chunk, str):
            return chunk
    except Exception:
        return None
    return None

def call_model(client: InferenceClient, system: str, user: str, is_python: bool, **settings) -> str:
    """Calls the appropriate LLM with retry logic and multiple fallbacks.

    Tries a non-streaming call first (more reliable), then falls back to streaming.
    """
    if client is None:
        return "<<ERROR: No inference client provided>>"
    primary_model = PYTHON_MODEL if is_python else OTHER_MODEL
    models_to_try = [primary_model] + [m for m in FALLBACK_MODELS if m != primary_model]
    logging.info(f"Calling model for {'Python' if is_python else 'Other'} project. Primary: {primary_model}")
    logging.debug(f"Raw settings: {settings}")
    messages = [{"role": "system", "content": system}, {"role": "user", "content": user}]
    # Build robust settings: include both token keys because API variants differ
    cleaned = {}
    cleaned["temperature"] = settings.get("temperature", DEFAULT_TEMPERATURE)
    cleaned["top_p"] = settings.get("top_p", DEFAULT_TOP_P)
    max_new = settings.get("max_new_tokens", settings.get("max_tokens", DEFAULT_MAX_TOKENS))
    try:
        max_new = int(max_new)
    except (TypeError, ValueError):
        max_new = DEFAULT_MAX_TOKENS
    cleaned["max_new_tokens"] = max_new
    # Also include max_tokens for API variants that expect that name
    cleaned["max_tokens"] = max_new
    logging.info(
        f"Using cleaned settings: temperature={cleaned['temperature']}, "
        f"top_p={cleaned['top_p']}, max_new_tokens={cleaned['max_new_tokens']}"
    )
    last_exception = None
    for model_name in models_to_try:
        attempt = 0
        # Try a few times per model, shrinking the token budget if necessary
        while attempt < 3:
            attempt += 1
            try:
                logging.info(f"Attempting non-streaming call to {model_name} (attempt {attempt})")
                # Named-argument style first (most robust)
                try:
                    resp = client.chat_completion(messages=messages, model=model_name, stream=False, **cleaned)
                except TypeError as te:
                    # Some client versions expect a different call shape; retry without the stream kwarg
                    logging.debug(f"TypeError calling chat_completion: {te}")
                    resp = client.chat_completion(messages=messages, model=model_name, **cleaned)
                response_text = ""
                # Parse the many possible response shapes
                try:
                    if isinstance(resp, dict):
                        # Common HF shapes
                        if "generated_text" in resp and isinstance(resp["generated_text"], str):
                            response_text = resp["generated_text"]
                        elif "text" in resp and isinstance(resp["text"], str):
                            response_text = resp["text"]
                        elif "choices" in resp and resp["choices"]:
                            choice = resp["choices"][0]
                            if isinstance(choice, dict):
                                if "message" in choice and isinstance(choice["message"], dict):
                                    response_text = choice["message"].get("content") or choice["message"].get("text", "") or ""
                                else:
                                    response_text = choice.get("text") or choice.get("message") or ""
                            else:
                                response_text = str(choice)
                        elif "generations" in resp and resp["generations"]:
                            gens = resp["generations"]
                            parts = []
                            for g in gens:
                                if isinstance(g, dict) and "text" in g:
                                    parts.append(g.get("text", ""))
                                elif hasattr(g, "text"):
                                    parts.append(getattr(g, "text"))
                            response_text = "".join(parts)
                        elif "data" in resp and isinstance(resp["data"], list) and resp["data"]:
                            # Fallback for nested keys, e.g. {'data': [{'text': '...'}]}
                            first = resp["data"][0]
                            if isinstance(first, dict) and "text" in first:
                                response_text = first["text"]
                    elif isinstance(resp, (list, tuple)):
                        # Possibly a list of generation dicts
                        parts = []
                        for item in resp:
                            if isinstance(item, dict) and "text" in item:
                                parts.append(item["text"])
                            else:
                                parts.append(str(item))
                        response_text = "".join(parts)
                    elif isinstance(resp, str):
                        response_text = resp
                    else:
                        # Object responses (e.g. huggingface_hub ChatCompletionOutput) expose
                        # .choices[0].message.content; fall back to str() as a last resort
                        choices = getattr(resp, "choices", None)
                        if choices:
                            message = getattr(choices[0], "message", None)
                            response_text = getattr(message, "content", "") or ""
                        else:
                            response_text = str(resp)
                except Exception as e:
                    write_error_log(e, f"Non-stream parsing failed for model {model_name}")
                    response_text = ""
                if response_text and response_text.strip():
                    logging.info(f"✓ Successfully got response from {model_name} ({len(response_text)} chars)")
                    return response_text
                logging.warning(f"Non-streaming returned empty response from {model_name}, attempt {attempt}.")
                # Fall through to the streaming fallback below
            except Exception as e:
                last_exception = e
                write_error_log(e, f"Non-stream model {model_name} failed on attempt {attempt}")
                logging.error(f"Non-stream error for {model_name}: {str(e)[:200]}")
            # Streaming fallback
            try:
                logging.info(f"Attempting streaming call to {model_name} (attempt {attempt})")
                # Streaming: some client versions yield objects, some yield strings
                try:
                    stream_iter = client.chat_completion(messages=messages, model=model_name, stream=True, **cleaned)
                except TypeError:
                    # Alternate call signature without the extra settings
                    stream_iter = client.chat_completion(messages=messages, model=model_name, stream=True)
                collected = []
                try:
                    for chunk in stream_iter:
                        piece = extract_chunk_content(chunk)
                        if piece:
                            collected.append(piece)
                    response = "".join(collected).strip()
                except Exception as e:
                    # If the stream fails mid-iteration, treat it as an empty response
                    write_error_log(e, "Streaming parsing failed")
                    response = ""
                if response:
                    logging.info(f"✓ Successfully got streaming response from {model_name} ({len(response)} chars)")
                    return response
                logging.warning(f"Streaming returned empty response from {model_name} (attempt {attempt})")
            except Exception as e:
                last_exception = e
                write_error_log(e, f"Streaming model {model_name} failed on attempt {attempt}")
                logging.error(f"Streaming error for {model_name}: {str(e)[:200]}")
                # Back off, then reduce the token budget and retry
                time.sleep(1 + attempt * 0.5)
                cleaned["max_new_tokens"] = max(256, int(cleaned["max_new_tokens"] * 0.5))
                cleaned["max_tokens"] = cleaned["max_new_tokens"]
                logging.info(f"Reduced max_new_tokens to {cleaned['max_new_tokens']} and retrying")
                continue
            # Reached without a response: reduce the token budget and retry
            cleaned["max_new_tokens"] = max(256, int(cleaned["max_new_tokens"] * 0.6))
            cleaned["max_tokens"] = cleaned["max_new_tokens"]
            logging.info(f"No response; reduced max_new_tokens to {cleaned['max_new_tokens']} and will retry (attempt {attempt})")
            time.sleep(0.8 + attempt * 0.3)
    logging.error(f"❌ ALL MODELS FAILED. Last error: {last_exception}")
    return f"<<ERROR: All models failed. Last error: {sanitize_log_message(str(last_exception))}>>"

# ---------- Robust parsing ----------
def validate_files_dict(files: Dict[str, str]) -> bool:
    """Validates that the generated files dictionary is well-formed."""
    if not isinstance(files, dict) or not files:
        return False
    return all(isinstance(k, str) and isinstance(v, str) for k, v in files.items())


def parse_meta(text: str) -> Optional[Dict[str, Any]]:
    """Parses model output into a files dict, trying structured JSON first, then heuristics."""
    if not text or not isinstance(text, str):
        return None
    # Strict JSON/META block parsing
    for pattern in [r"```json\s*(.*?)```", r"```meta\s*(.*?)```", r"```META\s*(.*?)```", r"<META>(.*?)</META>"]:
        match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
        if match:
            try:
                content = match.group(1).strip()
                parsed = json.loads(content)
                if "files" in parsed and validate_files_dict(parsed["files"]):
                    logging.info(f"Successfully parsed META JSON with {len(parsed['files'])} files")
                    return parsed
            except (json.JSONDecodeError, TypeError) as e:
                logging.warning(f"JSON parse failed: {e}")
                continue
    # Also try to parse the whole response as a top-level JSON blob
    try:
        parsed_full = json.loads(text.strip())
        if isinstance(parsed_full, dict) and "files" in parsed_full and validate_files_dict(parsed_full["files"]):
            logging.info("Parsed raw JSON response as META")
            return parsed_full
    except Exception:
        pass
    # Fallback: heuristic parsing of code blocks
    files = {}
    # Look for filename markers near code blocks
    filename_patterns = [
        r'#\s*[Ff]ile:\s*([\w/._-]+\.[\w]+)',
        r'##\s*([\w/._-]+\.[\w]+)',
        r'\*\*\s*([\w/._-]+\.[\w]+)\s*\*\*',
        r'^\s*([\w\-/_.]+?\.(?:py|txt|md|json|yaml|yml))\s*:\s*$',  # e.g. "main.py:" on its own line
    ]
    all_filenames = []
    for pattern in filename_patterns:
        all_filenames.extend(re.findall(pattern, text, flags=re.MULTILINE))
    # Grab all fenced code blocks
    code_blocks = re.findall(r"```(?:[\w+-]+)?\s*([\s\S]*?)```", text, re.DOTALL)
    if not code_blocks:
        # Naive fallback: split on blank lines and keep multi-line chunks that look like code
        chunks = [c for c in re.split(r"\n{2,}", text) if len(c.splitlines()) > 1]
        code_blocks = chunks[:6]  # limit
    if not code_blocks:
        logging.warning("No code blocks found in model response")
        return None
    # Pair filenames with code blocks in order
    for i, block in enumerate(code_blocks):
        block_content = block.strip()
        if not block_content:
            continue
        if i < len(all_filenames):
            filename = all_filenames[i]
        else:
            # Guess a filename from the block content
            if "def test_" in block_content or "import pytest" in block_content:
                filename = "tests/test_main.py"
            elif "requirements" in text.lower() and i == 0:
                filename = "requirements.txt"
            elif "# README" in block_content or block_content.startswith("# ") or block_content.lower().strip().startswith("readme"):
                filename = "README.md"
            else:
                filename = "main.py" if i == 0 else f"file_{i}.py"
        files[filename] = block_content
    if validate_files_dict(files) and files:
        logging.info(f"Heuristic parsing extracted {len(files)} files: {list(files.keys())}")
        return {"files": files, "changelog": "Extracted files via heuristic parsing."}
    # Last resort: if the whole output looks like a single file, place it into main.py
    if text.strip():
        files = {"main.py": text.strip()}
        if validate_files_dict(files):
            logging.info("Parsed whole response into main.py as last resort")
            return {"files": files, "changelog": "Fallback single-file parse."}
    logging.error("Failed to extract any valid files from model response")
    return None
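
# Expected META payload shape (content here is illustrative):
#   {
#       "files": {
#           "main.py": "...",
#           "tests/test_main.py": "...",
#           "requirements.txt": "..."
#       },
#       "changelog": "Short description of the changes"
#   }
# parse_meta accepts this JSON wrapped in a ```json / ```meta fence, inside <META>...</META>
# tags, or as a raw top-level JSON blob; otherwise it falls back to the heuristics above.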

# ---------- Enhanced evaluators ----------
def run_evaluators(workdir: Path) -> Dict[str, Any]:
    """Runs style, security, test, and complexity checks and returns a weighted quality score."""
    out = {}
    rc, txt = run_cmd([sys.executable, "-m", "flake8", ".", "--count", "--max-line-length=100"], cwd=str(workdir))
    out["flake8_pass"] = rc == 0
    out["flake8_out"] = txt
    rc, txt = run_cmd([sys.executable, "-m", "bandit", "-r", ".", "-f", "txt"], cwd=str(workdir))
    out["bandit_pass"] = rc == 0 or "No issues" in txt
    out["bandit_out"] = txt
    test_files = list(workdir.glob("**/test_*.py")) + list(workdir.glob("**/*_test.py"))
    if test_files:
        rc, txt = run_cmd([sys.executable, "-m", "pytest", "--maxfail=1", "--tb=short"], cwd=str(workdir))
        out["pytest_pass"] = rc == 0
        out["pytest_out"] = txt
    else:
        out["pytest_pass"] = False
        out["pytest_out"] = "No tests"
    rc, txt = run_cmd([sys.executable, "-m", "black", "--check", "."], cwd=str(workdir))
    out["black_pass"] = rc == 0
    complexity = 5.0
    rc, txt = run_cmd([sys.executable, "-m", "radon", "cc", ".", "-s", "-a"], cwd=str(workdir))
    if rc == 0:
        m = re.search(r"Average complexity.*?([0-9.]+)", txt)
        if m:
            try:
                complexity = float(m.group(1))
            except ValueError:
                pass
    out["complexity"] = complexity
    # Calculate the weighted score
    style = 100.0 if (out["flake8_pass"] and out["black_pass"]) else 50.0
    security = 100.0 if out["bandit_pass"] else 30.0
    tests = 100.0 if out["pytest_pass"] else 20.0
    maintainability = max(0.0, 100.0 - (complexity - 5.0) * 10.0) if complexity > 5 else 100.0
    w = EVAL_WEIGHTS
    score = w["style"] * style + w["security"] * security + w["tests"] * tests + w["maintainability"] * maintainability
    out["quality_score"] = round(max(0.0, min(100.0, score)), 1)
    out["breakdown"] = {
        "style": round(style, 1),
        "security": round(security, 1),
        "tests": round(tests, 1),
        "maintainability": round(maintainability, 1),
    }
    return out
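
# Illustrative shape of the evaluator result (the values below are made up):
#   {
#       "flake8_pass": True, "flake8_out": "...", "bandit_pass": True, "bandit_out": "...",
#       "pytest_pass": False, "pytest_out": "...", "black_pass": True, "complexity": 4.2,
#       "quality_score": 68.0,
#       "breakdown": {"style": 100.0, "security": 100.0, "tests": 20.0, "maintainability": 100.0}
#   }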

# ---------- AI features ----------
def generate_code_review(client: Optional[InferenceClient], token: str, files: Dict[str, str], eval_results: Dict, is_python: bool) -> str:
    """Asks the model for a short, actionable review of the generated files."""
    preview = "\n".join([f"# {n}\n{c[:300]}..." for n, c in list(files.items())[:2]])
    prompt = f"""Review this code:
{preview}
Quality: Flake8={'Pass' if eval_results.get('flake8_pass') else 'Fail'}, Tests={'Pass' if eval_results.get('pytest_pass') else 'Fail'}
Give 2-3 specific, actionable improvements:"""
    review = call_model(client, "You are a senior code reviewer.", prompt, is_python, max_new_tokens=400, temperature=0.2, top_p=0.8)
    return review if review and "<<ERROR" not in review else "No review"


def generate_readme(client: Optional[InferenceClient], token: str, goal: str, files: Dict[str, str], is_python: bool) -> str:
    """Asks the model to write a README.md for the generated project."""
    summary = "\n".join([f"- {n}: {len(c.splitlines())} lines" for n, c in files.items()])
    prompt = f"""Create README.md:
Goal: {goal}
Files:
{summary}
Include: description, installation, usage."""
    readme = call_model(client, "You are a technical writer.", prompt, is_python, max_new_tokens=600, temperature=0.2, top_p=0.9)
    return readme if readme and "<<ERROR" not in readme else "# Project\n\nGenerated code."


def create_initial_scaffold(client: Optional[InferenceClient], token: str, goal: str, is_python: bool) -> Optional[Dict[str, Any]]:
    """Generates a Version 0.1 project scaffold (code, requirements, tests) via the model."""
    system = """You are a Principal Software Architect. Create a professional initial project scaffold with working code, requirements.txt, and tests."""
    prompt = f"""Project: {goal}
Create Version 0.1 scaffold:
1. Choose appropriate libraries (requirements.txt)
2. Working minimal code
3. Basic tests in tests/
Return as META JSON with files mapping."""
    try:
        response = call_model(client, system, prompt, is_python, max_new_tokens=3072, temperature=0.4)
        if response and "<<ERROR" not in response:
            meta = parse_meta(response)
            if meta and meta.get("files") and validate_files_dict(meta["files"]):
                return meta
            # Save the raw scaffold response for debugging
            with open("/tmp/failed_scaffold_response.txt", "w", encoding="utf-8") as f:
                f.write(response)
    except Exception as e:
        write_error_log(e, "Scaffold failed")
    return None


def import_project(zip_file) -> Dict[str, str]:
    """Reads text-like files (.py, .txt, .md, .json, .yaml, .yml) out of an uploaded ZIP."""
    if not zip_file:
        return {}
    try:
        files = {}
        with zipfile.ZipFile(zip_file.name, 'r') as zf:
            for filename in zf.namelist():
                if filename.endswith(('.py', '.txt', '.md', '.json', '.yaml', '.yml')):
                    try:
                        content = zf.read(filename).decode('utf-8')
                        files[filename] = content
                    except (UnicodeDecodeError, KeyError):
                        pass
        return files
    except Exception as e:
        write_error_log(e, "Import failed")
        return {}

# ---------- Controller ----------
class CodeGenController:
    def __init__(self, token: str, goal: str, instructions: str, settings: Dict, max_iters: int, infinite_mode: bool, is_python: bool):
        self.token = token
        try:
            self.client = InferenceClient(token=token)
            logging.info("✓ InferenceClient initialized successfully")
        except Exception as e:
            logging.error(f"Failed to initialize InferenceClient: {e}")
            raise
        self.goal = goal
        self.instructions = instructions
        self.settings = settings
        self.max_iters = max_iters
        self.infinite_mode = infinite_mode
        self.is_python = is_python
        self.model_name = PYTHON_MODEL if is_python else OTHER_MODEL
        logging.info(f"Controller initialized for {'Python' if is_python else 'Other'} with model: {self.model_name}")
        self.history: List[Dict] = []
        self.current_files: Dict[str, str] = {}
        self.current_code: str = ""
        self.best_score: float = 0.0
        self.best_eval: Dict = {}
        self.best_files: Dict[str, str] = {}
        self.best_workspace: str = ""
        self.best_zip: Optional[str] = None
        self.best_review: str = ""
        self.best_readme: str = ""
        self.stop_flag = Path(tempfile.gettempdir()) / f"stop_{uuid.uuid4().hex[:8]}"

    def cleanup_workdir(self, workdir: Path):
        """Removes a temporary iteration workspace, ignoring failures."""
        try:
            if workdir.exists():
                shutil.rmtree(workdir)
        except Exception as e:
            write_error_log(e, f"Failed to cleanup workdir {workdir}")

    def start_scaffolding(self) -> bool:
        """Creates the initial project files, falling back to a minimal scaffold on failure."""
        scaffold = create_initial_scaffold(self.client, self.token, self.goal, self.is_python)
        if scaffold and scaffold.get("files"):
            self.current_files = scaffold["files"]
            self.current_code = "\n\n".join(f"# {n}\n{c}" for n, c in self.current_files.items())
            self.best_files = dict(self.current_files)
            # Ensure we have at least a requirements.txt
            if "requirements.txt" not in self.best_files:
                self.best_files["requirements.txt"] = "# Add your requirements here"
            return True
        # Sensible defaults if scaffolding fails
        self.current_files = {
            "main.py": "# New project\n\ndef main():\n    print('Hello, World!')\n\nif __name__ == '__main__':\n    main()",
            "requirements.txt": "# Add your requirements here",
        }
        self.current_code = "\n\n".join(f"# {n}\n{c}" for n, c in self.current_files.items())
        self.best_files = dict(self.current_files)
        return False

    def perform_iteration(self, iteration: int) -> Dict[str, Any]:
        """Runs one generation/evaluation cycle in a fresh temporary workspace."""
        parent = Path(tempfile.mkdtemp(prefix="infgen_"))
        workdir = parent / f"iter_{iteration}_{uuid.uuid4().hex[:6]}"
        workdir.mkdir(parents=True, exist_ok=True)
        try:
            system = """You are a Level 5 Principal Software Engineer specializing in production-ready code.
Follow Defensive Programming, TDD, and best practices.
Output MUST be flawless, well-tested, and industry-standard.
ALWAYS return complete code in META JSON format."""
            feedback = ""
            if self.best_eval:
                score = self.best_eval.get("quality_score", 0)
                feedback = f"\n\nPREVIOUS SCORE: {score}/100"
                if not self.best_eval.get("pytest_pass"):
                    feedback += "\nCRITICAL: TESTS FAILING. Fix logic to pass all tests."
                if not self.best_eval.get("flake8_pass"):
                    feedback += "\nFIX: Code quality issues (flake8)."
            prompt = f"""PROJECT: {self.goal}
CURRENT CODE (BUILD UPON THIS):
{self.current_code}
INSTRUCTIONS: {self.instructions}{feedback}
CRITICAL RULES:
1. Every function needs docstrings and type hints.
2. Write comprehensive pytest tests.
3. Complete implementations - NO placeholders.
4. Return all files in META JSON format.
Return the perfected code in META format."""
            # Attempt the model call, with extra retry attempts and a reduced-token fallback
            response = call_model(self.client, system, prompt, self.is_python, **self.settings)
            if not response or "<<ERROR" in response:
                logging.error(f"Model returned error or empty: {(response or '')[:200]}")
                # Save the response for debugging if available
                with open(f"/tmp/failed_response_{iteration}.txt", "w", encoding="utf-8") as f:
                    f.write(response or "<<EMPTY RESPONSE>>")
                # Try one conservative retry with a reduced token budget before failing
                logging.info("Attempting a conservative retry with reduced tokens...")
                conservative_settings = dict(self.settings)
                conservative_settings["max_new_tokens"] = min(1024, int(conservative_settings.get("max_new_tokens", 1024)))
                conservative_settings["temperature"] = min(0.3, float(conservative_settings.get("temperature", 0.3)))
                response_retry = call_model(self.client, system, prompt, self.is_python, **conservative_settings)
                if response_retry and "<<ERROR" not in response_retry:
                    response = response_retry
                else:
                    # Still failed: produce a safe fallback scaffold so the iteration does not fail
                    logging.warning(f"Conservative retry failed for iteration {iteration}; producing fallback scaffold.")
                    fallback_main = self.current_files.get("main.py", "# New project\n\ndef main():\n    print('Hello, World!')\n\nif __name__ == '__main__':\n    main()")
                    fallback_main += "\n\n# NOTE: Model failed to generate new code for this iteration. Fallback scaffold inserted."
                    fallback_test = (
                        "import pytest\n\n"
                        "def test_placeholder():\n"
                        "    \"\"\"Placeholder test created because the model failed to produce output.\"\"\"\n"
                        "    assert True\n"
                    )
                    fallback_requirements = self.current_files.get("requirements.txt", "# Add your requirements here")
                    files = {
                        "main.py": fallback_main,
                        "tests/test_main.py": fallback_test,
                        "requirements.txt": fallback_requirements,
                    }
                    # Write the files and run the evaluators so the pipeline can continue
                    write_files(workdir, files)
                    eval_results = run_evaluators(workdir)
                    eval_results["fallback_used"] = True
                    eval_results["fallback_info"] = f"Model failed; fallback scaffold used for iteration {iteration}."
                    review = f"Fallback scaffold inserted because the model failed to return usable output. See /tmp/failed_response_{iteration}.txt for the raw model response."
                    readme = f"# Fallback Project\n\nThis scaffold was inserted automatically because model generation failed on iteration {iteration}."
                    files["README.md"] = readme
                    write_files(workdir, {"README.md": readme})
                    zip_path = make_zip(workdir)
                    return {
                        "success": True,
                        "eval": eval_results,
                        "zip": zip_path,
                        "workdir": str(workdir),
                        "files": files,
                        "review": review,
                        "readme": readme,
                    }
            meta = parse_meta(response)
            if not meta or not meta.get("files"):
                logging.error(f"Parse failed. Response preview: {response[:1000]}")
                # Save the failed response for debugging
                with open(f"/tmp/failed_response_{iteration}.txt", "w", encoding="utf-8") as f:
                    f.write(response)
                return {"success": False, "warning": f"Parse failed - see /tmp/failed_response_{iteration}.txt"}
            files = meta["files"]
            write_files(workdir, files)
            eval_results = run_evaluators(workdir)
            review = generate_code_review(self.client, self.token, files, eval_results, self.is_python)
            readme = generate_readme(self.client, self.token, self.goal, files, self.is_python)
            files["README.md"] = readme
            write_files(workdir, {"README.md": readme})
            zip_path = make_zip(workdir)
            return {
                "success": True, "eval": eval_results, "zip": zip_path, "workdir": str(workdir),
                "files": files, "review": review, "readme": readme,
            }
        except Exception as e:
            write_error_log(e, "Iteration exception")
            return {"success": False, "warning": f"Exception: {str(e)}"}

    def run_loop(self) -> Generator:
        """Main generation loop; yields a UI update tuple after each step."""
        iteration = 1
        max_iterations = 999999 if self.infinite_mode else self.max_iters
        if not self.current_files:
            self.start_scaffolding()
        initial_state = {"stop_flag_path": str(self.stop_flag)}
        yield self.format_output(f"Starting with {self.model_name}...", iteration, max_iterations, initial_state)
        while iteration <= max_iterations:
            if self.stop_flag.exists():
                try:
                    self.stop_flag.unlink()
                    logging.info("Stop flag detected - stopping generation")
                except OSError:
                    pass
                yield self.format_output("⛔ Stopped by user", iteration, max_iterations)
                break
            yield self.format_output(f"🔄 Iteration {iteration}/{max_iterations} running...", iteration, max_iterations)
            result = self.perform_iteration(iteration)
            if not result.get("success"):
                warning_msg = result.get("warning", "Unknown iteration error")
                logging.warning(f"Iteration {iteration} failed: {warning_msg}")
                # Still yield output so the UI updates and keeps showing the previous best code
                yield self.format_output(f"⚠️ Iteration {iteration} failed: {warning_msg}", iteration, max_iterations)
                time.sleep(2)  # Give the backend a moment before retrying
                iteration += 1
                continue
            eval_res = result.get("eval", {})
            score = eval_res.get("quality_score", 0)
            self.history.append({"iteration": iteration, "eval": eval_res})
            self.current_files = result["files"]
            self.current_code = "\n\n".join(f"# {n}\n{c}" for n, c in self.current_files.items())
            if score > self.best_score:
                if self.best_workspace:
                    self.cleanup_workdir(Path(self.best_workspace))
                self.best_score = score
                self.best_eval = eval_res
                self.best_files = dict(result["files"])
                self.best_workspace = result.get("workdir", "")
                self.best_zip = result.get("zip")
                self.best_review = result.get("review", "")
                self.best_readme = result.get("readme", "")
                logging.info(f"New best score: {score}/100")
            else:
                # Even when the score does not improve, current_files was updated above for the next iteration
                logging.info(f"Score {score}/100 - keeping best: {self.best_score}/100")
                if result.get("workdir") and result.get("workdir") != self.best_workspace:
                    self.cleanup_workdir(Path(result["workdir"]))
            yield self.format_output(f"Iteration {iteration} complete: {score}/100", iteration, max_iterations)
            iteration += 1
            time.sleep(0.3)
        yield self.format_output(f"Complete! Best: {self.best_score}/100", iteration - 1, max_iterations)

    def format_output(self, log_msg: str, iteration: int, max_iters: int, state: Optional[Dict] = None):
        """Builds the tuple of values expected by the Gradio output components."""
        progress = f"Iteration {iteration}/{max_iters if max_iters < 999999 else 'INF'}" if iteration <= max_iters else "Complete"
        main = self.best_files.get("main.py", "# Generating code...")
        test = next((v for k, v in self.best_files.items() if 'test' in k and k.endswith('.py')), "# No tests yet...")
        req = self.best_files.get("requirements.txt", "# No requirements yet...")
        readme = self.best_files.get("README.md", "# Generating README...")
        other = {k: v for k, v in self.best_files.items() if k not in [
            "main.py", next((k for k in self.best_files if 'test' in k and k.endswith('.py')), None),
            "requirements.txt", "README.md",
        ]}
        return (
            f"[{time.strftime('%X')}] {sanitize_log_message(log_msg)}", self.model_name, progress,
            generate_metrics_html(self.history), self.best_eval, main, test, req, readme, other,
            self.best_review, self.best_zip, self.best_workspace, state or {},
        )
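
    # Note: the 14-tuple returned above must stay in the same order as the `outputs` list
    # wired up in create_ui() (run_log, model_display, progress_display, metrics_html,
    # eval_json, main_file, test_file, req_file, readme_file, other_files, review_display,
    # download_zip, workspace_path, controller_state).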

# ---------- UI Helpers ----------
def generate_metrics_html(history: List[Dict]) -> str:
    """Renders a small HTML bar chart of the quality-score trend."""
    if not history:
        return "<div style='padding:12px'>No metrics yet</div>"
    html_parts = ["<div style='font-family:sans-serif'><h4>Quality Trend</h4><div style='background:#f8f9fa;padding:12px;border-radius:8px'>"]
    for h in history[-10:]:
        score = h.get("eval", {}).get("quality_score", 0)
        width = int(score * 2.5)
        color = "#10b981" if score >= 80 else "#f59e0b" if score >= 60 else "#ef4444"
        html_parts.append(f"<div style='margin:4px 0'>#{h.get('iteration')}: <div style='display:inline-block;width:{width}px;height:20px;background:{color};border-radius:4px'></div> {score}/100</div>")
    scores = [h.get("eval", {}).get("quality_score", 0) for h in history]
    avg = sum(scores) / len(scores) if scores else 0
    best = max(scores) if scores else 0
    html_parts.append(f"<div style='margin-top:12px'><strong>Avg:</strong> {avg:.1f} | <strong>Best:</strong> {best:.1f}</div></div></div>")
    return "".join(html_parts)

# ---------- UI ----------
def create_ui():
    with gr.Blocks(title="InfinateCodeGenerator Ultimate", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# InfinateCodeGenerator - Ultimate Merged Edition\n*Controller architecture • Smart models • Multi-file UI • Never stops early*")
        controller_state = gr.State({})
        with gr.Row():
            with gr.Column(scale=2):
                project_goal = gr.Textbox(label="Project Goal", lines=4, placeholder="E.g., Create a FastAPI endpoint for user authentication with MongoDB")
                with gr.Tabs():
                    with gr.TabItem("New Project"):
                        initial_code = gr.Textbox(label="Starting Code (optional)", lines=8)
                    with gr.TabItem("Import Project"):
                        import_zip = gr.File(label="Upload ZIP", file_types=[".zip"])
                        import_btn = gr.Button("Import Files")
                        import_status = gr.Textbox(label="Status", interactive=False)
                improve_instructions = gr.Textbox(label="Instructions", lines=3, value="Write comprehensive tests, add type hints, and improve error handling.")
                hf_token_manual = gr.Textbox(label="HF Token (optional, overrides env)", type="password")
                with gr.Row():
                    start_btn = gr.Button("Start Generation", variant="primary", size="lg")
                    stop_btn = gr.Button("STOP", variant="stop", size="lg")
                with gr.Accordion("Settings", open=False):
                    infinite_mode = gr.Checkbox(label="Infinite Mode", value=False)
                    max_iters = gr.Slider(1, 15, value=5, step=1, label="Max Iterations")
                    temperature = gr.Slider(0.1, 1.0, value=0.5, step=0.05, label="Temperature")
                    top_p = gr.Slider(0.1, 1.0, value=0.9, step=0.05, label="Top P")
                    max_tokens = gr.Slider(512, 4096, value=4096, step=512, label="Max Tokens")
            with gr.Column(scale=3):
                with gr.Tabs():
                    with gr.TabItem("Dashboard"):
                        model_display = gr.Textbox(label="Active Model", interactive=False)
                        progress_display = gr.Textbox(label="Progress", interactive=False)
                        metrics_html = gr.HTML()
                        run_log = gr.Textbox(label="Log", lines=10, interactive=False)
                    with gr.TabItem("Generated Files"):
                        with gr.Tabs():
                            with gr.TabItem("main.py"):
                                main_file = gr.Code(language="python", lines=20)
                            with gr.TabItem("tests/test_main.py"):
                                test_file = gr.Code(language="python", lines=15)
                            with gr.TabItem("requirements.txt"):
                                req_file = gr.Textbox(lines=10, interactive=False, show_label=False)
                            with gr.TabItem("README.md"):
                                readme_file = gr.Code(language="markdown", lines=15)
                            with gr.TabItem("Other Files"):
                                other_files = gr.JSON()
                    with gr.TabItem("Code Review"):
                        review_display = gr.Textbox(label="AI Review", lines=10, interactive=False)
                    with gr.TabItem("Evaluation"):
                        eval_json = gr.JSON()
                    with gr.TabItem("Download"):
                        workspace_path = gr.Textbox(label="Best Version Path", interactive=False)
                        download_zip = gr.File(label="Download ZIP")

        def import_project_files(zip_file):
            files = import_project(zip_file)
            if files:
                combined = "\n\n".join([f"# {name}\n{content}" for name, content in files.items()])
                return f"Imported {len(files)} files!", combined
            return "Import failed", ""

        import_btn.click(fn=import_project_files, inputs=[import_zip], outputs=[import_status, initial_code])

        def start_gen(goal, init_code, instructions, hf_tok, inf_mode, max_it, temp, top, max_tok):
            token = get_token_from_env_or_manual(hf_tok)
            if not token:
                error_msg = "ERROR: No HF token found. Please provide a Hugging Face token."
                logging.error(error_msg)
                yield (error_msg, "", "", "", {}, "", "", "", "", {}, "", None, "", {})
                return
            # Log only the token length; never log token contents
            logging.info(f"Starting generation with an HF token of length {len(token)}")
            settings = {"temperature": temp, "top_p": top, "max_new_tokens": max_tok}
            is_python_project = detect_language(goal, init_code)
            logging.info(f"Detected project type: {'Python' if is_python_project else 'Other'}")
            controller = CodeGenController(token, goal, instructions, settings, int(max_it), inf_mode, is_python_project)
            if init_code and init_code.strip():
                controller.current_files = {"main.py": init_code}
                controller.current_code = init_code
                logging.info("Using provided initial code")
            yield from controller.run_loop()

        def set_stop(controller_state_val):
            if controller_state_val and (stop_path_str := controller_state_val.get("stop_flag_path")):
                stop_path = Path(stop_path_str)
                try:
                    stop_path.touch()
                    logging.info(f"Stop flag created at {stop_path}")
                    return "⛔ Stop signal sent! Will stop after the current iteration completes..."
                except Exception as e:
                    logging.error(f"Failed to create stop flag: {e}")
                    return f"❌ Failed to stop: {e}"
            return "❌ Could not stop: No active process found. Start generation first."

        outputs = [
            run_log, model_display, progress_display, metrics_html, eval_json,
            main_file, test_file, req_file, readme_file, other_files,
            review_display, download_zip, workspace_path, controller_state,
        ]
        start_btn.click(
            fn=start_gen,
            inputs=[project_goal, initial_code, improve_instructions, hf_token_manual, infinite_mode, max_iters, temperature, top_p, max_tokens],
            outputs=outputs,
        )
        stop_btn.click(fn=set_stop, inputs=[controller_state], outputs=[run_log])
    return demo

if __name__ == "__main__":
    try:
        demo = create_ui()
        demo.queue().launch(server_name="0.0.0.0", server_port=7860)
    except Exception as e:
        print(f"Failed to launch Gradio app: {e}", file=sys.stderr)
        sys.exit(1)