run.py
"""Parameterized test runner: expands test cases from a YAML config, runs them
concurrently as subprocesses, and reports return codes, durations, and any
OpenPerf timings parsed from their output."""

import argparse
import asyncio
import itertools
import json
import re
import sys
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional, Set, Tuple

import yaml
from tqdm.asyncio import tqdm_asyncio


@dataclass
class OpenPerfTiming:
    """Timing reported by OpenPerf: whole seconds plus fractional milliseconds."""

    seconds: int
    milliseconds: float


@dataclass
class TestParameters:
    """Represents the parameters configuration for a test"""

    parameters: Dict[str, List[str]]


@dataclass
class TestDefinition:
    """Represents a single test configuration"""

    timeout: Optional[int]
    format: str
    parameters: TestParameters
    tags: List[str] = field(default_factory=list)  # List of tags for filtering


@dataclass
class TestConfiguration:
    """Root configuration class"""

    max_workers: Optional[int]
    tests: Dict[str, TestDefinition]


@dataclass
class TestResult:
    """Outcome of one test subprocess run."""

    test_name: str
    command: str
    returncode: int
    stderr: str
    duration: float
    timestamp: str
    openperf_timing: Optional[OpenPerfTiming] = None
    tags: List[str] = field(default_factory=list)


class TestConfig:
    def __init__(self, config_path: str):
        """Load and parse the test configuration file."""
        with open(config_path, "r") as f:
            raw_config = yaml.safe_load(f)

        # Convert the raw dictionary into dataclasses
        tests_dict = {}
        for test_name, test_data in raw_config.get("tests", {}).items():
            test_params = TestParameters(parameters=test_data.get("parameters", {}))
            tests_dict[test_name] = TestDefinition(
                timeout=test_data.get("timeout"),
                format=test_data.get("format", ""),
                parameters=test_params,
                tags=test_data.get("tags", []),
            )

        self.config = TestConfiguration(
            max_workers=raw_config.get("max_workers"),
            tests=tests_dict,
        )
        self.validate_config()

    def validate_config(self):
        """Validate the configuration file structure."""
        if not self.config.tests:
            raise ValueError("Missing required field 'tests' in config")
        for test_name, test_config in self.config.tests.items():
            if not test_config.parameters.parameters:
                raise ValueError(f"Test '{test_name}' must have 'parameters' field")

    def filter_tests_by_tags(
        self, include_tags: Optional[Set[str]]
    ) -> Dict[str, TestDefinition]:
        """Filter tests based on tags."""
        if not include_tags:
            return self.config.tests

        filtered_tests = {}
        for test_name, test_def in self.config.tests.items():
            test_tags = set(test_def.tags)
            # Keep the test if it carries at least one of the requested tags
            if test_tags & include_tags:
                filtered_tests[test_name] = test_def
        return filtered_tests

    def generate_test_cases(
        self,
        include_tags: Optional[Set[str]] = None,
    ) -> List[Dict[str, Any]]:
        """Generate all test cases from the config, filtered by tags."""
        test_cases = []
        # Filter tests by tags
        filtered_tests = self.filter_tests_by_tags(include_tags)

        for test_name, test_config in filtered_tests.items():
            # Separate positional (numeric keys) and named parameters
            positional_params = []
            named_params = {}
            for param_name, param_values in test_config.parameters.parameters.items():
                try:
                    pos = int(param_name)
                    while len(positional_params) <= pos:
                        positional_params.append(None)
                    positional_params[pos] = param_values
                except ValueError:
                    named_params[param_name] = param_values

            # Expand every combination of parameter values into one test case
            positional_combinations = list(
                itertools.product(*[p for p in positional_params if p is not None])
            )
            named_combinations = [
                dict(zip(named_params.keys(), values))
                for values in itertools.product(*named_params.values())
            ]
            if not named_combinations:
                named_combinations = [{}]

            for pos_values in positional_combinations:
                for named_values in named_combinations:
                    command = test_config.format.format(*pos_values, **named_values)
                    param_strs = []
                    for i, value in enumerate(pos_values):
                        param_strs.append(f"p{i}-{value}")
                    for name, value in named_values.items():
                        param_strs.append(f"{name}-{value}")
                    case_name = f"{test_name}_{'_'.join(param_strs)}"
                    case = {
                        "name": case_name,
                        "args": command.split(),
                        "timeout": test_config.timeout,
                        "tags": test_config.tags,
                    }
                    test_cases.append(case)
        return test_cases
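

# Illustrative config sketch (hypothetical test name and values; the keys are the
# ones TestConfig reads above). Numeric parameter keys fill positional {0}, {1}, ...
# slots in "format", other keys fill named placeholders, and every combination of
# values becomes one test case:
#
#   max_workers: 4
#   tests:
#     sort_bench:
#       timeout: 60
#       format: "./bench {0} --mode {mode}"
#       tags: [perf]
#       parameters:
#         "0": ["small.bin", "large.bin"]
#         mode: ["fast", "exact"]
#
# This single entry expands to 2 x 2 = 4 cases, e.g. "sort_bench_p0-small.bin_mode-fast"
# running "./bench small.bin --mode fast".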


class TestRunner:
    def __init__(self):
        self.results: List[TestResult] = []

    def _parse_openperf_timing(self, output: str) -> Optional[OpenPerfTiming]:
        """Parse OpenPerf timing from output."""
        pattern = r"OpenPerf time:(?:\s+(?P<seconds>\d+)\s+s,)?\s+(?P<milliseconds>[\d.]+)\s+ms"
        match = re.search(pattern, output)
        if match:
            seconds = int(match.group("seconds")) if match.group("seconds") else 0
            milliseconds = float(match.group("milliseconds"))
            return OpenPerfTiming(seconds=seconds, milliseconds=milliseconds)
        return None
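
    # The pattern above matches lines such as (illustrative, not taken from a real
    # run): "OpenPerf time: 3 s, 141.590 ms" -> OpenPerfTiming(3, 141.59), and
    # "OpenPerf time: 141.590 ms" -> OpenPerfTiming(0, 141.59).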

    async def _run_single_test(
        self, test_config: Tuple[str, List[str], Optional[int], List[str]]
    ) -> TestResult:
        """Run a single test in a separate process asynchronously."""
        name, args, timeout, tags = test_config
        command = args
        start_time = asyncio.get_event_loop().time()
        try:
            process = await asyncio.create_subprocess_exec(
                *command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            try:
                stdout_byte, stderr_byte = await asyncio.wait_for(
                    process.communicate(), timeout=timeout
                )
            except asyncio.TimeoutError:
                # asyncio.wait_for raises asyncio.TimeoutError, which is only an
                # alias of the builtin TimeoutError on Python 3.11+, so catch the
                # asyncio name explicitly.
                process.kill()
                await process.wait()  # reap the killed subprocess
                return TestResult(
                    test_name=name,
                    command=" ".join(command),
                    returncode=-256,
                    stderr="Python timeout waiting for subprocess",
                    duration=asyncio.get_event_loop().time() - start_time,
                    timestamp=datetime.now().isoformat(),
                    openperf_timing=None,
                    tags=tags,
                )

            stdout = stdout_byte.decode()
            stderr = stderr_byte.decode()
            openperf_timing = self._parse_openperf_timing(stdout + stderr)
            returncode = process.returncode if process.returncode is not None else -256
            return TestResult(
                test_name=name,
                command=" ".join(command),
                returncode=returncode,
                stderr=stderr,
                duration=asyncio.get_event_loop().time() - start_time,
                timestamp=datetime.now().isoformat(),
                openperf_timing=openperf_timing,
                tags=tags,
            )
        except Exception as e:
            return TestResult(
                test_name=name,
                command=" ".join(command),
                returncode=-1,
                stderr=f"Test failed with error: {str(e)}",
                duration=0.0,
                timestamp=datetime.now().isoformat(),
                openperf_timing=None,
                tags=tags,
            )

    async def run_tests(
        self,
        tests: List[Tuple[str, List[str], Optional[int], List[str]]],
        max_concurrent: Optional[int],
    ) -> List[TestResult]:
        """Run multiple tests concurrently using asyncio.

        Args:
            tests: List of tuples containing (test_name, args_list, timeout, tags)
            max_concurrent: Maximum number of concurrent test processes

        Returns:
            List of TestResult objects
        """
        # Create a semaphore to limit concurrent processes
        semaphore = asyncio.Semaphore(max_concurrent if max_concurrent else 8)

        async def run_with_semaphore(test_config):
            async with semaphore:
                return await self._run_single_test(test_config)

        # Create tasks for all tests
        tasks = [run_with_semaphore(test_config) for test_config in tests]
        # Run tests with a progress bar
        self.results = await tqdm_asyncio.gather(*tasks, desc="Running tests")
        return self.results

    def export_results(self, output_file: str):
        """Export test results to a JSON file."""
        results_dict = [
            {
                "test_name": r.test_name,
                "command": r.command,
                "returncode": r.returncode,
                "stderr": r.stderr,
                "duration": r.duration,
                "timestamp": r.timestamp,
                "openperf_timing": (
                    {
                        "seconds": r.openperf_timing.seconds,
                        "milliseconds": r.openperf_timing.milliseconds,
                        "total_seconds": r.openperf_timing.seconds
                        + (r.openperf_timing.milliseconds / 1000),
                    }
                    if r.openperf_timing
                    else None
                ),
            }
            for r in self.results
        ]
        with open(output_file, "w") as f:
            json.dump(results_dict, f, indent=2)
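

# export_results writes a JSON array of records shaped like the hypothetical one
# below; "openperf_timing" is null when no "OpenPerf time:" line was found in the
# test's output.
#
#   [
#     {
#       "test_name": "sort_bench_p0-small.bin_mode-fast",
#       "command": "./bench small.bin --mode fast",
#       "returncode": 0,
#       "stderr": "",
#       "duration": 1.73,
#       "timestamp": "2024-01-01T12:00:00",
#       "openperf_timing": {"seconds": 1, "milliseconds": 532.1, "total_seconds": 1.5321}
#     }
#   ]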


def format_openperf_time(timing: Optional[OpenPerfTiming]) -> str:
    """Format OpenPerf timing for display."""
    if timing:
        if timing.seconds != 0:
            return f"{timing.seconds} s, {timing.milliseconds:.3f} ms"
        else:
            return f"{timing.milliseconds:.3f} ms"
    else:
        return "N/A"


def run_from_config(
    config_path: str,
    output_file: Optional[str] = None,
    include_tags: Optional[Set[str]] = None,
) -> int:
    """Run all tests specified in a config file and return the number of failures."""
    config = TestConfig(config_path)
    runner = TestRunner()
    test_cases = config.generate_test_cases(include_tags)

    print(f"\nRunning tests from config: {config_path}")
    if include_tags:
        print(f"Including tags: {', '.join(include_tags)}")

    results = asyncio.run(
        runner.run_tests(
            [(t["name"], t["args"], t.get("timeout"), t["tags"]) for t in test_cases],
            max_concurrent=config.config.max_workers,
        )
    )

    print(
        "\n{:<30} {:<15} {:<25} {:<10} {:<20}".format(
            "Test Name", "Return Code", "OpenPerf Time", "Duration", "Tags"
        )
    )
    print("-" * 100)

    failed = 0
    for result in results:
        print(
            "{:<30} {:<15} {:<25} {:<10} {:<20}".format(
                result.test_name,
                result.returncode,
                format_openperf_time(result.openperf_timing),
                f"{result.duration:.2f}s",
                ", ".join(result.tags),
            )
        )
        if result.returncode != 0:
            failed += 1
            print("Error:")
            print(result.stderr)

    if output_file:
        runner.export_results(output_file)
        print(f"\nResults exported to {output_file}")
    return failed


def main():
    parser = argparse.ArgumentParser(description="Test Runner for Binary Files")
    parser.add_argument("config", help="Path to YAML config file")
    parser.add_argument("--output", help="Output JSON file for results")
    parser.add_argument(
        "--tags",
        help="Only run tests with these tags (comma-separated)",
        type=str,
    )
    args = parser.parse_args()

    # Convert the tag argument to a set
    include_tags = set(args.tags.split(",")) if args.tags else None

    try:
        failed = run_from_config(args.config, args.output, include_tags)
        sys.exit(0 if failed == 0 else 1)
    except Exception as e:
        print(f"Error: {str(e)}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()
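
# Example invocations (file names are hypothetical):
#   python run.py tests.yaml
#   python run.py tests.yaml --output results.json --tags smoke,perf
# The process exits with 0 when every test returns 0, and with 1 otherwise
# (including when an exception is raised).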