拼消消生图管线升级:6 sheet 单形状满画布 + 洋红去背 + 自适应切图 + 提示词优化

改为 6 张 sheet,每张单形状,取消全部 FILL/留白,AI 填满画布后多画少取
新增洋红去背步骤,对接 platform-image alpha 管线
新增 find_non_transparent_bounds 四方向内容边界扫描
新增 fill_transparent_with_opaque_average 透明像素填充
自适应网格检测 (detect_cell_grid_seed) 用于组间边界对齐
重写 slice_puzzle_clear_sheet 为两阶段:group bbox → 等分 cell
提示词优化:主前缀改为裁片级描述,每 sheet 增加精确占格约束
修复 jump_hop 测试断言 (1×1×1 → 1×1×1 的立方体)
新增分析脚本 tools/analyze_puzzle_clear_output.py 和 tools/test_ve_api.py
Sheet-06 为纵向 1×3 缓冲区
This commit is contained in:
2026-06-12 22:08:57 +08:00
parent 21a8ff690a
commit 5795115c20
4 changed files with 684 additions and 142 deletions

View File

@@ -0,0 +1,204 @@
"""
分析拼消消测试输出,检测卡片透明区域并溯源问题成因。
用法:
python tools/analyze_puzzle_clear_output.py
python tools/analyze_puzzle_clear_output.py --dir path/to/output
python tools/analyze_puzzle_clear_output.py --detail # 详细逐卡输出
"""
import os
import sys
import argparse
from collections import defaultdict
from PIL import Image
def scan_transparent_pixels(img_path, alpha_threshold=128, line_ratio=0.1):
    """Measure how much of an image is transparent.

    A pixel counts as transparent when its alpha value is below
    ``alpha_threshold``.

    Args:
        img_path: Path of the image to inspect; it is converted to RGBA.
        alpha_threshold: Alpha values strictly below this are transparent.
        line_ratio: A column (row) is flagged when more than this fraction
            of its pixels is transparent.

    Returns:
        A 6-tuple ``(total, transparent, ratio, has_edge, flagged_cols,
        flagged_rows)`` where ``ratio`` is the transparent share in percent
        (0 for an empty image), ``flagged_cols`` / ``flagged_rows`` count the
        columns / rows exceeding ``line_ratio`` and ``has_edge`` is True when
        either count is non-zero.

    NOTE(review): despite the "edge" wording used by callers, every column
    and row of the image is examined, not only the four border lines. That
    behaviour is preserved here — confirm whether border-only was intended.
    """
    # Close the underlying file deterministically; convert() returns an
    # independent, fully loaded image.
    with Image.open(img_path) as source:
        img = source.convert("RGBA")
    w, h = img.size
    pixels = img.load()
    total = w * h

    # Single pass: accumulate per-row and per-column transparent counts so
    # the image does not have to be rescanned for the row/column statistics.
    col_counts = [0] * w
    row_counts = [0] * h
    for y in range(h):
        in_row = 0
        for x in range(w):
            if pixels[x, y][3] < alpha_threshold:
                in_row += 1
                col_counts[x] += 1
        row_counts[y] = in_row

    transparent = sum(row_counts)
    edge_cols_with_transparent = sum(1 for count in col_counts if count > h * line_ratio)
    edge_rows_with_transparent = sum(1 for count in row_counts if count > w * line_ratio)

    ratio = transparent / total * 100 if total > 0 else 0
    has_edge = edge_cols_with_transparent > 0 or edge_rows_with_transparent > 0
    return total, transparent, ratio, has_edge, edge_cols_with_transparent, edge_rows_with_transparent
def analyze_sheet_cleaned(sheet_path):
    """Count the transparent pixels of a sheet image.

    The image is converted to RGBA and a pixel is considered transparent
    when its alpha value is below 128.

    Returns:
        ``(pixel_count, transparent_count, transparent_percent)``; the
        percentage is 0 when the image has no pixels.
    """
    rgba = Image.open(sheet_path).convert("RGBA")
    width, height = rgba.size
    px = rgba.load()

    pixel_count = width * height
    clear_count = 0
    for row in range(height):
        for col in range(width):
            if px[col, row][3] < 128:
                clear_count += 1

    if pixel_count > 0:
        percent = clear_count / pixel_count * 100
    else:
        percent = 0
    return pixel_count, clear_count, percent
