# InfinateCodeGenerator - Ultimate Merged Edition (v1.0.1) - full script
"""
Consolidated, hardened, and production-ready version (patched call_model & retries).
"""
import os
import sys
import time
import json
import traceback
import uuid
import re
import subprocess
import shutil
import logging
from pathlib import Path
from typing import Optional, Dict, Any, List, Tuple, Generator
from datetime import datetime
import gradio as gr
from huggingface_hub import InferenceClient
# Added missing imports
import tempfile
import zipfile
# ---------- Config ----------
# NOTE: the HF Inference API expects full "org/name" repo ids; the namespaces below
# assume the public Hugging Face repos for these models (adjust if you host forks).
PYTHON_MODEL = "NTQAI/Nxcode-CQ-7B-orpo"
OTHER_MODEL = "Qwen/Qwen2.5-Coder-32B-Instruct"
FALLBACK_MODELS = [
    "Qwen/Qwen2.5-Coder-32B-Instruct",
    "NTQAI/Nxcode-CQ-7B-orpo",
    "m-a-p/OpenCodeInterpreter-DS-33B"
]
DEFAULT_TEMPERATURE = 0.5
DEFAULT_TOP_P = 0.9
DEFAULT_MAX_TOKENS = 4096
DEFAULT_MAX_ITERS = 5
COMMAND_TIMEOUT = 60  # Seconds allowed per subprocess command (lint and test runs can be slow)
ERROR_LOG_FILE = "/tmp/infgen_error.log"
# Enhanced evaluation weights
EVAL_WEIGHTS = {
"style": 0.20,
"security": 0.20,
"tests": 0.40,
"maintainability": 0.20
}
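# Illustrative sketch of how run_evaluators() combines these weights (component values made up):
# with style=100, security=30, tests=20 and maintainability=100 the weighted score is
#   0.20*100 + 0.20*30 + 0.40*20 + 0.20*100 = 20 + 6 + 8 + 20 = 54.0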
# Setup structured logging
logging.basicConfig(
level=os.getenv("LOG_LEVEL", "INFO").upper(),
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler(ERROR_LOG_FILE),
logging.StreamHandler(sys.stdout)
]
)
# ---------- Helpers ----------
def sanitize_log_message(message: str) -> str:
"""Redacts sensitive information like API tokens from logs."""
    # Matches typical Hugging Face tokens: "hf_" followed by 30 or more alphanumeric characters
token_pattern = re.compile(r"hf_[a-zA-Z0-9]{30,}")
return token_pattern.sub("[REDACTED_TOKEN]", message)
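# Illustrative usage (the token below is fake):
#   >>> sanitize_log_message("auth failed for hf_abcdefghijklmnopqrstuvwxyz0123456789")
#   'auth failed for [REDACTED_TOKEN]'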
def write_error_log(exc: Exception, prefix: str = ""):
"""Writes a sanitized error log."""
tb = traceback.format_exc()
sanitized_tb = sanitize_log_message(tb)
sanitized_exc = sanitize_log_message(str(exc))
logging.error(f"{prefix} | Exception: {sanitized_exc}\nTraceback:\n{sanitized_tb}")
def get_token_from_env_or_manual(manual_token: Optional[str]) -> Optional[str]:
"""Retrieves HF token securely from manual input or environment variables."""
if manual_token and manual_token.strip():
return manual_token.strip()
return os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_TOKEN")
def detect_language(goal: str, code: str) -> bool:
"""Detects whether the project is primarily Python.
Returns True for Python, False otherwise.
"""
combined = (goal + " " + (code or "")).lower()
python_kw = ["python", "django", "flask", "fastapi", "pytest", "def ", "import ", "pip"]
return any(kw in combined for kw in python_kw)
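# Illustrative usage:
#   >>> detect_language("Build a FastAPI endpoint for user auth", "")
#   True
#   >>> detect_language("Build a React dashboard", "const x = 1;")
#   False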
def run_cmd(cmd: List[str], cwd: Optional[str] = None, timeout: int = COMMAND_TIMEOUT) -> Tuple[int, str]:
"""Runs a command in a subprocess with a timeout and captures output."""
try:
        # Best-effort isolation: signal test code not to use the network. This only works
        # if the generated tests honour the ALLOW_NETWORK flag; it is not a real sandbox.
        env = os.environ.copy()
        if any("pytest" in str(part) for part in cmd):
            env["ALLOW_NETWORK"] = "0"
proc = subprocess.run(
cmd,
cwd=cwd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
timeout=timeout,
text=True,
check=False,
env=env
)
return proc.returncode, proc.stdout or ""
except subprocess.TimeoutExpired:
return 1, f"TIMEOUT: Command '{' '.join(cmd)}' exceeded {timeout} seconds."
except FileNotFoundError:
return 1, f"COMMAND NOT FOUND: {cmd[0]}"
except Exception as e:
write_error_log(e, "run_cmd failed")
return 1, f"ERROR: {e}"
def write_files(workdir: Path, files: Dict[str, str]) -> None:
"""Safely writes a dictionary of files to a specified directory, preventing path traversal."""
workdir_resolved = workdir.resolve()
for filename, content in files.items():
try:
# Prevent path traversal attacks
if ".." in filename or filename.startswith("/"):
logging.warning(f"Blocked malicious path attempt: {filename}")
continue
target_path = (workdir_resolved / filename).resolve()
# Final check to ensure the path is within the workdir
if workdir_resolved not in target_path.parents and target_path != workdir_resolved:
raise ValueError(f"Path traversal attempt blocked: {filename}")
target_path.parent.mkdir(parents=True, exist_ok=True)
target_path.write_text(content, encoding="utf-8")
except Exception as e:
write_error_log(e, f"Failed to write file {filename}")
def make_zip(dirpath: Path) -> Optional[str]:
"""Creates a zip archive of a directory."""
try:
base_path = str(dirpath.parent / dirpath.name)
return shutil.make_archive(base_path, 'zip', root_dir=dirpath)
except Exception as e:
write_error_log(e, "ZIP creation failed")
return None
# ---------- Model calls ----------
def extract_chunk_content(chunk: Any) -> Optional[str]:
"""Extracts content from various possible streaming chunk formats."""
try:
if isinstance(chunk, dict) and (choices := chunk.get("choices")):
# typical OpenAI-like streaming chunk shape
delta = choices[0].get("delta", {})
return delta.get("content") or delta.get("text")
# HF newer shapes may use 'generations' inside chunk
if isinstance(chunk, dict) and "generations" in chunk:
gens = chunk.get("generations") or []
parts = []
for g in gens:
if isinstance(g, dict) and "text" in g:
parts.append(g["text"])
return "".join(parts) if parts else None
# some streaming yields objects with .delta or .content attributes
if hasattr(chunk, 'delta') and hasattr(chunk.delta, 'content'):
return chunk.delta.content
if isinstance(chunk, str):
return chunk
except Exception:
return None
return None
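# Illustrative chunk shapes this helper understands (values are made up):
#   {"choices": [{"delta": {"content": "def foo"}}]}       -> "def foo"
#   {"generations": [{"text": "():"}, {"text": " pass"}]}  -> "(): pass"
#   "plain string chunk"                                    -> "plain string chunk"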
def call_model(client: InferenceClient, system: str, user: str, is_python: bool, **settings) -> str:
"""Calls the appropriate LLM with retry logic and multiple fallbacks.
Tries non-streaming first (more reliable), falls back to streaming.
"""
if client is None:
return "<<ERROR: No inference client provided>>"
primary_model = PYTHON_MODEL if is_python else OTHER_MODEL
models_to_try = [primary_model] + [m for m in FALLBACK_MODELS if m != primary_model]
logging.info(f"Calling model for {'Python' if is_python else 'Other'} project. Primary: {primary_model}")
logging.debug(f"Raw settings: {settings}")
messages = [{"role": "system", "content": system}, {"role": "user", "content": user}]
# Build robust settings: include both keys some API variants accept
cleaned = {}
cleaned["temperature"] = settings.get("temperature", DEFAULT_TEMPERATURE)
cleaned["top_p"] = settings.get("top_p", DEFAULT_TOP_P)
max_new = settings.get("max_new_tokens", settings.get("max_tokens", DEFAULT_MAX_TOKENS))
try:
max_new = int(max_new)
except Exception:
max_new = DEFAULT_MAX_TOKENS
cleaned["max_new_tokens"] = max_new
# also include max_tokens for API variants
cleaned["max_tokens"] = max_new
logging.info(f"Using cleaned settings: temperature={cleaned['temperature']}, top_p={cleaned['top_p']}, max_new_tokens={cleaned['max_new_tokens']}")
last_exception = None
for model_name in models_to_try:
attempt = 0
# try a couple of times per model with decreasing tokens if necessary
while attempt < 3:
attempt += 1
try:
logging.info(f"Attempting non-streaming call to {model_name} (attempt {attempt})")
                    # Try the keyword-argument call first (most robust); some client versions
                    # reject unknown kwargs, so fall back to a simpler call signature.
                    try:
                        resp = client.chat_completion(messages=messages, model=model_name, stream=False, **cleaned)
                    except TypeError as te:
                        logging.debug(f"TypeError calling chat_completion: {te}")
                        resp = client.chat_completion(messages=messages, model=model_name, **cleaned)
response_text = ""
# Parse many possible shapes
try:
if isinstance(resp, dict):
# common HF shapes
if "generated_text" in resp and isinstance(resp["generated_text"], str):
response_text = resp["generated_text"]
elif "text" in resp and isinstance(resp["text"], str):
response_text = resp["text"]
elif "choices" in resp and resp["choices"]:
choice = resp["choices"][0]
if isinstance(choice, dict):
if "message" in choice and isinstance(choice["message"], dict):
response_text = choice["message"].get("content") or choice["message"].get("text", "") or ""
else:
response_text = choice.get("text") or choice.get("message") or ""
else:
response_text = str(choice)
elif "generations" in resp and resp["generations"]:
gens = resp["generations"]
parts = []
for g in gens:
if isinstance(g, dict) and "text" in g:
parts.append(g.get("text", ""))
elif hasattr(g, "text"):
parts.append(getattr(g, "text"))
response_text = "".join(parts)
else:
# fallback: inspect nested keys
if "data" in resp and isinstance(resp["data"], list) and resp["data"]:
# e.g., {'data':[{'text': '...'}]}
first = resp["data"][0]
if isinstance(first, dict) and "text" in first:
response_text = first["text"]
elif isinstance(resp, (list, tuple)):
# maybe list of generation dicts
parts = []
for item in resp:
if isinstance(item, dict) and "text" in item:
parts.append(item["text"])
else:
parts.append(str(item))
response_text = "".join(parts)
elif isinstance(resp, str):
response_text = resp
else:
# last resort: str()
response_text = str(resp)
except Exception as e:
write_error_log(e, f"Non-stream parsing failed for model {model_name}")
response_text = ""
if response_text and response_text.strip():
logging.info(f"β Successfully got response from {model_name} ({len(response_text)} chars)")
return response_text
else:
logging.warning(f"Non-streaming returned empty response from {model_name}, attempt {attempt}.")
# fall through to streaming fallback below
except Exception as e:
last_exception = e
write_error_log(e, f"Non-stream model {model_name} failed on attempt {attempt}")
logging.error(f"Non-stream error for {model_name}: {str(e)[:200]}")
# Streaming fallback
try:
logging.info(f"Attempting streaming call to {model_name} (attempt {attempt})")
# streaming - some versions yield objects, some strings
try:
stream_iter = client.chat_completion(messages=messages, model=model_name, stream=True, **cleaned)
except TypeError:
# Try alternate call-signature
stream_iter = client.chat_completion(messages=messages, model=model_name, stream=True)
collected = []
try:
for chunk in stream_iter:
piece = extract_chunk_content(chunk)
if piece:
collected.append(piece)
response = "".join(collected).strip()
except Exception as e:
                    # streaming iteration failed part-way; fall back to an empty response and let the retry logic run
write_error_log(e, "Streaming parsing failed")
response = ""
if response:
logging.info(f"β Successfully got streaming response from {model_name} ({len(response)} chars)")
return response
else:
logging.warning(f"Streaming returned empty response from {model_name} (attempt {attempt})")
except Exception as e:
last_exception = e
write_error_log(e, f"Streaming model {model_name} failed on attempt {attempt}")
logging.error(f"Streaming error for {model_name}: {str(e)[:200]}")
# reduce tokens and retry
time.sleep(1 + attempt * 0.5)
                # shrink the token budget to reduce the chance of the model refusing or failing
cleaned["max_new_tokens"] = max(256, int(cleaned["max_new_tokens"] * 0.5))
cleaned["max_tokens"] = cleaned["max_new_tokens"]
logging.info(f"Reduced max_new_tokens to {cleaned['max_new_tokens']} and retrying")
continue
# if reached here (no response), reduce tokens and retry
cleaned["max_new_tokens"] = max(256, int(cleaned["max_new_tokens"] * 0.6))
cleaned["max_tokens"] = cleaned["max_new_tokens"]
logging.info(f"No response; reduced max_new_tokens to {cleaned['max_new_tokens']} and will retry (attempt {attempt})")
time.sleep(0.8 + attempt * 0.3)
logging.error(f"β ALL MODELS FAILED. Last error: {last_exception}")
return f"<<ERROR: All models failed. Last error: {sanitize_log_message(str(last_exception))}>>"
# ---------- Robust parsing ----------
def validate_files_dict(files: Dict[str, str]) -> bool:
"""Validates that the generated files dictionary is well-formed."""
if not isinstance(files, dict) or not files:
return False
return all(isinstance(k, str) and isinstance(v, str) for k, v in files.items())
def parse_meta(text: str) -> Optional[Dict[str, Any]]:
"""Parses model output to extract code files, trying structured JSON first, then falling back to heuristics."""
if not text or not isinstance(text, str):
return None
# Strict JSON/META block parsing
for pattern in [r"```json\s*(.*?)```", r"```meta\s*(.*?)```", r"```META\s*(.*?)```", r"<META>(.*?)</META>"]:
match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
if match:
try:
content = match.group(1).strip()
parsed = json.loads(content)
if "files" in parsed and validate_files_dict(parsed["files"]):
logging.info(f"Successfully parsed META JSON with {len(parsed['files'])} files")
return parsed
except (json.JSONDecodeError, TypeError) as e:
logging.warning(f"JSON parse failed: {e}")
continue
# Also try to detect a top-level JSON blob
try:
parsed_full = json.loads(text.strip())
if isinstance(parsed_full, dict) and "files" in parsed_full and validate_files_dict(parsed_full["files"]):
logging.info("Parsed raw JSON response as META")
return parsed_full
except Exception:
pass
# Fallback to heuristic parsing of code blocks
files = {}
# Try to find filename markers before code blocks
filename_patterns = [
r'#\s*[Ff]ile:\s*([\w/._-]+\.[\w]+)',
r'##\s*([\w/._-]+\.[\w]+)',
r'\*\*\s*([\w/._-]+\.[\w]+)\s*\*\*',
r'^\s*([\w\-/_.]+?\.(?:py|txt|md|json|yaml|yml))\s*:\s*$', # e.g., "main.py:" on its own line
]
all_filenames = []
for pattern in filename_patterns:
all_filenames.extend(re.findall(pattern, text, flags=re.MULTILINE))
# Grab all fenced code blocks
code_blocks = re.findall(r"```(?:[\w+-]+)?\s*([\s\S]*?)```", text, re.DOTALL)
# Also capture indented/code-block-like sections (fallback)
if not code_blocks:
# naive: split by two or more newlines and keep blocks that look like code
chunks = [c for c in re.split(r"\n{2,}", text) if len(c.splitlines()) > 1]
code_blocks = chunks[:6] # limit
if not code_blocks:
logging.warning("No code blocks found in model response")
return None
# Match filenames with code blocks
for i, block in enumerate(code_blocks):
block_content = block.strip()
if not block_content:
continue
if i < len(all_filenames):
filename = all_filenames[i]
else:
# Guess filename based on content
if "def test_" in block_content or "import pytest" in block_content:
filename = f"tests/test_main.py" if not block_content.startswith("test_") else f"{block_content.splitlines()[0][:50]}.py"
elif "requirements" in text.lower() and i == 0:
filename = "requirements.txt"
elif "# README" in block_content or block_content.startswith("# ") or block_content.lower().strip().startswith("readme"):
filename = "README.md"
else:
filename = f"main.py" if i == 0 else f"file_{i}.py"
        # path safety (traversal checks) is enforced later by write_files()
files[filename] = block_content
if validate_files_dict(files) and files:
logging.info(f"Heuristic parsing extracted {len(files)} files: {list(files.keys())}")
return {"files": files, "changelog": "Extracted files via heuristic parsing."}
# As a last resort, if the whole output looks like a single file, place it into main.py
if text.strip():
files = {"main.py": text.strip()}
if validate_files_dict(files):
logging.info("Parsed whole response into main.py as last resort")
return {"files": files, "changelog": "Fallback single-file parse."}
logging.error("Failed to extract any valid files from model response")
return None
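# Illustrative model output that the strict parser accepts: a fenced ```json block whose
# payload contains a "files" mapping (filename -> file content). If no such block exists,
# the heuristic path above pairs "# File: name" style markers with fenced code blocks.
#
#   ```json
#   {
#     "files": {
#       "main.py": "def main():\n    print('hello')\n",
#       "tests/test_main.py": "from main import main\n\ndef test_main():\n    main()\n"
#     },
#     "changelog": "Initial version"
#   }
#   ```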
# ---------- Enhanced evaluators ----------
def run_evaluators(workdir: Path) -> Dict[str, Any]:
out = {}
rc, txt = run_cmd([sys.executable, "-m", "flake8", ".", "--count", "--max-line-length=100"], cwd=str(workdir))
out["flake8_pass"] = rc == 0
out["flake8_out"] = txt
rc, txt = run_cmd([sys.executable, "-m", "bandit", "-r", ".", "-f", "txt"], cwd=str(workdir))
out["bandit_pass"] = rc == 0 or "No issues" in txt
out["bandit_out"] = txt
test_files = list(workdir.glob("**/test_*.py")) + list(workdir.glob("**/*_test.py"))
if test_files:
rc, txt = run_cmd([sys.executable, "-m", "pytest", "--maxfail=1", "--tb=short"], cwd=str(workdir))
out["pytest_pass"] = rc == 0
else:
out["pytest_pass"] = False
out["pytest_out"] = txt if test_files else "No tests"
rc, txt = run_cmd([sys.executable, "-m", "black", "--check", "."], cwd=str(workdir))
out["black_pass"] = rc == 0
complexity = 5.0
rc, txt = run_cmd([sys.executable, "-m", "radon", "cc", ".", "-s", "-a"], cwd=str(workdir))
if rc == 0:
m = re.search(r"Average complexity.*?([0-9.]+)", txt)
if m:
try:
complexity = float(m.group(1))
            except (TypeError, ValueError):
pass
out["complexity"] = complexity
# Calculate weighted score
style = 100.0 if (out["flake8_pass"] and out["black_pass"]) else 50.0
security = 100.0 if out["bandit_pass"] else 30.0
tests = 100.0 if out["pytest_pass"] else 20.0
maintainability = max(0.0, 100.0 - (complexity - 5.0) * 10.0) if complexity > 5 else 100.0
w = EVAL_WEIGHTS
score = w["style"] * style + w["security"] * security + w["tests"] * tests + w["maintainability"] * maintainability
out["quality_score"] = round(max(0.0, min(100.0, score)), 1)
out["breakdown"] = {
"style": round(style, 1),
"security": round(security, 1),
"tests": round(tests, 1),
"maintainability": round(maintainability, 1)
}
return out
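# Illustrative result shape (values are made up; raw flake8/bandit/pytest output strings omitted):
#   {
#       "flake8_pass": True, "black_pass": True, "bandit_pass": True,
#       "pytest_pass": False, "complexity": 4.2,
#       "quality_score": 68.0,
#       "breakdown": {"style": 100.0, "security": 100.0, "tests": 20.0, "maintainability": 100.0},
#   }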
# ---------- AI features ----------
def generate_code_review(client: Optional[InferenceClient], token: str, files: Dict[str, str], eval_results: Dict, is_python: bool) -> str:
preview = "\n".join([f"# {n}\n{c[:300]}..." for n, c in list(files.items())[:2]])
prompt = f"""Review this code:
{preview}
Quality: Flake8={'Pass' if eval_results.get('flake8_pass') else 'Fail'}, Tests={'Pass' if eval_results.get('pytest_pass') else 'Fail'}
Give 2-3 specific, actionable improvements:"""
review = call_model(client, "You are a senior code reviewer.", prompt, is_python, max_new_tokens=400, temperature=0.2, top_p=0.8)
return review if review and "<<ERROR" not in review else "No review"
def generate_readme(client: Optional[InferenceClient], token: str, goal: str, files: Dict[str, str], is_python: bool) -> str:
summary = "\n".join([f"- {n}: {len(c.splitlines())} lines" for n, c in files.items()])
prompt = f"""Create README.md:
Goal: {goal}
Files:
{summary}
Include: description, installation, usage."""
readme = call_model(client, "You are a technical writer.", prompt, is_python, max_new_tokens=600, temperature=0.2, top_p=0.9)
return readme if readme and "<<ERROR" not in readme else "# Project\n\nGenerated code."
def create_initial_scaffold(client: Optional[InferenceClient], token: str, goal: str, is_python: bool) -> Optional[Dict[str, Any]]:
system = """You are a Principal Software Architect. Create a professional initial project scaffold with working code, requirements.txt, and tests."""
prompt = f"""Project: {goal}
Create Version 0.1 scaffold:
1. Choose appropriate libraries (requirements.txt)
2. Working minimal code
3. Basic tests in tests/
Return as META JSON with files mapping."""
try:
response = call_model(client, system, prompt, is_python, max_new_tokens=3072, temperature=0.4)
if response and "<<ERROR" not in response:
meta = parse_meta(response)
if meta and meta.get("files") and validate_files_dict(meta["files"]):
return meta
else:
# Save raw scaffold response for debugging
with open("/tmp/failed_scaffold_response.txt", "w") as f:
f.write(response)
except Exception as e:
write_error_log(e, "Scaffold failed")
return None
def import_project(zip_file) -> Dict[str, str]:
if not zip_file:
return {}
try:
files = {}
with zipfile.ZipFile(zip_file.name, 'r') as zf:
for filename in zf.namelist():
if filename.endswith(('.py', '.txt', '.md', '.json', '.yaml', '.yml')):
try:
content = zf.read(filename).decode('utf-8')
files[filename] = content
                    except Exception:
pass
return files
except Exception as e:
write_error_log(e, "Import failed")
return {}
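# Illustrative usage (uploaded_zip is whatever Gradio's gr.File passes in; it must expose
# a .name path to the uploaded archive):
#   files = import_project(uploaded_zip)
#   # -> e.g. {"main.py": "...", "requirements.txt": "...", "README.md": "..."}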
# ---------- Controller ----------
class CodeGenController:
def __init__(self, token: str, goal: str, instructions: str, settings: Dict, max_iters: int, infinite_mode: bool, is_python: bool):
self.token = token
try:
self.client = InferenceClient(token=token)
logging.info("β InferenceClient initialized successfully")
except Exception as e:
logging.error(f"Failed to initialize InferenceClient: {e}")
raise
self.goal = goal
self.instructions = instructions
self.settings = settings
self.max_iters = max_iters
self.infinite_mode = infinite_mode
self.is_python = is_python
self.model_name = PYTHON_MODEL if is_python else OTHER_MODEL
logging.info(f"Controller initialized for {'Python' if is_python else 'Other'} with model: {self.model_name}")
self.history: List[Dict] = []
self.current_files: Dict[str, str] = {}
self.current_code: str = ""
self.best_score: float = 0.0
self.best_eval: Dict = {}
self.best_files: Dict[str, str] = {}
self.best_workspace: str = ""
self.best_zip: Optional[str] = None
self.best_review: str = ""
self.best_readme: str = ""
self.stop_flag = Path(tempfile.gettempdir()) / f"stop_{uuid.uuid4().hex[:8]}"
def cleanup_workdir(self, workdir: Path):
try:
if workdir.exists():
shutil.rmtree(workdir)
except Exception as e:
write_error_log(e, f"Failed to cleanup workdir {workdir}")
def start_scaffolding(self) -> bool:
scaffold = create_initial_scaffold(self.client, self.token, self.goal, self.is_python)
if scaffold and scaffold.get("files"):
self.current_files = scaffold["files"]
self.current_code = "\n\n".join(f"# {n}\n{c}" for n, c in self.current_files.items())
self.best_files = dict(self.current_files)
# Ensure we have at least requirements.txt
if "requirements.txt" not in self.best_files:
self.best_files["requirements.txt"] = "# Add your requirements here"
return True
# Better defaults if scaffolding fails
self.current_files = {
"main.py": "# New project\n\ndef main():\n print('Hello, World!')\n\nif __name__ == '__main__':\n main()",
"requirements.txt": "# Add your requirements here"
}
self.current_code = "\n\n".join(f"# {n}\n{c}" for n, c in self.current_files.items())
self.best_files = dict(self.current_files)
return False
def perform_iteration(self, iteration: int) -> Dict[str, Any]:
parent = Path(tempfile.mkdtemp(prefix="infgen_"))
workdir = parent / f"iter_{iteration}_{uuid.uuid4().hex[:6]}"
workdir.mkdir(parents=True, exist_ok=True)
try:
system = """You are a Level 5 Principal Software Engineer specializing in production-ready code.
Follow Defensive Programming, TDD, and best practices.
Output MUST be flawless, well-tested, and industry-standard.
ALWAYS return complete code in META JSON format."""
feedback = ""
if self.best_eval:
score = self.best_eval.get("quality_score", 0)
feedback = f"\n\nPREVIOUS SCORE: {score}/100"
if not self.best_eval.get("pytest_pass"):
feedback += "\nCRITICAL: TESTS FAILING. Fix logic to pass all tests."
if not self.best_eval.get("flake8_pass"):
feedback += "\nFIX: Code quality issues (flake8)."
prompt = f"""PROJECT: {self.goal}
CURRENT CODE (BUILD UPON THIS):
{self.current_code}
INSTRUCTIONS: {self.instructions}{feedback}
CRITICAL RULES:
1. Every function needs docstrings and type hints.
2. Write comprehensive pytest tests.
3. Complete implementations - NO placeholders.
4. Return all files in META JSON format.
Return the perfected code in META format."""
# Attempt the model call, with extra retry attempts and reduced token fallback
response = call_model(self.client, system, prompt, self.is_python, **self.settings)
if not response or "<<ERROR" in response:
logging.error(f"Model returned error or empty: {response[:200]}")
# Save response for debugging if available
with open(f"/tmp/failed_response_{iteration}.txt", "w") as f:
f.write(response or "<<EMPTY RESPONSE>>")
# Try one conservative retry with reduced token budget before failing
logging.info("Attempting a conservative retry with reduced tokens...")
conservative_settings = dict(self.settings)
conservative_settings["max_new_tokens"] = min(1024, int(conservative_settings.get("max_new_tokens", 1024)))
conservative_settings["temperature"] = min(0.3, float(conservative_settings.get("temperature", 0.3)))
response_retry = call_model(self.client, system, prompt, self.is_python, **conservative_settings)
if response_retry and "<<ERROR" not in response_retry:
response = response_retry
else:
                    # Still failed: produce a safe fallback scaffold so the iteration does not fail
logging.warning(f"Conservative retry failed for iteration {iteration}; producing fallback scaffold.")
fallback_main = self.current_files.get("main.py", "# New project\n\ndef main():\n print('Hello, World!')\n\nif __name__ == '__main__':\n main()")
fallback_main += "\n\n# NOTE: Model failed to generate new code for this iteration. Fallback scaffold inserted."
fallback_test = (
"import pytest\n\n"
"def test_placeholder():\n"
" \"\"\"Placeholder test created because model failed to produce output.\"\"\"\n"
" assert True\n"
)
fallback_requirements = self.current_files.get("requirements.txt", "# Add your requirements here")
files = {
"main.py": fallback_main,
"tests/test_main.py": fallback_test,
"requirements.txt": fallback_requirements
}
# Write files and run evaluators so the pipeline can continue
write_files(workdir, files)
eval_results = run_evaluators(workdir)
eval_results["fallback_used"] = True
eval_results["fallback_info"] = f"Model failed; fallback scaffold used for iteration {iteration}."
review = f"Fallback scaffold inserted because model failed to return usable output. See /tmp/failed_response_{iteration}.txt for raw model response."
readme = f"# Fallback Project\n\nThis scaffold was inserted automatically because model generation failed on iteration {iteration}."
files["README.md"] = readme
write_files(workdir, {"README.md": readme})
zip_path = make_zip(workdir)
return {
"success": True,
"eval": eval_results,
"zip": zip_path,
"workdir": str(workdir),
"files": files,
"review": review,
"readme": readme
}
meta = parse_meta(response)
if not meta or not meta.get("files"):
logging.error(f"Parse failed. Response preview: {response[:1000]}")
# Save failed response for debugging
with open(f"/tmp/failed_response_{iteration}.txt", "w") as f:
f.write(response)
return {"success": False, "warning": f"Parse failed - see /tmp/failed_response_{iteration}.txt"}
files = meta["files"]
write_files(workdir, files)
eval_results = run_evaluators(workdir)
review = generate_code_review(self.client, self.token, files, eval_results, self.is_python)
readme = generate_readme(self.client, self.token, self.goal, files, self.is_python)
files["README.md"] = readme
write_files(workdir, {"README.md": readme})
zip_path = make_zip(workdir)
return {
"success": True, "eval": eval_results, "zip": zip_path, "workdir": str(workdir),
"files": files, "review": review, "readme": readme
}
except Exception as e:
write_error_log(e, "Iteration exception")
return {"success": False, "warning": f"Exception: {str(e)}"}
def run_loop(self) -> Generator:
iteration = 1
max_iterations = 999999 if self.infinite_mode else self.max_iters
if not self.current_files:
self.start_scaffolding()
initial_state = {"stop_flag_path": str(self.stop_flag)}
yield self.format_output(f"Starting with {self.model_name}...", iteration, max_iterations, initial_state)
while iteration <= max_iterations:
if self.stop_flag.exists():
try:
self.stop_flag.unlink()
logging.info("Stop flag detected - stopping generation")
except OSError:
pass
yield self.format_output("β Stopped by user", iteration, max_iterations)
break
yield self.format_output(f"π Iteration {iteration}/{max_iterations} running...", iteration, max_iterations)
result = self.perform_iteration(iteration)
if not result.get("success"):
warning_msg = result.get("warning", "Unknown iteration error")
logging.warning(f"Iteration {iteration} failed: {warning_msg}")
# CRITICAL: Still yield output so UI updates and shows the previous best code
yield self.format_output(f"β οΈ Iteration {iteration} failed: {warning_msg}", iteration, max_iterations)
time.sleep(2) # Give more time before retry
iteration += 1
continue
eval_res = result.get("eval", {})
score = eval_res.get("quality_score", 0)
self.history.append({"iteration": iteration, "eval": eval_res})
self.current_files = result["files"]
self.current_code = "\n\n".join(f"# {n}\n{c}" for n, c in self.current_files.items())
if score > self.best_score:
if self.best_workspace:
self.cleanup_workdir(Path(self.best_workspace))
self.best_score = score
self.best_eval = eval_res
self.best_files = dict(result["files"])
self.best_workspace = result.get("workdir", "")
self.best_zip = result.get("zip")
self.best_review = result.get("review", "")
self.best_readme = result.get("readme", "")
logging.info(f"New best score: {score}/100")
else:
# Even if score didn't improve, still update current_files for next iteration
logging.info(f"Score {score}/100 - keeping best: {self.best_score}/100")
if result.get("workdir") and result.get("workdir") != self.best_workspace:
self.cleanup_workdir(Path(result["workdir"]))
yield self.format_output(f"Iteration {iteration} complete: {score}/100", iteration, max_iterations)
iteration += 1
time.sleep(0.3)
yield self.format_output(f"Complete! Best: {self.best_score}/100", iteration - 1, max_iterations)
def format_output(self, log_msg: str, iteration: int, max_iters: int, state: Optional[Dict] = None):
progress = f"Iteration {iteration}/{max_iters if max_iters < 999999 else 'INF'}" if iteration <= max_iters else "Complete"
main = self.best_files.get("main.py", "# Generating code...")
test = next((v for k, v in self.best_files.items() if 'test' in k and k.endswith('.py')), "# No tests yet...")
req = self.best_files.get("requirements.txt", "# No requirements yet...")
readme = self.best_files.get("README.md", "# Generating README...")
other = {k: v for k, v in self.best_files.items() if k not in [
"main.py", next((k for k in self.best_files if 'test' in k and k.endswith('.py')), None),
"requirements.txt", "README.md"
]}
return (
f"[{time.strftime('%X')}] {sanitize_log_message(log_msg)}", self.model_name, progress,
generate_metrics_html(self.history), self.best_eval, main, test, req, readme, other,
self.best_review, self.best_zip, self.best_workspace, state or {}
)
# ---------- UI Helpers ----------
def generate_metrics_html(history: List[Dict]) -> str:
if not history:
return "<div style='padding:12px'>No metrics yet</div>"
html_parts = ["<div style='font-family:sans-serif'><h4>Quality Trend</h4><div style='background:#f8f9fa;padding:12px;border-radius:8px'>"]
for h in history[-10:]:
score = h.get("eval", {}).get("quality_score", 0)
width = int(score * 2.5)
color = "#10b981" if score >= 80 else "#f59e0b" if score >= 60 else "#ef4444"
html_parts.append(f"<div style='margin:4px 0'>#{h.get('iteration')}: <div style='display:inline-block;width:{width}px;height:20px;background:{color};border-radius:4px'></div> {score}/100</div>")
scores = [h.get("eval", {}).get("quality_score", 0) for h in history]
avg = sum(scores) / len(scores) if scores else 0
best = max(scores) if scores else 0
html_parts.append(f"<div style='margin-top:12px'><strong>Avg:</strong> {avg:.1f} | <strong>Best:</strong> {best:.1f}</div></div></div>")
return "".join(html_parts)
# ---------- UI ----------
def create_ui():
with gr.Blocks(title="InfinateCodeGenerator Ultimate", theme=gr.themes.Soft()) as demo:
gr.Markdown("# InfinateCodeGenerator - Ultimate Merged Edition\n*Controller architecture β’ Smart models β’ Multi-file UI β’ Never stops early*")
controller_state = gr.State({})
with gr.Row():
with gr.Column(scale=2):
project_goal = gr.Textbox(label="Project Goal", lines=4, placeholder="E.g., Create a FastAPI endpoint for user authentication with MongoDB")
with gr.Tabs():
with gr.TabItem("New Project"):
initial_code = gr.Textbox(label="Starting Code (optional)", lines=8)
with gr.TabItem("Import Project"):
import_zip = gr.File(label="Upload ZIP", file_types=[".zip"])
import_btn = gr.Button("Import Files")
import_status = gr.Textbox(label="Status", interactive=False)
improve_instructions = gr.Textbox(label="Instructions", lines=3, value="Write comprehensive tests, add type hints, and improve error handling.")
hf_token_manual = gr.Textbox(label="HF Token (optional, overrides env)", type="password")
with gr.Row():
start_btn = gr.Button("Start Generation", variant="primary", size="lg")
stop_btn = gr.Button("STOP", variant="stop", size="lg")
with gr.Accordion("Settings", open=False):
infinite_mode = gr.Checkbox(label="Infinite Mode", value=False)
max_iters = gr.Slider(1, 15, value=5, step=1, label="Max Iterations")
temperature = gr.Slider(0.1, 1.0, value=0.5, step=0.05, label="Temperature")
top_p = gr.Slider(0.1, 1.0, value=0.9, step=0.05, label="Top P")
max_tokens = gr.Slider(512, 4096, value=4096, step=512, label="Max Tokens")
with gr.Column(scale=3):
with gr.Tabs():
with gr.TabItem("Dashboard"):
model_display = gr.Textbox(label="Active Model", interactive=False)
progress_display = gr.Textbox(label="Progress", interactive=False)
metrics_html = gr.HTML()
run_log = gr.Textbox(label="Log", lines=10, interactive=False)
with gr.TabItem("Generated Files"):
with gr.Tabs():
with gr.TabItem("main.py"):
main_file = gr.Code(language="python", lines=20)
with gr.TabItem("tests/test_main.py"):
test_file = gr.Code(language="python", lines=15)
with gr.TabItem("requirements.txt"):
req_file = gr.Textbox(lines=10, interactive=False, show_label=False)
with gr.TabItem("README.md"):
readme_file = gr.Code(language="markdown", lines=15)
with gr.TabItem("Other Files"):
other_files = gr.JSON()
with gr.TabItem("Code Review"):
review_display = gr.Textbox(label="AI Review", lines=10, interactive=False)
with gr.TabItem("Evaluation"):
eval_json = gr.JSON()
with gr.TabItem("Download"):
workspace_path = gr.Textbox(label="Best Version Path", interactive=False)
download_zip = gr.File(label="Download ZIP")
def import_project_files(zip_file):
files = import_project(zip_file)
if files:
combined = "\n\n".join([f"# {name}\n{content}" for name, content in files.items()])
return f"Imported {len(files)} files!", combined
return "Import failed", ""
import_btn.click(fn=import_project_files, inputs=[import_zip], outputs=[import_status, initial_code])
def start_gen(goal, init_code, instructions, hf_tok, inf_mode, max_it, temp, top, max_tok):
token = get_token_from_env_or_manual(hf_tok)
if not token:
error_msg = "ERROR: No HF token found. Please provide a Hugging Face token."
logging.error(error_msg)
yield (error_msg, "", "", "", {}, "", "", "", "", {}, "", None, "", {})
return
logging.info(f"Starting generation with token: {token[:10]}... (length: {len(token)})")
settings = {"temperature": temp, "top_p": top, "max_new_tokens": max_tok}
is_python_project = detect_language(goal, init_code)
logging.info(f"Detected project type: {'Python' if is_python_project else 'Other'}")
controller = CodeGenController(token, goal, instructions, settings, int(max_it), inf_mode, is_python_project)
if init_code and init_code.strip():
controller.current_files = {"main.py": init_code}
controller.current_code = init_code
logging.info("Using provided initial code")
yield from controller.run_loop()
def set_stop(controller_state_val):
if controller_state_val and (stop_path_str := controller_state_val.get("stop_flag_path")):
stop_path = Path(stop_path_str)
try:
stop_path.touch()
logging.info(f"Stop flag created at {stop_path}")
return "β Stop signal sent! Will stop after current iteration completes..."
except Exception as e:
logging.error(f"Failed to create stop flag: {e}")
return f"β Failed to stop: {e}"
return "β Could not stop: No active process found. Start generation first."
outputs = [
run_log, model_display, progress_display, metrics_html, eval_json,
main_file, test_file, req_file, readme_file, other_files,
review_display, download_zip, workspace_path, controller_state
]
start_btn.click(
fn=start_gen,
inputs=[project_goal, initial_code, improve_instructions, hf_token_manual, infinite_mode, max_iters, temperature, top_p, max_tokens],
outputs=outputs
)
stop_btn.click(fn=set_stop, inputs=[controller_state], outputs=[run_log])
return demo
if __name__ == "__main__":
try:
demo = create_ui()
demo.queue().launch(server_name="0.0.0.0", server_port=7860)
except Exception as e:
print(f"Failed to launch Gradio app: {e}", file=sys.stderr)
sys.exit(1)