
Commit 3cb3a3a

Added multiprocessing support to update_index! Thanks to CMGdigital
for funding development of this feature.
1 parent b96b47c commit 3cb3a3a

6 files changed: +221 additions, -64 deletions


AUTHORS

Lines changed: 1 addition & 0 deletions
@@ -51,3 +51,4 @@ Thanks to
 * Luke Hatcher (lukeman) for documentation patches.
 * Trey Hunner (treyhunner) for a Whoosh field boosting patch.
 * Kent Gormat of Retail Catalyst for funding the development of multiple index support.
+* CMGdigital for funding the development on multiprocessing update_index.

docs/management_commands.rst

Lines changed: 6 additions & 3 deletions
@@ -53,6 +53,9 @@ arguments::
 ``--remove``:
     Remove objects from the index that are no longer present in the
     database.
+``--workers``:
+    Allows for the use multiple workers to parallelize indexing. Requires
+    ``multiprocessing``.
 ``--verbosity``:
     If provided, dumps out more information about what's being done.

@@ -68,9 +71,9 @@ arguments::
 .. note::

     This command *ONLY* updates records in the index. It does *NOT* handle
-    deletions, so you may need to write a separate script that handles deleted
-    models, such as a queue consumer or something that runs through all records
-    and tries to load the model for it. Alternatively, you can use the
+    deletions unless the ``--remove`` flag is provided. You might consider
+    a queue consumer if the memory requirements for ``--remove`` don't
+    fit your needs. Alternatively, you can use the
     ``RealTimeSearchIndex``, which will automatically handle deletions.
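In day-to-day use the new option combines with the existing ones; assuming a standard ``manage.py`` setup, an invocation along the lines of ``./manage.py update_index --workers=4 --verbosity=2`` fans the indexing batches out to four worker processes, and the verbose output reports which pid handled each batch.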

haystack/management/commands/update_index.py

Lines changed: 131 additions & 59 deletions
@@ -1,11 +1,12 @@
 import datetime
+import os
 from optparse import make_option
 from django.conf import settings
 from django.core.exceptions import ImproperlyConfigured
 from django.core.management.base import AppCommand
 from django.db import reset_queries
 from django.utils.encoding import smart_str
-from haystack import connections, connection_router
+from haystack import connections as haystack_connections
 from haystack.constants import DEFAULT_ALIAS
 from haystack.query import SearchQuerySet

@@ -14,6 +15,93 @@
 DEFAULT_AGE = None


+def worker(bits):
+    # We need to reset the connections, otherwise the different processes
+    # will try to share the connection, which causes things to blow up.
+    from django.db import connections
+
+    for alias, info in connections.databases.items():
+        # We need to also tread lightly with SQLite, because blindly wiping
+        # out connections (via ``... = {}``) destroys in-memory DBs.
+        if not 'sqlite3' in info['ENGINE']:
+            try:
+                del(connections._connections[alias])
+            except KeyError:
+                pass
+
+    if bits[0] == 'do_update':
+        func, model, start, end, total, using, age, verbosity = bits
+    elif bits[0] == 'do_remove':
+        func, model, pks_seen, start, upper_bound, using, verbosity = bits
+    else:
+        return
+
+    unified_index = haystack_connections[using].get_unified_index()
+    index = unified_index.get_index(model)
+    backend = haystack_connections[using].get_backend()
+
+    if func == 'do_update':
+        qs = build_queryset(index, model, age=age, verbosity=verbosity)
+        do_update(backend, index, qs, start, end, total, verbosity=verbosity)
+    elif bits[0] == 'do_remove':
+        do_remove(backend, index, model, pks_seen, start, upper_bound, verbosity=verbosity)
+
+
+def build_queryset(index, model, age=DEFAULT_AGE, verbosity=1):
+    extra_lookup_kwargs = {}
+    updated_field = index.get_updated_field()
+
+    if age:
+        if updated_field:
+            extra_lookup_kwargs['%s__gte' % updated_field] = datetime.datetime.now() - datetime.timedelta(hours=age)
+        else:
+            if verbosity >= 2:
+                print "No updated date field found for '%s' - not restricting by age." % model.__name__
+
+    if not hasattr(index.index_queryset(), 'filter'):
+        raise ImproperlyConfigured("The '%r' class must return a 'QuerySet' in the 'get_queryset' method." % index)
+
+    # `.select_related()` seems like a good idea here but can fail on
+    # nullable `ForeignKey` as well as what seems like other cases.
+    return index.index_queryset().filter(**extra_lookup_kwargs).order_by(model._meta.pk.name)
+
+
+def do_update(backend, index, qs, start, end, total, verbosity=1):
+    # Get a clone of the QuerySet so that the cache doesn't bloat up
+    # in memory. Useful when reindexing large amounts of data.
+    small_cache_qs = qs.all()
+    current_qs = small_cache_qs[start:end]
+
+    if verbosity >= 2:
+        if os.getpid() == os.getppid():
+            print "  indexed %s - %d of %d." % (start+1, end, total)
+        else:
+            print "  indexed %s - %d of %d (by %s)." % (start+1, end, total, os.getpid())
+
+    # FIXME: Get the right backend.
+    backend.update(index, current_qs)
+
+    # Clear out the DB connections queries because it bloats up RAM.
+    reset_queries()
+
+
+def do_remove(backend, index, model, pks_seen, start, upper_bound, verbosity=1):
+    # Fetch a list of results.
+    # Can't do pk range, because id's are strings (thanks comments
+    # & UUIDs!).
+    stuff_in_the_index = SearchQuerySet().models(model)[start:upper_bound]
+
+    # Iterate over those results.
+    for result in stuff_in_the_index:
+        # Be careful not to hit the DB.
+        if not smart_str(result.pk) in pks_seen:
+            # The id is NOT in the small_cache_qs, issue a delete.
+            if verbosity >= 2:
+                print "  removing %s." % result.pk
+
+            backend.remove(".".join([result.app_label, result.model_name, str(result.pk)]))
+
+
 class Command(AppCommand):
     help = "Freshens the index for the given app(s)."
     base_options = (
@@ -31,6 +119,10 @@ class Command(AppCommand):
         make_option("-u", "--using", action="store", type="string", dest="using", default=None,
             help='If provided, chooses a connection to work with.'
         ),
+        make_option('-k', '--workers', action='store', dest='workers',
+            default=0, type='int',
+            help='Allows for the use multiple workers to parallelize indexing. Requires multiprocessing.'
+        ),
     )
     option_list = AppCommand.option_list + base_options

@@ -51,11 +143,12 @@ class Command(AppCommand):

     def handle(self, *apps, **options):
         self.verbosity = int(options.get('verbosity', 1))
+        self.batchsize = options.get('batchsize', DEFAULT_BATCH_SIZE)
         self.age = options.get('age', DEFAULT_AGE)
         self.remove = options.get('remove', False)
         self.using = options.get('using') or DEFAULT_ALIAS
-
-        self.backend = connections[self.using].get_backend()
+        self.workers = int(options.get('workers', 0))
+        self.backend = haystack_connections[self.using].get_backend()

         if not apps:
             from django.db.models import get_app
@@ -77,7 +170,10 @@ def handle_app(self, app, **options):
         from django.db.models import get_models
         from haystack.exceptions import NotHandled

-        unified_index = connections[self.using].get_unified_index()
+        unified_index = haystack_connections[self.using].get_unified_index()
+
+        if self.workers > 0:
+            import multiprocessing

         for model in get_models(app):
             try:
@@ -87,73 +183,49 @@ def handle_app(self, app, **options):
                 print "Skipping '%s' - no index." % model
                 continue

-            extra_lookup_kwargs = {}
-            updated_field = index.get_updated_field()
-
-            if self.age:
-                if updated_field:
-                    extra_lookup_kwargs['%s__gte' % updated_field] = datetime.datetime.now() - datetime.timedelta(hours=self.age)
-                else:
-                    if self.verbosity >= 2:
-                        print "No updated date field found for '%s' - not restricting by age." % model.__name__
-
-            if not hasattr(index.index_queryset(), 'filter'):
-                raise ImproperlyConfigured("The '%r' class must return a 'QuerySet' in the 'get_queryset' method." % index)
-
-            # `.select_related()` seems like a good idea here but can fail on
-            # nullable `ForeignKey` as well as what seems like other cases.
-            qs = index.index_queryset().filter(**extra_lookup_kwargs).order_by(model._meta.pk.name)
+            qs = build_queryset(index, model, age=self.age, verbosity=self.verbosity)
             total = qs.count()

             if self.verbosity >= 1:
                 print "Indexing %d %s." % (total, smart_str(model._meta.verbose_name_plural))

-            pks_seen = set()
+            pks_seen = set([smart_str(pk) for pk in qs.values_list('pk', flat=True)])
+            batch_size = self.batchsize or self.backend.batch_size

-            for start in range(0, total, self.backend.batch_size):
-                end = min(start + self.backend.batch_size, total)
-
-                # Get a clone of the QuerySet so that the cache doesn't bloat up
-                # in memory. Useful when reindexing large amounts of data.
-                small_cache_qs = qs.all()
-                current_qs = small_cache_qs[start:end]
-
-                for obj in current_qs:
-                    pks_seen.add(smart_str(obj.pk))
-
-                if self.verbosity >= 2:
-                    print "  indexing %s - %d of %d." % (start+1, end, total)
-
-                self.backend.update(index, current_qs)
+            if self.workers > 0:
+                ghetto_queue = []
+
+            for start in range(0, total, batch_size):
+                end = min(start + batch_size, total)

-                # Clear out the DB connections queries because it bloats up RAM.
-                reset_queries()
+                if self.workers == 0:
+                    do_update(self.backend, index, qs, start, end, total, self.verbosity)
+                else:
+                    ghetto_queue.append(('do_update', model, start, end, total, self.using, self.age, self.verbosity))
+
+            if self.workers > 0:
+                pool = multiprocessing.Pool(self.workers)
+                pool.map(worker, ghetto_queue)

             if self.remove:
                 if self.age or total <= 0:
                     # They're using a reduced set, which may not incorporate
                     # all pks. Rebuild the list with everything.
-                    pks_seen = set()
                     qs = index.index_queryset().values_list('pk', flat=True)
-                    total = qs.count()
-
-                    for pk in qs:
-                        pks_seen.add(smart_str(pk))
+                    pks_seen = set([smart_str(pk) for pk in qs])
+                    total = len(pks_seen)

-                for start in range(0, total, self.backend.batch_size):
-                    upper_bound = start + self.backend.batch_size
-
-                    # Fetch a list of results.
-                    # Can't do pk range, because id's are strings (thanks comments
-                    # & UUIDs!).
-                    stuff_in_the_index = SearchQuerySet().models(model)[start:upper_bound]
+                if self.workers > 0:
+                    ghetto_queue = []
+
+                for start in range(0, total, batch_size):
+                    upper_bound = start + batch_size

-                    # Iterate over those results.
-                    for result in stuff_in_the_index:
-                        # Be careful not to hit the DB.
-                        if not smart_str(result.pk) in pks_seen:
-                            # The id is NOT in the small_cache_qs, issue a delete.
-                            if self.verbosity >= 2:
-                                print "  removing %s." % result.pk
-
-                            self.backend.remove(".".join([result.app_label, result.model_name, str(result.pk)]))
+                    if self.workers == 0:
+                        do_remove(self.backend, index, model, pks_seen, start, upper_bound)
+                    else:
+                        ghetto_queue.append(('do_remove', model, pks_seen, start, upper_bound, self.using, self.verbosity))
+
+                if self.workers > 0:
+                    pool = multiprocessing.Pool(self.workers)
+                    pool.map(worker, ghetto_queue)
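Both the update and the remove paths funnel their batches through the module-level ``worker`` function above, because ``multiprocessing.Pool`` needs a picklable callable and picklable arguments; each queue entry is a plain tuple whose first element names the action, and each child process re-resolves its own backend and database connections before doing any work. A minimal standalone sketch of that tuple-dispatch pattern (illustrative only, not the commit's code; the names simply mirror the ones used here):

import multiprocessing
import os


def worker(bits):
    # The first tuple element picks the action; the rest are its arguments.
    if bits[0] == 'do_update':
        _, start, end = bits
        return "pid %s updated items %d-%d" % (os.getpid(), start, end)
    elif bits[0] == 'do_remove':
        _, start, upper_bound = bits
        return "pid %s removed items %d-%d" % (os.getpid(), start, upper_bound)


if __name__ == '__main__':
    # One tuple per batch, fanned out across two worker processes.
    ghetto_queue = [('do_update', start, start + 10) for start in range(0, 50, 10)]
    pool = multiprocessing.Pool(2)
    for line in pool.map(worker, ghetto_queue):
        print(line)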

tests/settings.py

Lines changed: 6 additions & 2 deletions
@@ -1,6 +1,10 @@
 # Haystack settings for running tests.
-DATABASE_ENGINE = 'sqlite3'
-DATABASE_NAME = 'haystack_tests.db'
+DATABASES = {
+    'default': {
+        'ENGINE': 'sqlite3',
+        'NAME': 'haystack_tests.db',
+    }
+}

 INSTALLED_APPS = [
     'django.contrib.admin',

tests/solr_tests/tests/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -2,6 +2,7 @@
 warnings.simplefilter('ignore', Warning)

 from solr_tests.tests.admin import *
+from solr_tests.tests.management_commands import *
 from solr_tests.tests.solr_query import *
 from solr_tests.tests.solr_backend import *
 from solr_tests.tests.templatetags import *
tests/solr_tests/tests/management_commands.py

Lines changed: 76 additions & 0 deletions
@@ -0,0 +1,76 @@
+import pysolr
+from django.conf import settings
+from django.core.management import call_command
+from django.test import TestCase
+from haystack import connections
+from haystack import indexes
+from haystack.utils.loading import UnifiedIndex
+from core.models import MockModel
+
+
+class SolrMockSearchIndex(indexes.SearchIndex):
+    text = indexes.CharField(document=True, use_template=True)
+    name = indexes.CharField(model_attr='author', faceted=True)
+    pub_date = indexes.DateField(model_attr='pub_date')
+
+    def get_model(self):
+        return MockModel
+
+
+class ManagementCommandTestCase(TestCase):
+    fixtures = ['bulk_data.json']
+
+    def setUp(self):
+        super(ManagementCommandTestCase, self).setUp()
+        self.solr = pysolr.Solr(settings.HAYSTACK_CONNECTIONS['default']['URL'])
+
+        # Stow.
+        self.old_ui = connections['default'].get_unified_index()
+        self.ui = UnifiedIndex()
+        self.smmi = SolrMockSearchIndex()
+        self.ui.build(indexes=[self.smmi])
+        connections['default']._index = self.ui
+
+    def tearDown(self):
+        connections['default']._index = self.old_ui
+        super(ManagementCommandTestCase, self).tearDown()
+
+    def test_basic_commands(self):
+        call_command('clear_index', interactive=False, verbosity=0)
+        self.assertEqual(self.solr.search('*:*').hits, 0)
+
+        call_command('update_index', verbosity=0)
+        self.assertEqual(self.solr.search('*:*').hits, 23)
+
+        call_command('clear_index', interactive=False, verbosity=0)
+        self.assertEqual(self.solr.search('*:*').hits, 0)
+
+        call_command('rebuild_index', interactive=False, verbosity=0)
+        self.assertEqual(self.solr.search('*:*').hits, 23)
+
+    def test_remove(self):
+        call_command('clear_index', interactive=False, verbosity=0)
+        self.assertEqual(self.solr.search('*:*').hits, 0)
+
+        call_command('update_index', verbosity=0)
+        self.assertEqual(self.solr.search('*:*').hits, 23)
+
+        # Remove a model instance.
+        MockModel.objects.get(pk=1).delete()
+        self.assertEqual(self.solr.search('*:*').hits, 23)
+
+        # Plain ``update_index`` doesn't fix it.
+        call_command('update_index', verbosity=0)
+        self.assertEqual(self.solr.search('*:*').hits, 23)
+
+        # With the remove flag, it's gone.
+        call_command('update_index', remove=True, verbosity=0)
+        self.assertEqual(self.solr.search('*:*').hits, 22)
+
+    def test_multiprocessing(self):
+        call_command('clear_index', interactive=False, verbosity=0)
+        self.assertEqual(self.solr.search('*:*').hits, 0)
+
+        # Watch the output, make sure there are multiple pids.
+        call_command('update_index', verbosity=2, workers=2, batchsize=5)
+        self.assertEqual(self.solr.search('*:*').hits, 23)
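The multiprocessing test only eyeballs the verbose output for multiple pids rather than asserting on them; a tiny standalone sketch (not part of this commit) of how ``multiprocessing.Pool`` spreads work across distinct processes:

import multiprocessing
import os


def which_pid(_):
    # Each pool worker reports the process it ran in.
    return os.getpid()


if __name__ == '__main__':
    pool = multiprocessing.Pool(2)
    pids = set(pool.map(which_pid, range(20), 1))
    # With two workers and per-item chunks, more than one pid should show up.
    print(sorted(pids))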
