Skip to content

Commit 6b06016

Browse files
committed
Compile C API source files in parallel.
1 parent 93001ef commit 6b06016

File tree

2 files changed

+152
-10
lines changed

2 files changed

+152
-10
lines changed

graalpython/com.oracle.graal.python.cext/setup.py

Lines changed: 151 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,10 @@
4141
import sys
4242
import os
4343
import shutil
44-
import site
4544
import logging
4645
from distutils.core import setup, Extension
4746
from distutils.sysconfig import get_config_var, get_config_vars
48-
49-
logger = logging.getLogger(__name__)
47+
import _sysconfig
5048

5149
__dir__ = __file__.rpartition("/")[0]
5250
cflags_warnings = ["-Wno-int-to-pointer-cast", "-Wno-int-conversion", "-Wno-incompatible-pointer-types-discards-qualifiers", "-Wno-pointer-type-mismatch"]
@@ -58,6 +56,130 @@
5856
so_ext = get_config_var("EXT_SUFFIX")
5957
SOABI = get_config_var("SOABI")
6058

59+
# configure logger
60+
logger = logging.getLogger(__name__)
61+
logging.basicConfig(format='%(message)s', level=logging.DEBUG if sys.flags.verbose else logging.ERROR)
62+
63+
64+
threaded = _sysconfig.get_config_var("WITH_THREAD")
65+
if threaded:
66+
logger.debug("building C API threaded")
67+
import threading
68+
import queue
69+
70+
class SimpleThreadPool:
71+
72+
def __init__(self, n=None):
73+
self.n = n if n else os.cpu_count()
74+
self.running = False
75+
self.started = False
76+
self.finished_semaphore = None
77+
self.task_queue = queue.SimpleQueue()
78+
self.result_queue = queue.SimpleQueue()
79+
80+
def worker_fun(self, id, fun):
81+
while self.running:
82+
try:
83+
item = self.task_queue.get()
84+
if item is not None:
85+
result = fun(item)
86+
self.result_queue.put((id, True, item, result))
87+
else:
88+
break
89+
except BaseException as e:
90+
self.result_queue.put((id, False, item, e))
91+
self.finished_semaphore.release()
92+
93+
def start_thread_pool(self, fun):
94+
if self.running:
95+
raise RuntimeException("pool already running")
96+
97+
logger.debug("Starting thread pool with {} worker threads".format(self.n))
98+
self.running = True
99+
self.workers = [None] * self.n
100+
self.finished_semaphore = threading.Semaphore(0)
101+
for i in range(self.n):
102+
worker = threading.Thread(target=self.worker_fun, args=(i, fun))
103+
worker.daemon = True
104+
worker.start()
105+
self.workers[i] = worker
106+
107+
def stop_thread_pool(self):
108+
self.running = False
109+
# drain queue; remove non-None items
110+
try:
111+
self.task_queue.get_nowait()
112+
except queue.Empty:
113+
pass
114+
115+
# wake up threads by putting None items into the task queue
116+
for i in range(self.n):
117+
self.task_queue.put(None)
118+
119+
for worker in self.workers:
120+
worker.join()
121+
122+
def put_job(self, items):
123+
for item in items:
124+
self.task_queue.put(item)
125+
for i in range(self.n):
126+
self.task_queue.put(None)
127+
128+
def wait_until_finished(self):
129+
for i in range(self.n):
130+
self.finished_semaphore.acquire(True, None)
131+
132+
results = []
133+
try:
134+
while not self.result_queue.empty():
135+
id, success, item, result = self.result_queue.get_nowait()
136+
if not success:
137+
raise result
138+
else:
139+
results.append(result)
140+
except queue.Empty:
141+
# just to be sure
142+
pass
143+
return results
144+
145+
146+
def pcompiler(self, sources, output_dir=None, macros=None, include_dirs=None, debug=0, extra_preargs=None, extra_postargs=None, depends=None):
147+
# taken from distutils.ccompiler.CCompiler
148+
macros, objects, extra_postargs, pp_opts, build = self._setup_compile(output_dir, macros, include_dirs, sources, depends, extra_postargs)
149+
cc_args = self._get_cc_args(pp_opts, debug, extra_preargs)
150+
151+
def _single_compile(obj):
152+
try:
153+
src, ext = build[obj]
154+
except KeyError:
155+
return
156+
logger.debug("Compiling {!s}".format(src))
157+
self._compile(obj, src, ext, cc_args, extra_postargs, pp_opts)
158+
159+
if len(objects) > 1:
160+
logger.debug("Compiling {} objects in parallel.".format(len(objects)))
161+
pool = SimpleThreadPool()
162+
pool.start_thread_pool(_single_compile)
163+
pool.put_job(objects)
164+
pool.wait_until_finished()
165+
pool.stop_thread_pool()
166+
else:
167+
logger.debug("Compiling 1 object without thread pool.")
168+
_single_compile(objects[0])
169+
return objects
170+
171+
172+
def build_extensions(self):
173+
self.check_extensions_list(self.extensions)
174+
if len(self.extensions) > 1:
175+
pool = SimpleThreadPool()
176+
pool.start_thread_pool(self.build_extension)
177+
pool.put_job(self.extensions)
178+
pool.wait_until_finished()
179+
pool.stop_thread_pool()
180+
else:
181+
return self.build_extension(self.extensions[0])
182+
61183