def _find_output_dir(cli_dir):
    """Return the directory to analyze, or None when no candidate exists.

    An explicit ``--dir`` value wins; otherwise the known test-output
    locations under the repository root (the parent of this script's
    directory) are probed in order.
    """
    if cli_dir:
        return cli_dir
    script_dir = os.path.dirname(os.path.abspath(__file__))
    repo_root = os.path.dirname(script_dir)
    candidates = [
        os.path.join(repo_root, "server-rs", "crates", "api-server", "target", "test-output", "puzzle-clear-real"),
        os.path.join(repo_root, "server-rs", "target", "test-output", "puzzle-clear-real"),
    ]
    for candidate in candidates:
        if os.path.isdir(candidate):
            return candidate
    return None


def _format_card_line(row):
    """Render one card result tuple as a single report line."""
    sheet, name, total, trans, ratio, has_edge, ec, er = row
    group_id = name.split("-part-")[0]
    edge_info = f", 边缘透明列={ec} 行={er}" if has_edge else ""
    return f" {sheet}/{name} group={group_id} transparent={ratio:.1f}% ({trans}/{total}){edge_info}"


def main():
    """Analyze the puzzle-clear test output and report transparency problems.

    Phase 1 scans every card PNG under ``<base>/cards/<sheet>/`` and flags
    cards with more than 5% transparent pixels or with flagged rows/columns.
    Phase 2 prints the transparent share of every sheet PNG under
    ``<base>/sheets``. Phase 3 groups the problem cards and compares each raw
    sheet with its ``-cleaned`` counterpart.

    Exits with status 1 when the output directory or its ``cards``
    sub-directory cannot be found.
    """
    parser = argparse.ArgumentParser(description="分析拼消消测试输出")
    parser.add_argument("--dir", default="", help="输出目录路径")
    parser.add_argument("--detail", action="store_true", help="详细逐卡输出")
    args = parser.parse_args()

    base = _find_output_dir(args.dir)
    if not base:
        print("未找到测试输出目录。请用 --dir 指定路径。")
        sys.exit(1)

    sheets_dir = os.path.join(base, "sheets")
    cards_dir = os.path.join(base, "cards")
    if not os.path.isdir(cards_dir):
        print(f"cards 目录不存在: {cards_dir}")
        sys.exit(1)

    # ==================== Phase 1: per-card analysis ====================
    print("=" * 70)
    print("阶段 1: 卡片透明像素分析")
    print("=" * 70)

    # Each row: (sheet, card_name, total, transparent, ratio, has_edge, edge_cols, edge_rows)
    card_results = []
    problem_cards = []
    for sheet_name in sorted(os.listdir(cards_dir)):
        sheet_dir = os.path.join(cards_dir, sheet_name)
        if not os.path.isdir(sheet_dir):
            continue
        for card_name in sorted(os.listdir(sheet_dir)):
            if not card_name.endswith(".png"):
                continue
            card_path = os.path.join(sheet_dir, card_name)
            total, trans, ratio, has_edge, ec, er = scan_transparent_pixels(card_path)
            row = (sheet_name, card_name, total, trans, ratio, has_edge, ec, er)
            card_results.append(row)
            if ratio > 5 or has_edge:
                problem_cards.append(row)

    # Summarize per sheet.
    by_sheet = defaultdict(list)
    for r in card_results:
        by_sheet[r[0]].append(r)

    print(f"\n总卡片数: {len(card_results)}")
    print(f"问题卡片数 (透明>5% 或 有边缘透明): {len(problem_cards)}")
    print()
    for sheet_name in sorted(by_sheet):
        cards = by_sheet[sheet_name]
        problem_count = sum(1 for r in cards if r[4] > 5 or r[5])
        print(f" {sheet_name}: {len(cards)} cards, {problem_count} problems")

    # --detail was previously accepted but ignored; honor it by listing
    # every scanned card, not only the problematic ones.
    if args.detail and card_results:
        print("\n--- 全部卡片详情 ---")
        for r in card_results:
            print(_format_card_line(r))

    if problem_cards:
        print("\n--- 问题卡片详情 ---")
        problem_cards.sort(key=lambda r: -r[4])  # highest transparent ratio first
        for r in problem_cards:
            print(_format_card_line(r))

    # Sheet scans are pure-Python per-pixel loops; remember each result so
    # phase 3 does not rescan the sheets phase 2 already measured.
    sheet_stats = {}

    def stats_for(path):
        if path not in sheet_stats:
            sheet_stats[path] = analyze_sheet_cleaned(path)
        return sheet_stats[path]

    # ==================== Phase 2: sheet overview ====================
    print()
    print("=" * 70)
    print("阶段 2: 溯源 — 对比原始 sheet 与去背后 sheet")
    print("=" * 70)
    if os.path.isdir(sheets_dir):
        for fname in sorted(os.listdir(sheets_dir)):
            if not fname.endswith(".png"):
                continue
            total, trans, ratio = stats_for(os.path.join(sheets_dir, fname))
            label = "去背后" if "-cleaned" in fname else "原始"
            print(f" {fname} ({label}): {trans}/{total} 透明像素 ({ratio:.1f}%)")

    # ==================== Phase 3: root-cause hints ====================
    print()
    print("=" * 70)
    print("阶段 3: 问题成因分析")
    print("=" * 70)
    if not problem_cards:
        print(" 无问题卡片,管线正常。")
        return

    # Distribution of the problem cards over their groups.
    problem_groups = defaultdict(list)
    for r in problem_cards:
        problem_groups[r[1].split("-part-")[0]].append(r)
    print(f"\n 涉及 {len(problem_groups)} 个 group:")
    for group_id in sorted(problem_groups):
        cards = problem_groups[group_id]
        avg_ratio = sum(r[4] for r in cards) / len(cards)
        edge_count = sum(1 for r in cards if r[5])
        print(f" {group_id}: {len(cards)} cells, avg透明={avg_ratio:.1f}%, {edge_count} cells有边缘透明")

    # Compare every raw sheet with its "-cleaned" counterpart.
    if os.path.isdir(sheets_dir):
        sheet_files = os.listdir(sheets_dir)
        cleaned_files = [f for f in sheet_files if "cleaned" in f]
        raw_files = [f for f in sheet_files if "cleaned" not in f and f.endswith(".png")]
        if cleaned_files and raw_files:
            for raw_f in sorted(raw_files):
                raw_p = os.path.join(sheets_dir, raw_f)
                # Swap only the trailing extension; str.replace would also
                # rewrite a ".png" occurring in the middle of the name.
                cleaned_f = raw_f[: -len(".png")] + "-cleaned.png"
                cleaned_p = os.path.join(sheets_dir, cleaned_f)
                if not os.path.exists(cleaned_p):
                    continue
                _, _, raw_ratio = stats_for(raw_p)
                _, _, cleaned_ratio = stats_for(cleaned_p)
                print(f"\n {raw_f}:")
                print(f" 原始透明: {raw_ratio:.1f}%")
                print(f" 去后透明: {cleaned_ratio:.1f}%")
                delta = cleaned_ratio - raw_ratio
                if delta > 1:
                    print(f" ** 去背增加了 {delta:.1f}% 透明像素 — 可能误删了主体内容")

    print()
    print(" 可能原因:")
    print(" 1. AI 未将内容画满整个 group 区域(内容在组内偏移)")
    print(" 2. 洋红去背误将主题内近似洋红的像素也变透明")
    print(" 3. find_non_transparent_bounds 扫描范围包括了相邻 group 的透明间隙")
    print(" 4. group resize 后未完全覆盖目标尺寸(内容比例与目标比例不匹配)")


