Overcomplicated the test script because of course [CI SKIP]

pull/60/head
Jef Roosens 2022-01-30 18:02:45 +01:00
parent 4c56351132
commit 766e097be8
Signed by: Jef Roosens
GPG Key ID: B580B976584B5F30
2 changed files with 45 additions and 17 deletions

View File

@ -107,7 +107,6 @@ pub fn read_pkg(pkg_path string) ?Pkg {
a := C.archive_read_new()
entry := C.archive_entry_new()
mut r := 0
// Sinds 2020, all newly built Arch packages use zstd
C.archive_read_support_filter_zstd(a)
@ -116,7 +115,7 @@ pub fn read_pkg(pkg_path string) ?Pkg {
C.archive_read_support_format_tar(a)
// TODO find out where does this 10240 come from
r = C.archive_read_open_filename(a, &char(pkg_path.str), 10240)
r := C.archive_read_open_filename(a, &char(pkg_path.str), 10240)
if r != C.ARCHIVE_OK {
return error('Failed to open package.')

59
test.py
View File

@ -6,6 +6,7 @@ import uuid
import argparse
import asyncio
import aiohttp
import sys
# A list of words the program can choose from
@ -16,27 +17,27 @@ WORDS = ["alpha", "bravo", "charlie", "delta", "echo", "foxtrot", "golf",
SEED = 2022
def random_words(max_len=None, min_len=1):
def random_words(words, min_len, max_len=None):
"""
Returns a random list of words, with a length randomly choosen between
min_len and max_len. If max_len is None, it is equal to the length of
WORDS.
words.
"""
if max_len is None:
max_len = len(WORDS)
max_len = len(words)
k = random.randint(min_len, max_len)
return random.choices(WORDS, k=k)
return random.choices(words, k=k)
def random_lists(n, max_len=None, min_len=1):
return [random_words(max_len, min_len) for _ in range(n)]
def random_lists(words, n, min_len, max_len=None):
return [random_words(words, min_len, max_len) for _ in range(n)]
def create_random_pkginfo():
def create_random_pkginfo(words, name_min_len, name_max_len):
"""
Generates a random .PKGINFO
"""
name = "-".join(random_words(3))
name = "-".join(random_words(words, name_min_len, name_max_len))
ver = "0.1.0" # doesn't matter what it is anyways
# TODO add random dependencies (all types)
@ -50,7 +51,7 @@ def create_random_pkginfo():
return "\n".join(f"{key} = {value}" for key, value in data.items())
def create_random_package(tmpdir):
def create_random_package(tmpdir, words, pkg_name_min_len, pkg_name_max_len, min_files, max_files, min_filename_len, max_filename_len):
"""
Creates a random, but valid Arch package, using the provided tmpdir. Output
is the path to the created package tarball.
@ -69,11 +70,13 @@ def create_random_package(tmpdir):
with tarfile.open(tar_path, "w") as tar:
# Add random .PKGINFO file
pkginfo_file = sub_path / ".PKGINFO"
pkginfo_file.write_text(create_random_pkginfo())
pkginfo_file.write_text(create_random_pkginfo(words, pkg_name_min_len, pkg_name_max_len))
tar.add(pkginfo_file, filter=remove_prefix)
# Create random files
for words in random_lists(10, max_len=5):
file_count = random.randint(min_files, max_files)
for words in random_lists(words, file_count, min_filename_len, max_filename_len):
path = sub_path / 'usr' / ('/'.join(words) + ".txt")
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(' '.join(words))
@ -83,14 +86,19 @@ def create_random_package(tmpdir):
return tar_path
async def upload_random_package(dir, sem):
tar_path = create_random_package(dir)
async def check_output(r):
good = {"File already exists.", "Package added successfully."}
txt = await r.text()
return (txt in good, txt)
async def upload_random_package(tar_path, sem):
async with sem:
with open(tar_path, 'rb') as f:
async with aiohttp.ClientSession() as s:
async with s.post("http://localhost:8000/publish", data=f.read(), headers={"x-api-key": "test"}) as r:
print(r.text)
return await check_output(r)
async def main():
@ -99,6 +107,14 @@ async def main():
parser.add_argument("count", help="How many packages to upload.", default=1, type=int)
parser.add_argument("-p", "--parallel", help="How many uploads to run in parallel.", default=1, type=int)
parser.add_argument("-s", "--seed", help="Seed for the randomizer.", default=SEED, type=int)
parser.add_argument("--min-files", help="Minimum amount of files to add to an archive.", default=5, type=int)
parser.add_argument("--max-files", help="Max amount of files to add to an archive.", default=10, type=int)
parser.add_argument("--min-filename-length", help="Minimum amount of words to use for generating filenames.", default=1, type=int)
parser.add_argument("--max-filename-length", help="Max amount of words to use for generating filenames.", default=5, type=int)
parser.add_argument("--min-pkg-name-length", help="Minimum amount of words to use for creating package name.", default=1, type=int)
parser.add_argument("--max-pkg-name-length", help="Max amount of words to use for creating package name.", default=3, type=int)
parser.add_argument("--words", help="Words to use for randomizing.", default=WORDS, type=lambda s: s.split(','))
# parser.add_argument("--words", help="Words to use for randomizing.", default=WORDS, type=)
# parser.add_argument("-d", "--dir", help="Directory to create ")
args = parser.parse_args()
@ -106,10 +122,23 @@ async def main():
sem = asyncio.BoundedSemaphore(args.parallel)
random.seed(args.seed)
with tempfile.TemporaryDirectory() as tmpdirname:
tmpdir = Path(tmpdirname)
await asyncio.gather(*(upload_random_package(tmpdir, sem) for _ in range(args.count)))
# We generate the tars in advance because they're not async anyways
print("Generating tarballs...")
tars = {
create_random_package(tmpdir, args.words, args.min_pkg_name_length, args.max_pkg_name_length, args.min_files, args.max_files, args.min_filename_length, args.max_filename_length)
for _ in range(args.count)
}
print("Sending requests...")
res = await asyncio.gather(*(upload_random_package(tar, sem) for tar in tars))
# Generate status report
if any(not x[0] for x in res):
sys.exit(1)
if __name__ == "__main__":