Source code for translator

from itertools import product
from random import randint, uniform
from model.template import PuzzleTemplate
import json, os
import argparse
import yaml

# Custom exception classes for better error handling
[docs] class PuzzleGenerationError(Exception): """Base exception for puzzle generation errors""" pass
[docs] class NoSolutionError(PuzzleGenerationError): """Raised when Z3 solver cannot find any solution to the constraints""" pass
[docs] class TooManySolutionsError(PuzzleGenerationError): """Raised when the number of solutions exceeds the maximum allowed""" pass
[docs] class AnswerAssertionError(PuzzleGenerationError): """Raised when answer assertion validation fails""" pass
[docs] class RandomGenerationError(PuzzleGenerationError): """Raised when random number/option generation fails after multiple attempts""" pass
[docs] def ext(s): """ 在s前后加空格以适应f" " "环境 """ if s == "" or s == None: return "" return ('\\\"' if s[0] == '\"' else s[0]) + s[1:-1] + ('\\\"' if s[-1] == '\"' else s[-1])
symlist = {} use_spec_vars = []
[docs] def init_program(): global symlist symlist = {} exec("from random import randint, uniform\n", symlist) p = """# -*- coding: utf-8 -*- from z3 import * from random import randint, uniform, sample, choice import itertools from utils import * from collections.abc import Iterable from functools import reduce import re import os import sys set_option(timeout=15000) INDEX_PATTERN = re.compile(r'^__(\\d+)__$') # Custom exception classes for better error handling class PuzzleGenerationError(Exception): pass class NoSolutionError(PuzzleGenerationError): pass class TooManySolutionsError(PuzzleGenerationError): pass class AnswerAssertionError(PuzzleGenerationError): pass class RandomGenerationError(PuzzleGenerationError): pass """ return p + "config = {}\n", p + "config = __config__\n"
[docs] def resolve_rand(domain_str: str, isint: bool): domain_data = domain_str[1:-1].split(',') if isint: exec(f"a = randint(int({eval(domain_data[0], symlist)}), int({eval(domain_data[1], symlist)}))", symlist) return symlist["a"] else: exec(f"a = uniform({eval(domain_data[0], symlist)}, {eval(domain_data[1], symlist)})", symlist) return symlist["a"]
[docs] def process_vars(Dict: dict): codegen = "" codeval = "" for var in Dict: codesol = "" if Dict[var].get("formula"): if 'generate_random_list' in Dict[var]["formula"]: codesol += "set_local_context(locals())\n" codesol += f"{var} = {Dict[var]['formula']}\n" else: if Dict[var]['type'] == 'int': domain_data = Dict[var]['domain'][1:-1].split(',') codesol += f"{var} = randint(int({domain_data[0].strip()}), int({domain_data[1].strip()}))\n" elif Dict[var]['type'] == 'float' or Dict[var]['type'] == 'real': domain_data = Dict[var]['domain'][1:-1].split(',') codesol += f"{var} = uniform({domain_data[0].strip()}, {domain_data[1].strip()})\n" elif Dict[var]['type'] in ['bool', 'enum']: if Dict[var]['type'] == 'bool' and Dict[var]['domain'] is None: domain_data = ['True', 'False'] else: domain_data = Dict[var]['domain'][1:-1].split(',') if len(domain_data) == 1: codesol += f"{var} = {domain_data[0].strip()}\n" else: codesol += f"{var} = choice([{', '.join([item.strip() for item in domain_data])}])\n" else: raise Exception(f"Error: Type {Dict[var]['type']} is not supported") codesol += f"config[\"{var}\"] = {var}\n" codegen += codesol codeval += f"{var} = config[\"{var}\"]\n" if var not in use_spec_vars else codesol return codegen + '\n\n', codeval + '\n\n'
[docs] def process_custom_operator(Dict: dict): # res = "" # for func in Dict: # res += f"def {func}:\n" # lines = Dict[func].split('\n') # for line in lines: # res += '\t' + line # return res + '\n\n' def is_valid_path(path_str): try: _ = os.path.normpath(path_str) return True except (TypeError, AttributeError): return False res = "" for var in Dict: if is_valid_path(Dict[var]): if os.path.exists(Dict[var]): _dirname = os.path.dirname(Dict[var]) _basename = os.path.basename(Dict[var]).split('.')[0] res += f"sys.path.append(\"{_dirname}\")\nfrom {_basename} import {var}\n" continue # else: # print(f"Warning: Path '{Dict[var]}' does not exist and will be parsed as a function.") res += f"{var} = {Dict[var]}\n" return res + '\n\n', res + '\n\n'
[docs] def process_defined_symbols(sym: str, template: dict): res = "" if template is None or 'source' not in template: return res, res types = template["type"] type_text = '[' + ', '.join([f"\"{item}\"" for item in types]) + ']' if isinstance(types, list) else f"\"{types}\"" descs = template["desc"] desc_text = '[' + ', '.join([f"\"{ext(item)}\"" for item in descs]) + ']' if isinstance(descs, list) else f"\"{descs}\"" source = template['source'] source_text = '{' + ', '.join([f"\"{s}\": {s}" for s in source]) + '}' attr = template.get('attr') # Use .get() to avoid KeyError attr_text = None if attr is None else '[' + ', '.join([f"\"{item}\"" for item in attr]) + ']' res += f"{sym} = CustomSym(\"{sym}\", {source_text}, {attr_text}, {type_text}, {desc_text})\n" return res + '\n\n', res + '\n\n'
[docs] def process_multiple_derived_symbols(sym: str, templates: str): codegen = "" codeval = "" if templates is None: return codegen, codeval codegen += f""" _total = {templates["total"]} _num_of_templates = {len(templates["templates"])} _pool_domain = generate_random_list_with_total(_num_of_templates, [0, _total], _total, {[template["domain"] if "domain" in template else None for template in templates["templates"]]}) config["{sym}"] = {{"pool_domain": _pool_domain, "pool": []}} _index = 0 {sym} = [] _desc = [] """ codeval += f""" _pool_domain = config["{sym}"]["pool_domain"] _index = 0 {sym} = [] _desc = [] """ for i, template in enumerate(templates["templates"]): codegen += f""" # Generate Symbols for Template #{i} _domain = _pool_domain[{i}] _domain_cond = {template['domain_cond']} _dim = {template['dim']} _dim_cond = {template['dim_cond']} _custom_cond = {template['custom_cond']} _pool_len = [{', '.join('len(' + str(i) + ')' for i in template['source'])}] _pool_amount = [{', '.join(str(i) for i in template['amount']) if template["amount"] else "1 for _ in range(len(_pool_len))"}] try: _pool_indices, _pool_indices_str = generate_random_indices(_pool_len, _pool_amount, _domain, domain_cond=_domain_cond, dim=_dim, dim_cond=_dim_cond, custom_cond=_custom_cond, order={template['order']}, duplicate={template['duplicate']}, env=globals()) except Exception as e: raise RandomGenerationError(f"Failed to generate random indices for multiple derived symbols template #{i}: {{str(e)}}. This typically happens when the constraints are too restrictive or conflicting, making it impossible to generate valid random samples. Please retry, use '-c' for continuous mode, or review and revise the domain conditions, dimensions, and custom conditions for this template.") config["{sym}"]["pool"].append(_pool_indices_str) _pool_sources = [{', '.join(template['source'])}] """ if template['dim'] == 1: codegen += f""" _pool_vals = [[[list(src)[idx] for idx in idx_tuple] for src, idx_tuple in zip(_pool_sources, item) ] for item in _pool_indices ] for _ind, _sym in zip(_pool_indices, _pool_vals): {'_sym = [item[0] for item in _sym]' if not template['amount'] else ''} {'_ind = [item[0] for item in _ind]' if not template['amount'] else ''} set_local_context(locals()) _s = {template['formula']} _d = f\"\"\"{ext(template['desc'])}\"\"\" {sym}.append(_s) _desc.append(_d) _index += 1 store_sym_config(_s, {{"desc": _d, "data": _sym}}) """ codeval += f""" # Generate Symbols for Template #{i} _domain = _pool_domain[{i}] _pool_indices = config["{sym}"]["pool"][{i}] _pool_sources = [{', '.join(template['source'])}] _pool_vals = [[[list(src)[int(m.group(1))] if (m := INDEX_PATTERN.match(str(idx))) else idx for idx in idx_tuple] for src, idx_tuple in zip(_pool_sources, item) ] for item in _pool_indices ] for _ind, _sym in zip(_pool_indices, _pool_vals): {'_sym = [item[0] for item in _sym]' if not template['amount'] else ''} {'_ind = [item[0] for item in _ind]' if not template['amount'] else ''} set_local_context(locals()) _s = {template['formula']} _d = f\"\"\"{ext(template['desc'])}\"\"\" {sym}.append(_s) _desc.append(_d) _index += 1 store_sym_config(_s, {{"desc": _d, "data": _sym}}) """ else: codegen += f""" _pool_vals = [[[[list(src)[idx] for idx in idx_tuple] for src, idx_tuple in zip(_pool_sources, _idx_dim) ] for _idx_dim in _idx_domain] for _idx_domain in _pool_indices ] for _pool_vals_domain in _pool_vals: _ss = [] _dd = [] for _sym in _pool_vals_domain: {'_sym = [item[0] for item in _sym]' if not template['amount'] else ''} set_local_context(locals()) _s = {template['formula']} _d = f\"\"\"{ext(template['desc'])}\"\"\" _ss.append(_s) _dd.append(_d) store_sym_config(_s, {{"desc": _d, "data": _sym}}) {sym}.append(_ss) _desc.append(_dd) _index += 1 """ codeval += f""" # Generate Symbols for Template #{i} _domain = _pool_domain[{i}] _pool_indices = config["{sym}"]["pool"]["{i}"] _pool_sources = [{', '.join(template['source'])}] _pool_vals = [[[[list(src)[int(m.group(1))] if (m := INDEX_PATTERN.match(str(idx))) else idx for idx in idx_tuple] for src, idx_tuple in zip(_pool_sources, _idx_dim) ] for _idx_dim in _idx_domain ] for _idx_domain in _pool_indices ] for _index, _pool_vals_domain in enumerate(_pool_vals): _ss = [] _dd = [] for _sym in _pool_vals_domain: {'_sym = [item[0] for item in _sym]' if not template['amount'] else ''} set_local_context(locals()) _s = {template['formula']} _d = f\"\"\"{ext(template['desc'])}\"\"\" _ss.append(_s) _dd.append(_d) store_sym_config(_s, {{"desc": _d, "data": _sym}}) {sym}.append(_ss) _desc.append(_dd) _index += 1 """ codegen += f""" store_sym_config({sym}, {{"desc": _desc, "data": _pool_vals}}) """ codeval += f""" store_sym_config({sym}, {{"desc": _desc, "data": _pool_vals}}) """ return codegen, codeval
[docs] def process_single_derived_symbols(sym: str, template: str): codegen = "" codeval = "" if template is None: return codegen, codeval # 随机化参数 codegen += f""" _domain = {template['domain']} _domain_cond = {template['domain_cond']} _dim = {template['dim']} _dim_cond = {template['dim_cond']} _custom_cond = {template['custom_cond']} _pool_len = [{', '.join('len(' + str(i) + ')' for i in template['source'])}] """ if template['amount']: codegen += f"_pool_amount = [{', '.join(str(i) for i in template['amount'])}]\n" else: codegen += f"_pool_amount = [1 for _ in range(len(_pool_len))]\n" codegen += f""" try: _pool_indices, _pool_indices_str = generate_random_indices(_pool_len, _pool_amount, _domain, domain_cond=_domain_cond, dim=_dim, dim_cond=_dim_cond, custom_cond=_custom_cond, order={template['order']}, duplicate={template['duplicate']}, env=globals()) except Exception as e: raise RandomGenerationError(f"Failed to generate random indices for single derived symbol '{sym}': {{str(e)}}. This typically happens when the constraints are too restrictive or conflicting, making it impossible to generate valid random samples. Please retry, use '-c' for continuous mode, or review and revise the domain conditions, dimensions, and custom conditions.") config["{sym}"] = {{"pool": _pool_indices_str}} _pool_sources = [{', '.join(template['source'])}] {sym} = [] _desc = [] """ if template['dim'] == 1: codegen += f""" _pool_vals = [[[list(src)[idx] for idx in idx_tuple] for src, idx_tuple in zip(_pool_sources, item) ] for item in _pool_indices ] for _index, _sym in enumerate(_pool_vals): {'_sym = [item[0] for item in _sym]' if not template['amount'] else ''} set_local_context(locals()) _s = {template['formula']} _d = f\"\"\"{ext(template['desc'])}\"\"\" {sym}.append(_s) _desc.append(_d) store_sym_config({sym}, {{\"desc\": _desc, \"data\": _pool_vals}}) for _index, _sym in enumerate({sym}): store_sym_config(_sym, {{\"desc\": _desc[_index], \"data\": _pool_vals[_index]}}) """ codeval += f""" _domain = {template['domain']} _pool_indices = config[\"{sym}\"][\"pool\"] _pool_sources = [{', '.join(template['source'])}] _pool_vals = [[[list(src)[int(m.group(1))] if (m := INDEX_PATTERN.match(str(idx))) else idx for idx in idx_tuple] for src, idx_tuple in zip(_pool_sources, item) ] for item in _pool_indices ] {sym} = [] _desc = [] for _index, _sym in enumerate(_pool_vals): {'_sym = [item[0] for item in _sym]' if not template['amount'] else ''} set_local_context(locals()) _s = {template['formula']} _d = f\"\"\"{ext(template['desc'])}\"\"\" {sym}.append(_s) _desc.append(_d) store_sym_config({sym}, {{\"desc\": _desc, \"data\": _pool_vals}}) for _index, _sym in enumerate({sym}): store_sym_config(_sym, {{\"desc\": _desc[_index], \"data\": _pool_vals[_index]}}) """ else: codegen += f""" _pool_vals = [[[[list(src)[idx] for idx in idx_tuple] for src, idx_tuple in zip(_pool_sources, _idx_dim) ] for _idx_dim in _idx_domain] for _idx_domain in _pool_indices ] for _index, _pool_vals_domain in enumerate(_pool_vals): _ss = [] _dd = [] for _sym in _pool_vals_domain: {'_sym = [item[0] for item in _sym]' if not template['amount'] else ''} set_local_context(locals()) _s = {template['formula']} _d = f\"\"\"{ext(template['desc'])}\"\"\" _ss.append(_s) _dd.append(_d) {sym}.append(_ss) _desc.append(_dd) store_sym_config({sym}, {{\"desc\": _desc, \"data\": _pool_vals}}) for _idx_domain, _sym_d in enumerate({sym}): for _idx_dim, _sym in enumerate(_sym_d): store_sym_config(_sym, {{\"desc\": _desc[_idx_domain][_idx_dim], \"data\": _pool_vals[_idx_domain][_idx_dim]}}) """ codeval += f""" _domain = {template['domain']} _pool_indices = config[\"{sym}\"][\"pool\"] _pool_sources = [{', '.join(template['source'])}] {sym} = [] _desc = [] _pool_vals = [[[[list(src)[int(m.group(1))] if (m := INDEX_PATTERN.match(str(idx))) else idx for idx in idx_tuple] for src, idx_tuple in zip(_pool_sources, _idx_dim) ] for _idx_dim in _idx_domain ] for _idx_domain in _pool_indices ] for _index, _pool_vals_domain in enumerate(_pool_vals): _ss = [] _dd = [] for _sym in _pool_vals_domain: {'_sym = [item[0] for item in _sym]' if not template['amount'] else ''} set_local_context(locals()) _s = {template['formula']} _d = f\"\"\"{ext(template['desc'])}\"\"\" _ss.append(_s) _dd.append(_d) {sym}.append(_ss) _desc.append(_dd) store_sym_config({sym}, {{\"desc\": _desc, \"data\": _pool_vals}}) for _idx_domain, _sym_d in enumerate({sym}): for _idx_dim, _sym in enumerate(_sym_d): store_sym_config(_sym, {{\"desc\": _desc[_idx_domain][_idx_dim], \"data\": _pool_vals[_idx_domain][_idx_dim]}}) """ return codegen + '\n\n', codeval + '\n\n'
[docs] def process_symbols(Dict: dict): codegen = "" codeval = "" for item in Dict: if "type" in Dict[item]: # DefinedSymbol codesol = process_defined_symbols(item, Dict[item]) codegen += codesol[0] codeval += codesol[1] elif "total" in Dict[item] and "templates" in Dict[item]: # DerivedSymbols codesol = process_multiple_derived_symbols(item, Dict[item]) codegen += codesol[0] codeval += codesol[1] else: # DerivedSymbol codesol = process_single_derived_symbols(item, Dict[item]) codegen += codesol[0] codeval += codesol[1] return codegen + '\n\n', codeval + '\n\n'
[docs] def process_static_conditions(sym: str, template: dict): res = "" if template['desc']: res += f"{sym} = f\"{ext(template['desc'])}\"\n" else: res += f"{sym} = ''\n" res += f"conditions += {sym}\n" res += f"_conditions.append({template['formula']})\n" return res + '\n\n', res + '\n\n'
[docs] def process_dynamic_conditions(sym: str, template: dict): codegen = "" codeval = "" if template['source'] is None: codesol = f"""{sym} = CustomCond(desc = f\"{ext(template['desc'])}\") _conditions.append({template['formula']}) conditions += ({sym}.desc if {sym}.desc else '') """ codegen += codesol codeval += codesol return codegen + '\n\n', codeval + '\n\n' # 生成随机domain if template['domain']: codegen += f"_num = randint({template['domain'][1:-1]})\n" else: codegen += f"_num = 1\n" codegen += f"config[\"{sym}\"] = {{\"domain\": _num}}\n" codeval += f"_num = config[\"{sym}\"][\"domain\"]\n" # 生成随机下标 codegen += f"_pool_len = [{', '.join('len(' + str(i) + ')' for i in template['source'])}]\n" if template['amount']: codegen += f"_pool_amount = [{', '.join(str(i) for i in template['amount'])}]\n" else: codegen += f"_pool_amount = [1 for _ in range(len(_pool_len))]\n" codegen += f""" try: _pool_indices, _pool_indices_str = generate_random_indices(_pool_len, _pool_amount, _num, domain_cond={template['domain_cond']}, custom_cond={template['custom_cond']}, env=globals()) except Exception as e: raise RandomGenerationError(f"Failed to generate random indices for dynamic condition '{sym}': {{str(e)}}. This typically happens when the constraints are too restrictive or conflicting, making it impossible to generate valid random samples. Please retry, use '-c' for continuous mode, or review and revise the domain conditions and custom conditions.") config["{sym}"]["pool"] = _pool_indices_str """ codeval += f"_pool_indices = config[\"{sym}\"][\"pool\"]\n" codegen += f"_pool_sources = [{', '.join(template['source'])}]\n" codeval += f"_pool_sources = [{', '.join(template['source'])}]\n" codegen += "_pool_vals = [[[list(src)[idx] for idx in idx_tuple] for src, idx_tuple in zip(_pool_sources, item) ] for item in _pool_indices ]\n" codeval += "_pool_vals = [[[list(src)[int(m.group(1))] if (m := INDEX_PATTERN.match(str(idx))) else idx for idx in idx_tuple] for src, idx_tuple in zip(_pool_sources, item) ] for item in _pool_indices ]\n" codegen += f"{sym} = CustomCond(domain=_num, data=_pool_vals)\n" codeval += f"{sym} = CustomCond(domain=_num, data=_pool_vals)\n" codegen += f"""for _index, _sym in enumerate(_pool_vals):\n {'_sym = [item[0] for item in _sym]' if template['amount'] is None else ''} {'_ind = [item[0] for item in _pool_indices[_index]]' if template['amount'] is None else '_ind = _pool_indices[_index]'} _conditions.append({template['formula']}) {sym}.desc += f\"{ext(template['desc'])}\" conditions += ({sym}.desc if {sym}.desc else '') """ codeval += f"""for _index, _sym in enumerate(_pool_vals):\n {'_sym = [item[0] for item in _sym]' if template['amount'] is None else ''} {'_ind = [item[0] for item in _pool_indices[_index]]' if template['amount'] is None else '_ind = _pool_indices[_index]'} {'_ind = [int(m.group(1)) if (m := INDEX_PATTERN.match(str(idx))) else idx for idx in _ind]' if template['amount'] is None else '_ind = [[int(m.group(1)) if (m := INDEX_PATTERN.match(str(idx))) else idx for idx in item] for item in _ind]'} _conditions.append({template['formula']}) {sym}.desc += f\"{ext(template['desc'])}\" conditions += ({sym}.desc if {sym}.desc else '') """ return codegen + '\n\n', codeval + '\n\n'
[docs] def process_conditions(Dict: dict): codegen = "" codeval = "" codesol = """ _conditions = [] conditions = "" """ codegen += codesol codeval += codesol for item in Dict: if "source" in Dict[item]: # DynamicCondition codesol = process_dynamic_conditions(item, Dict[item]) codegen += codesol[0] codeval += codesol[1] else: # StaticCondition codesol = process_static_conditions(item, Dict[item]) codegen += codesol[0] codeval += codesol[1] return codegen + '\n\n', codeval + '\n\n'
[docs] def process_post_generation(Dict: dict, calc_solution=True): codegen = "" codeval = "" # 随机选取一个解,作为生成问题的基准 codesol = """_solver = Solver() for cond in _conditions: _solver.add(cond) _solver_size = len(_solver.assertions()) _solutions = [] _solver.set('random_seed', 50) cnt = 0 while _solver.check() == sat and cnt < 10: # 只取前10组解 model = _solver.model() # 获取当前解 _solutions.append(model) # 保存解 cnt += 1 # 添加排除条件:排除当前解的所有变量赋值 block = [] for var in model: block.append(var() != model[var]) # 对每个变量添加反向约束 _solver.add(Or(block)) # 要求至少有一个变量与当前解不同 sort_solutions(_solutions) """ codegen += codesol + """_sol_id = 0 config["_sol_id"] = _sol_id _sol = _solutions[_sol_id] """ codeval += codesol + """_sol_id = config["_sol_id"] _sol = _solutions[_sol_id] """ # 对于post_gen_vars中的每个符号,代入目标解中的值 codegen += "_post_gen_conditions = []\n" codeval += "_post_gen_conditions = []\n" codegen += "post_gen_conditions = \"\"\n" codeval += "post_gen_conditions = \"\"\n" if Dict["post_gen_vars"]: for sym, val in Dict["post_gen_vars"].items(): codegen += f""" {sym} = {val} config["{sym}"] = {sym} """ codeval += f""" {sym} = config["{sym}"] """ if sym not in use_spec_vars else f""" {sym} = {val} config["{sym}"] = {sym} """ # 对于post_gen_condition中的每个条件,代入新问题 if calc_solution: if Dict["post_gen_conditions"]: codesol2 = "" for item in Dict["post_gen_conditions"]: codesol2 += f""" {item} = f\"{ext(Dict["post_gen_conditions"][item]['desc'])}\" _post_gen_conditions.append({Dict["post_gen_conditions"][item]['formula']}) if {item} != '': post_gen_conditions += {item} + ';' """ codegen += codesol2 codeval += codesol2 codesol3 = """ _solver.reset() for cond in _conditions: _solver.add(cond) for cond in _post_gen_conditions: _solver.add(cond) _solver_size = len(_solver.assertions()) _solutions = [] while _solver.check() == sat: model = _solver.model() # 获取当前解 _solutions.append(model) # 保存解 # 添加排除条件:排除当前解的所有变量赋值 block = [] for var in model: block.append(var() != model[var]) # 对每个变量添加反向约束 _solver.add(Or(block)) # 要求至少有一个变量与当前解不同 """ codegen += codesol3 codeval += codesol3 else: codesol3 = "_solver_size = 0\n" codegen += codesol3 codeval += codesol3 return codegen, codeval
[docs] def process_optimize(Dict: dict): codegen = "" codeval = "" optimize_type = Dict["type"] optimize_formula = Dict["formula"] codesol = f"""_solver = Optimize() for cond in _conditions: _solver.add(cond) _optimize_type = f\"{optimize_type}\" _optimize_formula = {optimize_formula} if _optimize_type == 'minimize': _solver.minimize(_optimize_formula) else: _solver.maximize(_optimize_formula) _solver_size = len(_solver.assertions()) _solutions = [] _value = None # print(_solver) try: if _solver.check() == sat: model = _solver.model() # 获取当前解 _solutions.append(model) # 保存解 _value = model.evaluate(_optimize_formula) # 添加排除条件:排除当前解的所有变量赋值 block = [] for var in model: block.append(var() != model[var]) # 对每个变量添加反向约束 _solver.add(Or(block)) # 要求至少有一个变量与当前解不同 _solver.add((_optimize_formula) == _value) if _solver.check() == sat: raise RandomGenerationError(f"The optimization problem has multiple optimal solutions with value {{_value}}. This makes the puzzle ambiguous. Please retry, use '-c' for continuous mode, or add additional constraints to ensure a unique optimal solution.") else: raise NoSolutionError(f"Z3 optimizer could not find any solution that satisfies the constraints for {{_optimize_type}} optimization of {{_optimize_formula}}. Please retry, use '-c' for continuous mode, or review your constraints and optimization objective.") except Exception as e: if isinstance(e, (NoSolutionError, RandomGenerationError)): raise e else: raise RandomGenerationError(f"Z3 optimizer encountered an error: {{str(e)}}. This might be due to timeout, resource constraints, or complex optimization objectives. Please retry, use '-c' for continuous mode, or try again or simplify the problem.") assert(len(_solutions) == 1) _solutions = _solutions[0] # print(_solutions, _value) """ codegen += codesol codeval += codesol codegen += "print(\"Number of solutions:\", len(_solutions)) \n" return codegen, codeval
[docs] def process_query(Dict: dict): codegen = "" codeval = "" codesol = """queries = '' ans = [] """ codegen += codesol codegen += "config['_queries'] = {}\n" codeval += codesol cnt = 0 for q in Dict: cnt += 1 if "source" in Dict[q] or "templates" in Dict[q]: # 选择题 codegen += f"{q} = CustomCond(desc=f\"{ext(Dict[q]['desc'])}\" + '\\n')\n" codeval += f"{q} = CustomCond(desc=f\"{ext(Dict[q]['desc'])}\" + '\\n')\n" if "templates" not in Dict[q]: tmpcode = process_single_query(q, Dict[q], cnt) codegen += tmpcode[0] codeval += tmpcode[1] else: tmpcode = process_multiple_query(q, Dict[q], cnt) codegen += tmpcode[0] codeval += tmpcode[1] else: # 问答题 codegen += f""" _ans = {Dict[q]['ans_formula']} try: assert({Dict[q]['ans_assertion']}) except AssertionError as e: raise AnswerAssertionError(f"Answer assertion failed for query '{q}': {{str(e) if str(e) else 'Assertion condition not met'}}. The generated answer '{{_ans}}' does not satisfy the assertion '{Dict[q]['ans_assertion']}'. This indicates that the constraints or answer formula may be inconsistent. Please retry, use '-c' for continuous mode, or review and revise the query specification.") except Exception as e: raise AnswerAssertionError(f"Answer assertion evaluation failed for query '{q}': {{str(e)}}. There may be an error in the assertion formula '{Dict[q]['ans_assertion']}' or the answer formula '{Dict[q]['ans_formula']}'. Please retry, use '-c' for continuous mode, or check the syntax and logic in your specification.") {q} = CustomCond(desc=f\"{ext(Dict[q]['desc'])}\" + '\\n') queries += \"{cnt}. \" + {q}.desc ans.append(str({Dict[q]['ans_text']})) """ codeval += f""" _ans = {Dict[q]['ans_formula']} try: assert({Dict[q]['ans_assertion']}) except AssertionError as e: raise AnswerAssertionError(f"Answer assertion failed for query '{q}': {{str(e) if str(e) else 'Assertion condition not met'}}. The generated answer '{{_ans}}' does not satisfy the assertion '{Dict[q]['ans_assertion']}'. This indicates that the constraints or answer formula may be inconsistent. Please retry, use '-c' for continuous mode, or review and revise the query specification.") except Exception as e: raise AnswerAssertionError(f"Answer assertion evaluation failed for query '{q}': {{str(e)}}. There may be an error in the assertion formula '{Dict[q]['ans_assertion']}' or the answer formula '{Dict[q]['ans_formula']}'. Please retry, use '-c' for continuous mode, or check the syntax and logic in your specification.") {q} = CustomCond(desc=f\"{ext(Dict[q]['desc'])}\" + '\\n') queries += \"{cnt}. \" + {q}.desc ans.append(str({Dict[q]['ans_text']})) """ codegen += "ans = '===='.join([str(v) for v in ans])\n" codeval += "ans = '===='.join([str(v) for v in ans])\n" return codegen + '\n\n', codeval + '\n\n'
[docs] def process_single_query(q: str, qconfig: str, cnt: int): codegen = "" codeval = "" # 生成随机数池 codegen += f""" _source = [{', '.join(qconfig['source'])}] _amount = [{', '.join(qconfig['amount']) if qconfig['amount'] else ', '.join(['1' for i in range(len(qconfig['source']))])}] _correct_num = {1 if qconfig['select_type'] else qconfig['opt_num'] - 1} _incorrect_num = {1 if not qconfig['select_type'] else qconfig['opt_num'] - 1} try: _satisfied_configs, _unsatisfied_configs, _satisfied, _unsatisfied = find_required_event_groups(_source, _correct_num, _incorrect_num, _solutions, "{qconfig['opt_formula']}", cond = "{qconfig['cond']}", event_num = _amount, order = {qconfig['order']}, duplicate = {qconfig['duplicate']}, custom_cond = {qconfig['custom_cond']}, env = globals()) except Exception as e: raise RandomGenerationError(f"Failed to generate options for query '{q}': {{str(e)}}. This typically happens when it's impossible to find enough valid/invalid options that satisfy the constraints. Please retry, use '-c' for continuous mode, or review and revise the query conditions, source data, or reduce the number of required options.") """ # 生成选项 if qconfig['select_type'] == True: # 问的是正确选项:1个正确项 + (opt_num - 1)个错误项 codegen += f""" opt_num = {qconfig['opt_num']} _ans_index = randint(0, opt_num - 1) _opts = _unsatisfied[:_ans_index] + _satisfied + _unsatisfied[_ans_index:] _configs = _unsatisfied_configs[:_ans_index] + _satisfied_configs + _unsatisfied_configs[_ans_index:] config["_queries"]["{q}"] = {{"pool": _configs}} """ else: # 问的是错误选项:1个错误项 + (opt_num - 1)个正确项 codegen += f""" opt_num = {qconfig['opt_num']} _ans_index = randint(0, opt_num - 1) _opts = _satisfied[:_ans_index] + _unsatisfied + _satisfied[_ans_index:] _configs = _satisfied_configs[:_ans_index] + _unsatisfied_configs + _satisfied_configs[_ans_index:] config["_queries"]["{q}"] = {{"pool": _configs}} """ codeval += f""" _source = [{', '.join(qconfig['source'])}] _config_opts = config["_queries"][\"{q}\"]["pool"] _opts = [] for _opt in _config_opts: _opts.append([[list(src)[int(m.group(1))] if (m := INDEX_PATTERN.match(str(idx))) else list(src)[idx] for idx in idx_tuple] for src, idx_tuple in zip (_source, _opt)]) _ans_index = "" for _ind, _opt in enumerate(_opts): if is_option_valid(_opt, \"{qconfig['opt_formula']}\", \"{qconfig['cond']}\", {qconfig['select_type']}, globals()): _ans_index += chr(_ind + 65) """ # 生成选项文本 codegen += f""" _opt_text = [] for _index, _opt in enumerate(_opts): _opt_text.append(chr(_index + 65) + '. ' + f\"{qconfig['opt_text']}\") {q}.desc += '\\n'.join(_opt_text) + '\\n' """ codeval += f""" _opt_text = [] for _index, _opt in enumerate(_opts): _opt_text.append(chr(_index + 65) + '. ' + f\"{qconfig['opt_text']}\") {q}.desc += '\\n'.join(_opt_text) + '\\n' """ codegen += f"queries += \"{cnt}. \" + {q}.desc + '\\n'\n" codeval += f"queries += \"{cnt}. \" + {q}.desc + '\\n'\n" # 生成答案 codegen += f"ans.append(chr(_ans_index + 65))\n" codeval += f"ans.append(_ans_index)\n" return codegen, codeval
[docs] def process_multiple_query(q: str, qconfig: str, cnt: int): codegen = "" codeval = "" codegen += f""" opt_num = {qconfig["opt_num"]} _num_of_templates = {len(qconfig["templates"])} _pool_domain = generate_random_list_with_total(_num_of_templates, [0, opt_num], opt_num, {[eval(template["domain"]) if "domain" in template else None for template in qconfig["templates"]]}) config["_queries"]["{q}"] = {{"pool_domain": _pool_domain, "pool": []}} """ if qconfig["select_type"]: # 问的是正确选项:1个正确项 + (opt_num - 1)个错误项 codegen += f""" _pool_incorrect_num = generate_random_list_with_total(_num_of_templates, [0, opt_num - 1], opt_num - 1, [[0, v] for v in _pool_domain]) _pool_correct_num = [x - y for x, y in zip(_pool_domain, _pool_incorrect_num)] config["_queries"]["{q}"] = {{"pool_domain": _pool_domain, "pool_correct_num": _pool_correct_num, "pool_incorrect_num": _pool_incorrect_num, "pool": []}} """ else: codegen += f""" _pool_correct_num = generate_random_list_with_total(_num_of_templates, [0, opt_num - 1], opt_num - 1, [[0, v] for v in _pool_domain]) _pool_incorrect_num = [x - y for x, y in zip(_pool_domain, _pool_correct_num)] config["_queries"]["{q}"] = {{"pool_domain": _pool_domain, "pool_correct_num": _pool_correct_num, "pool_incorrect_num": _pool_incorrect_num, "pool": []}} """ codeval += f""" _pool_domain = config["_queries"]["{q}"]["pool_domain"] _pool_correct_num = config["_queries"]["{q}"]["pool_correct_num"] _pool_incorrect_num = config["_queries"]["{q}"]["pool_incorrect_num"] """ codegen += f""" _pool_satisfied_configs = [] _pool_unsatisfied_configs = [] _pool_satisfied = [] _pool_unsatisfied = [] """ for i, template in enumerate(qconfig["templates"]): assert('dim' not in template or template['dim'] == 1) # 生成随机数池 codegen += f""" # Generate Options for Template #{i} _domain = _pool_domain[{i}] _source = [{', '.join(template['source'])}] _amount = [{', '.join(template['amount']) if template['amount'] else ', '.join(['1' for i in range(len(template['source']))])}] _correct_num = _pool_correct_num[{i}] _incorrect_num = _pool_incorrect_num[{i}] try: _satisfied_configs, _unsatisfied_configs, _satisfied, _unsatisfied = find_required_event_groups(_source, _correct_num, _incorrect_num, _solutions, "{template['opt_formula']}", cond = "{template['cond']}", event_num = _amount, order = {template['order']}, duplicate = {template['duplicate']}, custom_cond = {template['custom_cond']}, env = globals()) except Exception as e: raise RandomGenerationError(f"Failed to generate options for multiple query '{q}' template #{i}: {{str(e)}}. This typically happens when it's impossible to find enough valid/invalid options that satisfy the constraints for this template. Please retry, use '-c' for continuous mode, or review and revise the query conditions, source data, or reduce the number of required options.") _pool_satisfied_configs.extend(map(lambda item: {{"template_id": {i}, "config": item}}, _satisfied_configs)) _pool_unsatisfied_configs.extend(map(lambda item: {{"template_id": {i}, "config": item}}, _unsatisfied_configs)) _pool_satisfied.extend(_satisfied) _pool_unsatisfied.extend(_unsatisfied) """ # 生成选项 if qconfig['select_type'] == True: # 问的是正确选项:1个正确项 + (opt_num - 1)个错误项 codegen += f""" _ans_index = randint(0, opt_num - 1) _opts = _pool_unsatisfied[:_ans_index] + _pool_satisfied + _pool_unsatisfied[_ans_index:] _configs = _pool_unsatisfied_configs[:_ans_index] + _pool_satisfied_configs + _pool_unsatisfied_configs[_ans_index:] config["_queries"]["{q}"]["pool"] = _configs """ else: # 问的是错误选项:1个错误项 + (opt_num - 1)个正确项 codegen += f""" _ans_index = randint(0, opt_num - 1) _opts = _pool_satisfied[:_ans_index] + _pool_unsatisfied + _pool_satisfied[_ans_index:] _configs = _pool_satisfied_configs[:_ans_index] + _pool_unsatisfied_configs + _pool_satisfied_configs[_ans_index:] config["_queries"]["{q}"]["pool"] = _configs """ codegen += f""" _opt_text = [] _opt_texts = {[template["opt_text"] for template in qconfig["templates"]]} for _index, _opt in enumerate(_opts): _opt_template_id = _configs[_index]["template_id"] _opt_text_template = "f\\\"" + _opt_texts[_opt_template_id] + "\\\"" _opt_text.append(chr(_index + 65) + '. ' + eval(_opt_text_template)) {q}.desc += '\\n'.join(_opt_text) + '\\n' queries += \"{cnt}. \" + {q}.desc + '\\n' ans.append(chr(_ans_index + 65)) """ _sources = [] for template in qconfig["templates"]: _sources.append('[' + ','.join(template["source"]) + ']') codeval += f""" _config_opts = config["_queries"][\"{q}\"]["pool"] _opts = [] _sources = [{', '.join(_sources)}] for _opt in _config_opts: _opt_template_id = _opt["template_id"] _opt_config = _opt["config"] _opts.append([[list(src)[int(m.group(1))] if (m := INDEX_PATTERN.match(str(idx))) else list(src)[idx] for idx in idx_tuple] for src, idx_tuple in zip(_sources[_opt_template_id], _opt_config)]) _opt_text = [] _opt_texts = {[template["opt_text"] for template in qconfig["templates"]]} for _index, (_opt_config, _opt) in enumerate(zip(_config_opts, _opts)): _opt_template_id = _opt_config["template_id"] _opt_text_template = "f\\"" + _opt_texts[_opt_template_id] + "\\"" _opt_text.append(chr(_index + 65) + '. ' + eval(_opt_text_template)) {q}.desc += '\\n'.join(_opt_text) + '\\n' _ans_index = "" for _ind, (_opt_config, _opt) in enumerate(zip(_config_opts, _opts)): _opt_template_id = _opt_config["template_id"] _opt_formulas = {[template["opt_formula"] for template in qconfig["templates"]]} _conds = {[template["cond"] for template in qconfig["templates"]]} if is_option_valid(_opt, _opt_formulas[_opt_template_id], _conds[_opt_template_id], {qconfig["select_type"]}, globals()): _ans_index += chr(_ind + 65) queries += \"{cnt}. \" + {q}.desc + '\\n' ans.append(_ans_index) """ return codegen, codeval
[docs] def translator(spec: dict): codegen, codeval = init_program() # the result program # process custom operator if spec.get('custom_operator'): res = process_custom_operator(spec['custom_operator']) codegen += res[0] codeval += res[1] # process variables if spec.get('variables'): res = process_vars(spec['variables']) codegen += res[0] codeval += res[1] if spec.get('symbols'): res = process_symbols(spec['symbols']) codegen += res[0] codeval += res[1] # Initialize _conditions even if no conditions are specified if spec.get('conditions'): res = process_conditions(spec['conditions']) codegen += res[0] codeval += res[1] else: # Initialize empty conditions codesol = """ _conditions = [] conditions = "" """ codegen += codesol codeval += codesol if spec.get('post_generation'): res = process_post_generation(spec['post_generation'], calc_solution=spec['calc_solution']) codegen += res[0] codeval += res[1] elif spec.get('optimize'): res = process_optimize(spec['optimize']) codegen += res[0] codeval += res[1] elif spec.get('calc_solution'): codesol = f"""_solver = Solver() for cond in _conditions: _solver.add(cond) _solver_size = len(_solver.assertions()) _solutions = [] try: while _solver.check() == sat: model = _solver.model() # 获取当前解 _solutions.append(model) # 保存解 if(len(_solutions) > {spec['max_solution']}): raise TooManySolutionsError(f"Found {{len(_solutions)}} solutions, which exceeds the maximum allowed ({spec['max_solution']}). This typically means the constraints are too loose. Please retry, use '-c' for continuous mode, or consider adding more specific constraints to reduce the solution space, or increase the max_solution limit if appropriate.") # 添加排除条件:排除当前解的所有变量赋值 block = [] for var in model: block.append(var() != model[var]) # 对每个变量添加反向约束 _solver.add(Or(block)) # 要求至少有一个变量与当前解不同 except Exception as e: if isinstance(e, TooManySolutionsError): raise e else: raise RandomGenerationError(f"Z3 solver encountered an error during solution generation: {{str(e)}}. This might be due to timeout or resource constraints. Please retry, use '-c' for continuous mode, or try again or simplify the constraints.") if len(_solutions) == 0: # Get more detailed information about why there's no solution _unsat_core = _solver.unsat_core() if hasattr(_solver, 'unsat_core') else [] _core_info = f" Unsatisfiable core: {{[str(c) for c in _unsat_core]}}" if _unsat_core else "" raise NoSolutionError(f"Z3 solver could not find any solution that satisfies all the given constraints. This means the constraints are contradictory or too restrictive.{{_core_info}} Please retry, use '-c' for continuous mode, or review your puzzle specification and ensure the constraints are consistent and achievable.") """ codegen += codesol codeval += codesol codegen += "print(\"solution number:\", len(_solutions)) \n" # if spec["assert_one_solution"]: # codegen += "assert(len(_solutions) == 1)\n" # codeval += "assert(len(_solutions) == 1)\n" else: codesol = "_solver_size = 0\n" codegen += codesol codeval += codesol # process queries if spec.get('queries'): res = process_query(spec['queries']) codegen += res[0] codeval += res[1] codesol = f"""problem = f\"\"\"{ext(spec['desc'])}\"\"\" print(problem) print('answer: ', ans) """ codegen += codesol codeval += codesol # with open("output.py", "w", encoding='utf-8') as fp: # fp.write(codegen) return codegen, codeval
[docs] def process(puzzle_template, mode): try: codegen, codeval = translator(puzzle_template) # print(code) # for debug if '-t' in mode: with open("output.py", "w", encoding="utf-8") as output_file: output_file.write(codegen) symlist_code = {} # print(config) exec(codegen, symlist_code) config = symlist_code['config'] codeval = codeval.replace("__config__", str(config)) if '-t' in mode: with open("output_val.py", "w", encoding="utf-8") as output_file: output_file.write(codeval) with open("config.json", "w", encoding="utf-8") as output_file: output_file.write(json.dumps(config, ensure_ascii=False)) problem = symlist_code["problem"] answer = symlist_code["ans"] sym_num = 0 symbols = puzzle_template.get("symbols", {}) if symbols: defined_symbols = {k: v for k, v in symbols.items() if 'type' in v} for var in defined_symbols: sym_num += len(symlist_code[var]) sym_type = list(set( item for v in defined_symbols.values() for item in (v["type"] if isinstance(v["type"], list) else [v["type"]]) )) else: defined_symbols = {} sym_type = [] cond_num = int(symlist_code["_solver_size"]) return { "problem": problem, "answer": answer, # "solution": codeval, "parameters": { "cond_num": cond_num, "sym_num": sym_num, "sym_type": sym_type, **({"opt_solution": str(symlist_code['_solutions'])} if puzzle_template["optimize"] is not None else {}) }, "config": config } except (NoSolutionError, TooManySolutionsError, AnswerAssertionError, RandomGenerationError) as e: # Re-raise our custom exceptions with their detailed messages raise e except Exception as e: # Catch any other unexpected errors and provide a generic message raise PuzzleGenerationError(f"Unexpected error during puzzle generation: {str(e)}. Please retry, use '-c' for continuous mode, or check your puzzle specification for syntax errors or invalid configurations, and try again.")
[docs] def repeat_process(puzzle_spec_path, output_path, new_puzzles_num=100, mode = "-t"): puzzle_template = None try: with open(puzzle_spec_path, "r", encoding="utf-8") as file: content = file.read() # for yaml files if puzzle_spec_path.endswith(".yaml"): content = yaml.load(content, Loader=yaml.FullLoader) puzzle_template = PuzzleTemplate.model_validate(content) else: puzzle_template = PuzzleTemplate.model_validate_json(content) # puzzle_template_json = puzzle_template.model_dump_json(indent=2) # 转换为 JSON str puzzle_template = puzzle_template.model_dump() # 转换为 JSON Data except Exception as e: print(e) if not puzzle_template: return False with open(output_path, "w", encoding="utf-8", buffering=1) as output_file: for i in range(new_puzzles_num): while True: # Keep retrying until successful try: sample = process(puzzle_template, mode) output_file.write(json.dumps(sample, ensure_ascii=False) + '\n') break # Exit the while loop and move to next puzzle except (NoSolutionError, TooManySolutionsError, AnswerAssertionError, RandomGenerationError) as e: if '-c' not in mode: raise e # In continuous mode, just retry the same puzzle continue except Exception as e: if '-c' not in mode: raise e # In continuous mode, just retry the same puzzle continue return True
import json
[docs] def parse_config_file(config_file): config_list = [] try: with open(config_file, "r", encoding="utf-8") as file: # First try to parse as JSON (single object) try: data = json.load(file) config_list.append(data) return config_list except json.JSONDecodeError: # If not JSON, try as JSONL (each line is a JSON object) pass # Reset file pointer to beginning for JSONL processing file.seek(0) # Process as JSONL for line_number, line in enumerate(file, 1): line = line.strip() if not line: continue try: data = json.loads(line) if "config" not in data: raise ValueError(f"Line {line_number} does not contain 'config' field") config_list.append(data["config"]) except json.JSONDecodeError as e: raise ValueError(f"Invalid JSON at line {line_number}: {str(e)}") return config_list except Exception as e: print(f"Error processing config file: {str(e)}") raise
[docs] def process_with_config(puzzle_spec_path, output_path, config_file): puzzle_template = None try: with open(puzzle_spec_path, "r", encoding="utf-8") as file: content = file.read() puzzle_template = PuzzleTemplate.model_validate_json(content) # puzzle_template_json = puzzle_template.model_dump_json(indent=2) # 转换为 JSON str puzzle_template = puzzle_template.model_dump() # 转换为 JSON Data except Exception as e: print(e) if not puzzle_template: return False configs = parse_config_file(config_file) codegen, codeval = translator(puzzle_template) with open(output_path, "w", encoding="utf-8") as output_file: for config_idx, config in enumerate(configs): try: # config_index = codeval.find("__config__") # config_next_line_index = codeval[config_index:].find("\n") + config_index # codeval = codeval[:config_next_line_index] + f'\n__use_spec_vars__ = {use_spec_vars}' + codeval[config_next_line_index:] codeval_new = codeval.replace("__config__", str(config)) # for debug if '-t' in mode: with open("output.py", "w", encoding="utf-8") as debug_file: debug_file.write(codeval_new) symlist_code = {} # print(config) exec(codeval_new, symlist_code) problem = symlist_code["problem"] answer = symlist_code["ans"] config_modified = symlist_code['config'] sym_num = 0 symbols = puzzle_template.get("symbols", {}) if symbols: defined_symbols = {k: v for k, v in symbols.items() if 'type' in v} for var in defined_symbols: sym_num += len(symlist_code[var]) sym_type = list(set( item for v in defined_symbols.values() for item in (v["type"] if isinstance(v["type"], list) else [v["type"]]) )) else: defined_symbols = {} sym_type = [] cond_num = int(symlist_code["_solver_size"]) res = { "problem": problem, "answer": answer, # "solution": codeval, "parameters": { "cond_num": cond_num, "sym_num": sym_num, "sym_type": sym_type, **({"opt_solution": str(symlist_code['_solutions'])} if puzzle_template["optimize"] is not None else {}) }, "config": config_modified } output_file.write(json.dumps(res, ensure_ascii=False) + '\n') except (NoSolutionError, TooManySolutionsError, AnswerAssertionError, RandomGenerationError) as e: print(f"\n⚠️ Config-based puzzle generation failed for config #{config_idx + 1}:") print(f" {type(e).__name__}: {str(e)}") print(" Skipping this configuration...") continue except Exception as e: print(f"\n❌ Unexpected error processing config #{config_idx + 1}: {str(e)}") print(" Skipping this configuration...") continue
if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('-d', '--deploy', help="Enable deploy mode") parser.add_argument('-t', '--test', help="Enable test mode") parser.add_argument('-n', '--num', help="Enable test mode") parser.add_argument('-c','--continuous', action='store_true', help="Enable continuous search") parser.add_argument('-o', '--output', help="Specify output file name") parser.add_argument('-g', '--config', help="Specify configuration file", required=False) parser.add_argument('-u', '--use-spec-vars', nargs='+', help="Variables whose values depend on the specification during config-based generation", required=False) args = parser.parse_args() mode = [] if args.continuous: mode.append("-c") # 根据参数选择模式 if not args.deploy and not args.test: raise Exception("Either -d or -t should be specified.") elif args.deploy: # Mode 2 puzzle_spec_path = args.deploy new_puzzles_num = int(args.num) if args.num else 1000 output_path = args.output if args.output else os.path.join(os.path.dirname(puzzle_spec_path), "output_1k.jsonl") mode.append("-d") mode.append("-c") else: # Mode 1 puzzle_spec_path = args.test new_puzzles_num = int(args.num) if args.num else 1 output_path = args.output if args.output else os.path.join(os.path.dirname(puzzle_spec_path), "output.jsonl") mode.append("-t") if args.config: use_spec_vars = args.use_spec_vars if args.use_spec_vars else [] process_with_config(puzzle_spec_path, output_path, args.config) else: repeat_process(puzzle_spec_path, output_path, new_puzzles_num, mode=mode)