if __name__ == "__main__":
    main()

171
tools/test_ve_api.py Normal file
View File

@@ -0,0 +1,171 @@
"""
Vector Engine API 连通性与生图耗时测试脚本。
用法:
python tools/test_ve_api.py
python tools/test_ve_api.py --prompt "你的自定义提示词"
python tools/test_ve_api.py --size 1024x1024 --samples 3
前置条件:
环境变量 VECTOR_ENGINE_API_KEY 已设置,
或从 ../.env.secrets.local 自动读取。
"""
import os
import sys
import time
import json
import argparse
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
# Default value of the --base-url option. Note that this is the full endpoint
# URL: it already contains the "/v1/images/generations" path.
API_URL = "https://api.vectorengine.cn/v1/images/generations"
# Default generation prompt (white background, picture-book style, no text).
DEFAULT_PROMPT = "生成一张白色背景上的一只飞踢橘猫,绘本风格,不要文字水印"
# Default image size, written as WIDTHxHEIGHT.
DEFAULT_SIZE = "1024x1536"
# Default negative prompt.
# NOTE(review): this list excludes solid-colour / white backgrounds while
# DEFAULT_PROMPT asks for a white background — confirm this is intended.
DEFAULT_NEGATIVE = "文字、Logo、水印、按钮、UI、网格线、边框、编号、标签、纯色背景、白底、孤立主体"
def load_env_from_file(filepath):
    """Load ``KEY=VALUE`` pairs from a .env-style file into ``os.environ``.

    Blank lines, ``#`` comment lines and lines without ``=`` are skipped.
    A leading ``export `` is ignored, and surrounding whitespace and quote
    characters are stripped from the value. Variables that already exist in
    the environment are never overwritten, and entries with an empty key or
    value are ignored.

    Args:
        filepath: Path of the file to read. Nothing happens when it is not
            an existing regular file.
    """
    if not os.path.isfile(filepath):
        return
    # "utf-8-sig" drops a leading byte-order mark, which would otherwise be
    # glued onto the first key of the file.
    with open(filepath, "r", encoding="utf-8-sig") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            # Accept shell-style "export KEY=VALUE" lines as well.
            if line.startswith("export "):
                line = line[len("export "):].lstrip()
            key, sep, value = line.partition("=")
            if not sep:
                continue
            key = key.strip()
            value = value.strip().strip('"').strip("'")
            if key and value and key not in os.environ:
                os.environ[key] = value
