You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

78 lines
2.4 KiB

#!/usr/bin/env python3
import argparse
import logging
import os
import re
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
def job(command):
    """Run ``command`` as a subprocess and log what it wrote to stderr.

    Args:
        command: The program to run followed by its arguments, as a list
            of str.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status. The captured stderr is logged before re-raising, since
            the exception's default message does not include it.
    """
    command_line = " ".join(command)
    logging.debug("Running '%s'\n", command_line)
    try:
        res = subprocess.run(
            command,
            check=True,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        ).stderr
    except subprocess.CalledProcessError as err:
        # stderr was captured through the pipe, so without this it would
        # never be shown for a failing command.
        logging.error(
            "Command '%s' failed with exit code %d:\n'%s'",
            command_line,
            err.returncode,
            err.stderr,
        )
        raise
    logging.debug("Command '%s' output:\n'%s'", command_line, res)
# Directory entries with this name shape are treated as fuzz targets.
# Compiled once instead of once per directory entry.
_TARGET_NAME_RE = re.compile(r"^fuzz-[\w-]*$")


def _parse_args(argv=None):
    """Parse the command line; ``argv=None`` means ``sys.argv[1:]``."""
    parser = argparse.ArgumentParser(
        description="Run the fuzz targets a given amount of times, or generate"
                    " new seeds.",
    )
    parser.add_argument(
        "seed_dir",
        help="The parent directory for the seed corpora of each target.",
    )
    parser.add_argument("-g", "--generate", action="store_true")
    parser.add_argument(
        "-j",
        "--par",
        type=int,
        default=1,
        help="How many targets to run in parallel.",
    )
    parser.add_argument(
        "-n",
        "--runs",
        # Validate here so a typo is rejected by argparse instead of being
        # forwarded verbatim to every target as "-runs=<garbage>".
        type=int,
        default=100000,
        help="How many times to run each target (if generating).",
    )
    parser.add_argument(
        "-m",
        "--merge_dir",
        default=None,
        help="The parent directory to merge each target corpus from.",
    )
    return parser.parse_args(argv)


def _find_targets(target_dir):
    """Return the paths of the entries of ``target_dir`` named like a target."""
    return [
        os.path.join(target_dir, name)
        for name in os.listdir(target_dir)
        if _TARGET_NAME_RE.match(name)
    ]


def main(argv=None):
    """Run every fuzz target found next to this script.

    Each target gets its own sub-directory of ``seed_dir`` (created if
    missing). Without ``--merge_dir`` a target is invoked with
    ``-runs=N``, where N is ``--runs`` when ``--generate`` is given and 1
    otherwise. With ``--merge_dir`` it is invoked with ``-merge=1`` and the
    matching sub-directory of ``merge_dir`` as an extra argument; targets
    with no such sub-directory are skipped.

    Args:
        argv: Command-line arguments; ``None`` means ``sys.argv[1:]``.

    Raises:
        SystemExit: On invalid command-line arguments (raised by argparse).
        subprocess.CalledProcessError: Propagated from ``job`` when a target
            exits with a non-zero status.
    """
    args = _parse_args(argv)
    logging.basicConfig(level=logging.DEBUG)

    target_dir = os.path.abspath(os.path.dirname(__file__))
    targets = _find_targets(target_dir)
    runs = args.runs if args.generate else 1

    with ThreadPoolExecutor(max_workers=args.par) as pool:
        jobs = []
        for target in targets:
            name = os.path.basename(target)
            seed_dir = os.path.join(args.seed_dir, name)
            os.makedirs(seed_dir, exist_ok=True)
            if args.merge_dir is None:
                command = [target, f"-runs={runs}", seed_dir]
            else:
                input_target = os.path.join(args.merge_dir, name)
                if not os.path.exists(input_target):
                    # Nothing to merge from for this target.
                    continue
                command = [target, "-merge=1", seed_dir, input_target]
            jobs.append(pool.submit(job, command))
        # result() re-raises any exception from the worker thread, so a
        # failing target makes the whole run fail instead of passing silently.
        for completed in as_completed(jobs):
            completed.result()


if __name__ == "__main__":
    main()