62184
def system(cmd, msg=""):
63185
logger.debug("Running command: " + cmd)
@@ -135,7 +257,8 @@ def build(self, extracted_dir=None):
135257
with open(makefile_path, "w") as f:
136258
f.write(content)
137259

138-
system("make -C '%s' -f '%s' CC='%s'" % (lib_src_folder, self.makefile, get_config_var("CC")), msg="Could not build libbz2")
260+
parallel_arg = "-j" + str(os.cpu_count()) if threaded else ""
261+
system("make -C '%s' %s -f '%s' CC='%s'" % (lib_src_folder, parallel_arg, self.makefile, get_config_var("CC")), msg="Could not build libbz2")
139262
return lib_src_folder
140263

141264
def install(self, build_dir=None):
@@ -244,11 +367,13 @@ def build_libpython(capi_home):
244367
ext_modules=[module],
245368
)
246369

370+
247371
def build_builtin_exts(capi_home):
248372
args = [verbosity, 'build', 'install_lib', '-f', '--install-dir=%s/modules' % capi_home, "clean"]
249-
for ext in builtin_exts:
250-
distutil_ext = ext()
251-
res = setup(
373+
distutil_exts = [(ext, ext()) for ext in builtin_exts]
374+
def build_builtin_ext(item):
375+
ext, distutil_ext = item
376+
setup(
252377
script_name='setup_' + ext.name,
253378
script_args=args,
254379
name=ext.name,
@@ -258,11 +383,28 @@ def build_builtin_exts(capi_home):
258383
)
259384
logger.debug("Successfully built and installed module %s", ext.name)
260385

386+
for item in distutil_exts:
387+
build_builtin_ext(item)
388+
261389

262390
def build(capi_home):
263391
CAPIDependency.set_lib_install_base(capi_home)
264-
build_libpython(capi_home)
265-
build_builtin_exts(capi_home)
392+
393+
if threaded:
394+
import distutils.ccompiler
395+
import distutils.command.build_ext
396+
original_compile = distutils.ccompiler.CCompiler.compile
397+
original_build_extensions = distutils.command.build_ext.build_ext.build_extensions
398+
distutils.ccompiler.CCompiler.compile = pcompiler
399+
distutils.command.build_ext.build_ext.build_extensions = build_extensions
400+
401+
try:
402+
build_libpython(capi_home)
403+
build_builtin_exts(capi_home)
404+
finally:
405+
if threaded:
406+
distutils.ccompiler.CCompiler.compile = original_compile
407+
distutils.command.build_ext.build_ext.build_extensions = original_build_extensions
266408

267409

268410
if __name__ == "__main__":

mx.graalpython/mx_graalpython.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1545,7 +1545,7 @@ def build(self):
15451545
args.append("-v")
15461546
elif mx._opts.quiet:
15471547
args.append("-q")
1548-
args += ["-S", os.path.join(self.src_dir(), "setup.py"), self.subject.get_output_root()]
1548+
args += ["--python.WithThread", "-S", os.path.join(self.src_dir(), "setup.py"), self.subject.get_output_root()]
15491549
mx.ensure_dir_exists(cwd)
15501550
rc = self.run(args, cwd=cwd)
15511551
shutil.rmtree(cwd) # remove the temporary build files

0 commit comments

Comments
 (0)