_GENERATIONS_PATH = "/v1/images/generations"


def _resolve_endpoint(base_url):
    """Return the image-generation endpoint URL for *base_url*.

    *base_url* may be either a bare service root or the complete endpoint
    URL; the path is appended only when it is not already present, so the
    default ``--base-url`` (which already is the full endpoint) no longer
    ends up with the path duplicated.
    """
    trimmed = base_url.rstrip("/")
    if trimmed.endswith(_GENERATIONS_PATH):
        return trimmed
    return trimmed + _GENERATIONS_PATH


def single_request(api_key, base_url, prompt, negative, size, quality, index):
    """Send one image-generation request and print a one-line result.

    Args:
        api_key: Bearer token placed in the Authorization header.
        base_url: Service root or full endpoint URL.
        prompt: Generation prompt.
        negative: Negative prompt; omitted from the request when empty.
        size: Requested image size string.
        quality: Quality setting; omitted from the request when empty.
        index: Label used to tag this request in the printed output.

    Returns:
        ``(elapsed_seconds, task_id, b64_length)``. ``b64_length`` is the
        number of characters of the first image's ``b64_json`` field (0 when
        the response carries no such field, e.g. only a URL). On an HTTP
        error, timeout or any other exception the result is
        ``(elapsed_seconds, None, 0)``; errors are printed, never raised.
    """
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    body = {
        "model": "gpt-image-2",
        "prompt": prompt,
        "n": 1,
        "size": size,
    }
    if negative:
        body["negative_prompt"] = negative
    if quality:
        body["quality"] = quality

    endpoint = _resolve_endpoint(base_url)
    # perf_counter is monotonic, so wall-clock adjustments cannot skew the
    # measured duration.
    start = time.perf_counter()
    try:
        resp = requests.post(
            endpoint,
            headers=headers,
            json=body,
            timeout=600,
        )
        elapsed = time.perf_counter() - start
        if resp.status_code != 200:
            print(f" [#{index}] HTTP {resp.status_code}: {resp.text[:300]}")
            return elapsed, None, 0
        data = resp.json()
        task_id = data.get("task_id", "")
        images = data.get("data", [])
        b64_len = len(images[0].get("b64_json", "")) if images else 0
        url = images[0].get("url", "") if images else ""
        print(f" [#{index}] {elapsed:.1f}s task_id={task_id} b64={b64_len}chars url={'present' if url else 'none'}")
        return elapsed, task_id, b64_len
    except requests.Timeout:
        elapsed = time.perf_counter() - start
        print(f" [#{index}] TIMEOUT after {elapsed:.0f}s")
        return elapsed, None, 0
    except Exception as e:
        # Deliberately broad: this is a diagnostic script and a single failed
        # sample must not abort the remaining ones.
        elapsed = time.perf_counter() - start
        print(f" [#{index}] ERROR: {e}")
        return elapsed, None, 0
def main():
    """Run the Vector Engine API connectivity / latency test from the CLI.

    Loads secrets from the env files next to the repository root, sends
    ``--samples`` requests (serially, or through a thread pool when
    ``--parallel`` is greater than 1) and prints timing statistics for the
    requests that returned base64 image data. Exits with status 1 when
    VECTOR_ENGINE_API_KEY is not available.
    """
    cli = argparse.ArgumentParser(description="Vector Engine API 测试")
    cli.add_argument("--prompt", default=DEFAULT_PROMPT, help="生图提示词")
    cli.add_argument("--negative", default=DEFAULT_NEGATIVE, help="负面提示词")
    cli.add_argument("--size", default=DEFAULT_SIZE, help="图片尺寸 (1024x1024 / 1024x1536 / 1536x1024)")
    cli.add_argument("--samples", type=int, default=1, help="请求次数")
    cli.add_argument("--parallel", type=int, default=1, help="并行请求数 (默认1=串行)")
    cli.add_argument("--quality", default="", help="生图质量 (low/medium/high)")
    cli.add_argument("--base-url", default=API_URL, help="API 地址")
    args = cli.parse_args()

    # Pull secrets from the env files in the repository root (the parent of
    # this script's directory); earlier files take precedence because
    # already-set variables are not overwritten.
    repo_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    for env_name in (".env.secrets.local", ".env.local", ".env"):
        load_env_from_file(os.path.join(repo_root, env_name))

    api_key = os.environ.get("VECTOR_ENGINE_API_KEY", "")
    if not api_key:
        print("错误: 未设置 VECTOR_ENGINE_API_KEY")
        print("请设置环境变量或将密钥写入 .env.secrets.local")
        sys.exit(1)

    # The environment variable overrides the command-line value.
    base_url = os.environ.get("VECTOR_ENGINE_BASE_URL", args.base_url)

    print(f"API: {base_url}")
    print(f"Size: {args.size}")
    print(f"Samples: {args.samples}")
    print(f"Parallel: {args.parallel}")
    if args.quality:
        print(f"Quality: {args.quality}")
    print(f"Prompt ({len(args.prompt)} chars):")
    print(f" {args.prompt[:120]}...")
    print(f"Negative ({len(args.negative)} chars):")
    print(f" {args.negative[:120]}...")
    print()

    request_args = (api_key, base_url, args.prompt, args.negative, args.size, args.quality)
    sample_ids = range(1, args.samples + 1)

    total_start = time.time()
    if args.parallel <= 1:
        # Serial mode: issue the requests one after another.
        outcomes = [single_request(*request_args, sample_id) for sample_id in sample_ids]
    else:
        # Parallel mode: collect results in completion order.
        with ThreadPoolExecutor(max_workers=args.parallel) as pool:
            pending = [
                pool.submit(single_request, *request_args, sample_id)
                for sample_id in sample_ids
            ]
            outcomes = [done.result() for done in as_completed(pending)]
    # Only requests that returned base64 image data count as successes.
    times = [elapsed for elapsed, _task_id, b64_len in outcomes if b64_len > 0]
    total_elapsed = time.time() - total_start

    if not times:
        print(f"\n全部失败 ({args.samples} 次)" + f" | 总耗时: {total_elapsed:.1f}s")
    else:
        avg = sum(times) / len(times)
        print(f"\n成功: {len(times)}/{args.samples}")
        print(f"总耗时: {total_elapsed:.1f}s")
        print(f"平均: {avg:.1f}s")
        print(f"最快: {min(times):.1f}s")
        print(f"最慢: {max(times):.1f}s")


if __name__ == "__main__":
    main()