diff --git a/examples/benchmark.py b/examples/benchmark.py
new file mode 100644
index 000000000..9f7ea9638
--- /dev/null
+++ b/examples/benchmark.py
@@ -0,0 +1,56 @@
+import sys
+import time
+import random
+import tempfile
+
+import rdflib
+
+
+def resource(n):
+    return rdflib.URIRef("urn:resource:%d" % n)
+
+
+def prop(n):
+    return rdflib.URIRef("urn:property:%d" % n)
+
+
+def createData(g, N, M=80):
+    # add N random triples drawn from a pool of M resources/properties
+    for x in range(N):
+        g.add((resource(random.randint(0, M)),
+               prop(random.randint(0, M)),
+               resource(random.randint(0, M))))
+
+
+if __name__ == '__main__':
+
+    g = rdflib.Graph(sys.argv[1])
+
+    # tempfile.mkdtemp() avoids the race condition inherent in os.tempnam()
+    g.open(tempfile.mkdtemp(), create=True)
+    start = time.time()
+    #g.load("foaf890K.rdf")
+    createData(g, 300000)
+    print len(g)
+    read = time.time()
+    sys.stderr.write("Reading took %.2fs\n" % (read - start))
+
+    x = set(g.subjects(prop(5), resource(5)))
+    t1 = time.time()
+    sys.stderr.write("Subjects took %.2fs\n" % (t1 - read))
+
+    x = set(g.predicates(resource(5), resource(5)))
+    t2 = time.time()
+    sys.stderr.write("Predicates took %.2fs\n" % (t2 - t1))
+
+    x = set(g.objects(resource(5), prop(5)))
+    t3 = time.time()
+    sys.stderr.write("Objects took %.2fs\n" % (t3 - t2))
+
+    x = set(g.subject_objects(prop(5)))
+    t4 = time.time()
+    sys.stderr.write("SubjectObjects took %.2fs\n" % (t4 - t3))
+
+    x = set(g.subject_predicates(resource(5)))
+    t5 = time.time()
+    sys.stderr.write("SubjectPredicates took %.2fs\n" % (t5 - t4))
+
+    x = set(g.predicate_objects(resource(5)))
+    t6 = time.time()
+    sys.stderr.write("PredicateObjects took %.2fs\n" % (t6 - t5))
diff --git a/rdflib/plugin.py b/rdflib/plugin.py
index 1383cb1c8..4a000607f 100644
--- a/rdflib/plugin.py
+++ b/rdflib/plugin.py
@@ -122,6 +122,9 @@ def plugins(name=None, kind=None):
     'rdflib.plugins.memory', 'IOMemory')
 register('Sleepycat', Store,
     'rdflib.plugins.sleepycat', 'Sleepycat')
+register('BDBOptimized', Store,
+    'rdflib.plugins.bdboptimized', 'BDBOptimized')
+
 register('xml', Serializer,
     'rdflib.plugins.serializers.rdfxml', 'XMLSerializer')
diff --git a/rdflib/plugins/bdboptimized.py b/rdflib/plugins/bdboptimized.py
new file mode 100644
index 000000000..ef069e912
--- /dev/null
+++ b/rdflib/plugins/bdboptimized.py
@@ -0,0 +1,532 @@
+import logging
+import warnings
+from bsddb import db
+from urllib import pathname2url
+from os.path import exists, abspath, join
+from os import makedirs
+
+from rdflib import URIRef
+from rdflib.store import Store, NO_STORE
+
+_logger = logging.getLogger(__name__)
+
+SUPPORT_MULTIPLE_STORE_ENVIRON = False
+
+warnings.warn("The BDBOptimized store is experimental and not yet recommended for production.")
+
+if db.version() < (4, 3, 29):
+    warnings.warn("Your BDB library may not be supported.")
+
+# TODO: performance testing?
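+
+# Usage sketch (the directory name below is hypothetical); the store is
+# registered under the name "BDBOptimized" in rdflib/plugin.py:
+#
+#   from rdflib import Graph
+#   g = Graph(store="BDBOptimized")
+#   g.open("/tmp/rdflib_bdb", create=True)
+#   g.add((subject, predicate, object))
+#   g.close()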
+
+class NamespaceIndex:
+
+    def __init__(self, db_env):
+        self.__db_env = db_env
+        self.__namespace = db.DB(db_env)
+        self.__namespace.open('namespace.db', None, db.DB_BTREE, db.DB_CREATE | db.DB_AUTO_COMMIT)
+
+        self.__prefix = db.DB(db_env)
+        self.__prefix.open("prefix.db", None, db.DB_BTREE, db.DB_CREATE | db.DB_AUTO_COMMIT)
+
+    def bind(self, prefix, namespace):
+        prefix = prefix.encode("utf-8")
+        namespace = namespace.encode("utf-8")
+
+        t = self.__db_env.txn_begin()
+        try:
+            bound_prefix = self.__prefix.get(namespace, txn=t)
+            if bound_prefix:
+                self.__namespace.delete(bound_prefix, txn=t)
+            self.__prefix.put(namespace, prefix, txn=t)
+            self.__namespace.put(prefix, namespace, txn=t)
+            t.commit(0)
+        except Exception, e:
+            t.abort()
+            raise e
+
+    def namespaces(self):
+        # collect the results first so the cursor is closed before yielding
+        cursor = self.__namespace.cursor()
+        results = []
+        current = cursor.first()
+        while current:
+            prefix, namespace = current
+            results.append((prefix, namespace))
+            current = cursor.next()
+        cursor.close()
+        for prefix, namespace in results:
+            yield prefix, URIRef(namespace)
+
+    def prefix(self, namespace):
+        namespace = namespace.encode("utf-8")
+        t = self.__db_env.txn_begin()
+        try:
+            r = self.__prefix.get(namespace, txn=t)
+            t.commit(0)
+            return r
+        except Exception, e:
+            t.abort()
+            raise e
+
+    def namespace(self, prefix):
+        prefix = prefix.encode("utf-8")
+        t = self.__db_env.txn_begin()
+        try:
+            r = self.__namespace.get(prefix, txn=t)
+            t.commit(0)
+            return r
+        except Exception, e:
+            t.abort()
+            raise e
+
+    def close(self):
+        self.__namespace.close()
+        self.__prefix.close()
+
+class IDMap:
+    def __init__(self, db_env, node_pickler):
+        self.__db_env = db_env
+        self.__dbp = db.DB(db_env)
+        self.__dbp.open("IDMap_hash.db", None, db.DB_HASH, db.DB_CREATE | db.DB_AUTO_COMMIT)
+
+        self.__dbs = db.DB(db_env)
+        self.__dbs.open("IDMap_recno.db", None, db.DB_RECNO, db.DB_CREATE | db.DB_AUTO_COMMIT)
+
+        # pickling and un-pickling the data
+        self.__node_pickler = node_pickler
+
+        self.__loads = self.__node_pickler.loads
+        self.__dumps = self.__node_pickler.dumps
+
+    def insert(self, key):
+        # insert a new ID for the key if none is registered yet; returns the ID
+        t = self.__db_env.txn_begin()
+        try:
+            k = self.__dumps(key)
+            val = self.__dbp.get(k, txn=t)
+            # the key is not found, register a new value for it
+            if val is None:
+                val = "%s" % self.__dbs.append(k, t)
+                self.__dbp.put(k, val, txn=t)
+            t.commit(0)
+            return val
+        except Exception, e:
+            t.abort()
+            raise e
+
+    def get_id(self, key):
+        k = self.__dumps(key)
+        t = self.__db_env.txn_begin()
+        try:
+            val = self.__dbp.get(k, txn=t)
+            t.commit(0)
+            return val
+        except Exception, e:
+            t.abort()
+            raise e
+
+    def get_var(self, num):
+        t = self.__db_env.txn_begin()
+        try:
+            val = self.__dbs.get(num, txn=t)
+            t.commit(0)
+            return self.__loads(val)
+        except Exception, e:
+            t.abort()
+            raise e
+
+    def close(self):
+        self.__dbp.close()
+        self.__dbs.close()
+
+    def all(self):
+        # collect (id, pickled node) pairs; close the cursor exactly once
+        results = []
+        cursor = self.__dbs.cursor()
+        try:
+            current = cursor.first()
+            while current:
+                key, value = current
+                results.append((key, value))
+                current = cursor.next()
+        finally:
+            cursor.close()
+        return results
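+
+# Illustrative round trip (hypothetical values): the first node inserted
+# gets the recno ID "1", so with mapper = IDMap(env, node_pickler):
+#
+#   mapper.insert(URIRef("urn:a"))   # -> "1" (created on first use)
+#   mapper.get_id(URIRef("urn:a"))   # -> "1"
+#   mapper.get_var(1)                # -> URIRef("urn:a")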
+
+def secondaryIndexKey(key, data):
+    # returns the first field of a '^'-joined key, as a str (not yet used)
+    return data.split("^")[0]
+
+class QuadIndex:
+
+    def __init__(self, db_env, idmapper):
+        self.__db_env = db_env
+        self.__map = idmapper
+
+        self.__splitter = '^'
+
+        self.__index_list = ['spoc', 'pocs', 'ocsp', 'ospc', 'cspo', 'cpso']
+        self.__indices = self.__init_indices()
+        self.__use_index = self.__init_use_index()
+        self.__re_order = self.__init_re_order()
+        self.__open = True
+
+    def __init_indices(self):
+        indices = {}
+        for index in self.__index_list:
+            indices[index] = db.DB(self.__db_env)
+            indices[index].open("index_%s.db" % index, None, db.DB_BTREE, db.DB_CREATE | db.DB_AUTO_COMMIT)
+
+        return indices
+
+    def __init_re_order(self):
+        # create functions that change the variable order back to s,p,o,c
+        re_order = {}
+
+        re_order['spoc'] = lambda (s,p,o,c): (s,p,o,c)
+        re_order['pocs'] = lambda (p,o,c,s): (s,p,o,c)
+        re_order['ocsp'] = lambda (o,c,s,p): (s,p,o,c)
+        re_order['ospc'] = lambda (o,s,p,c): (s,p,o,c)
+        re_order['cspo'] = lambda (c,s,p,o): (s,p,o,c)
+        re_order['cpso'] = lambda (c,p,s,o): (s,p,o,c)
+
+        return re_order
+
+    def __init_use_index(self):
+        # a map deciding which index to use depending on which variables are
+        # bound; there are 16 combinations, served by the 6 indices
+        use_index = {}
+
+        # spoc
+        use_index[(False, False, False, False)] = 'spoc'
+        use_index[(True, False, False, False)] = 'spoc'
+        use_index[(True, True, False, False)] = 'spoc'
+        use_index[(True, True, True, False)] = 'spoc'
+        use_index[(True, True, True, True)] = 'spoc'
+
+        # pocs
+        use_index[(False, True, False, False)] = 'pocs'
+        use_index[(False, True, True, False)] = 'pocs'
+        use_index[(False, True, True, True)] = 'pocs'
+
+        # ocsp
+        use_index[(False, False, True, False)] = 'ocsp'
+        use_index[(False, False, True, True)] = 'ocsp'
+        use_index[(True, False, True, True)] = 'ocsp'
+
+        # cspo
+        use_index[(False, False, False, True)] = 'cspo'
+        use_index[(True, False, False, True)] = 'cspo'
+        use_index[(True, True, False, True)] = 'cspo'
+
+        # cpso
+        use_index[(False, True, False, True)] = 'cpso'
+
+        # ospc
+        use_index[(True, False, True, False)] = 'ospc'
+
+        return use_index
+
+    def insert(self, (s,p,o,c)):
+        # make sure there is an ID mapping for all four values
+        s_id = self.__map.insert(s)
+        p_id = self.__map.insert(p)
+        o_id = self.__map.insert(o)
+        c_id = self.__map.insert(c)
+
+        index_map = self.__init_index_map((s_id, p_id, o_id, c_id))
+
+        t = self.__db_env.txn_begin()
+        try:
+            for index in self.__indices:
+                self.__indices[index].put(index_map[index], '', txn=t)
+
+            t.commit(0)
+        except Exception, e:
+            t.abort()
+            raise e
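+
+    # Worked example (hypothetical IDs): __init_index_map((12, 3, 7, 1))
+    # returns {'spoc': "12^3^7^1", 'pocs': "3^7^1^12", 'ocsp': "7^1^12^3",
+    # 'ospc': "7^12^3^1", 'cspo': "1^12^3^7", 'cpso': "1^3^12^7"}, so every
+    # quad is stored once per ordering and any bound prefix is a range scan.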
+
+    def delete(self, (s,p,o,c), txn=None):
+        (s_id, p_id, o_id, c_id) = self.__map_id((s,p,o,c))
+
+        # setup the indices
+        index_map = self.__init_index_map((s_id, p_id, o_id, c_id))
+
+        # since an index is used within a transaction to traverse the keys
+        # to delete, the delete deadlocks when acting on that index;
+        # close the cursor in __all_prefix before yielding?
+        if txn is None:
+            t = self.__db_env.txn_begin()
+        else:
+            t = self.__db_env.txn_begin(txn)
+
+        try:
+            for index in self.__indices:
+                self.__indices[index].delete(index_map[index], txn=t, flags=0)
+            t.commit(0)
+        except Exception, e:
+            t.abort()
+            raise e
+
+    # returns a mapping from index configuration to a string in the format
+    # v1^v2^v3^v4, which is used as a key in the index
+    def __init_index_map(self, (s_id,p_id,o_id,c_id)):
+        indices = {}
+
+        indices['spoc'] = self.__splitter.join([str(k) for k in (s_id, p_id, o_id, c_id)])
+        indices['pocs'] = self.__splitter.join([str(k) for k in (p_id, o_id, c_id, s_id)])
+        indices['ocsp'] = self.__splitter.join([str(k) for k in (o_id, c_id, s_id, p_id)])
+        indices['ospc'] = self.__splitter.join([str(k) for k in (o_id, s_id, p_id, c_id)])
+        indices['cspo'] = self.__splitter.join([str(k) for k in (c_id, s_id, p_id, o_id)])
+        indices['cpso'] = self.__splitter.join([str(k) for k in (c_id, p_id, s_id, o_id)])
+
+        return indices
+
+    # a 0 (or '0') sorts first in a BDB range query, so unbound variables
+    # map to 0; returns the list of ints representing the variables
+    def __map_id(self, (s,p,o,c)):
+        def map_id(val):
+            m = self.__map.get_id(val)
+            if m is None:
+                return 0
+            return int(m)
+
+        return [map_id(v) for v in (s,p,o,c)]
+
+    def __map_var(self, (s_id, p_id, o_id, c_id)):
+        def map_var(val):
+            v = self.__map.get_var(int(val))
+            if v is None:
+                return ''
+            return v
+
+        return tuple([map_var(v) for v in (s_id, p_id, o_id, c_id)])
+
+    def triples(self, (s,p,o,c), twopass=False):
+        # TODO: implement a twopass version where all IDs are collected before
+        # being mapped to their real values. Does this improve performance?
+        #
+        # iterates over the triples depending on the values of s,p,o,c
+        (s_id, p_id, o_id, c_id) = self.__map_id((s,p,o,c))
+
+        # setup the indices
+        indices = self.__init_index_map((s_id, p_id, o_id, c_id))
+
+        # get the bool map for the current configuration
+        (s_bool, p_bool, o_bool, c_bool) = [v != 0 for v in (s_id, p_id, o_id, c_id)]
+
+        current_index = self.__use_index[(s_bool, p_bool, o_bool, c_bool)]
+        prefix = indices[current_index]
+        # no bound variables: scan the whole index
+        if not (True in (s_bool, p_bool, o_bool, c_bool)):
+            prefix = ''
+        # bound variables found: strip off the trailing ^0 for the prefix
+        elif self.__splitter + '0' in prefix:
+            prefix = prefix[0:prefix.find(self.__splitter + '0')]
+        # otherwise use the given prefix
+
+        re_order_f = self.__re_order[current_index]
+
+        # convert the key back into the corresponding values
+        for k, v in self.__all_prefix(prefix, current_index):
+            (s,p,o,c) = self.__map_var(re_order_f(k.split(self.__splitter)))
+            yield ((s,p,o), c)
+
+        return
+
+    def contexts(self, triple=None):
+        for k, v in self.__all_prefix('', index='cspo'):
+            (c,s,p,o) = self.__map_var(k.split(self.__splitter))
+            yield c
+
+    def remove(self, (s,p,o,c)):
+        for ((s_t,p_t,o_t), c_t) in self.triples((s,p,o,c)):
+            self.delete((s_t,p_t,o_t,c_t))
+
+    def __len__(self, context=None):
+        return sum(1 for x in self.triples((None, None, None, context)))
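+
+    # Example (hypothetical IDs): the pattern (s=12, p=3, o=None, c=None)
+    # maps to (12, 3, 0, 0), selects the 'spoc' index and scans the prefix
+    # "12^3"; set_range() positions the cursor on the first key >= the
+    # prefix and the scan stops at the first key not starting with it.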
+
+    def __all_prefix(self, prefix, index='spoc'):
+        # iterate over all keys in the given index that start with prefix;
+        # the cursor is re-opened for every step so that none is held open
+        # while the caller consumes the yielded values
+        next = True
+        next_key = prefix
+
+        while next:
+            c = self.__indices[index].cursor()
+            current = None
+            try:
+                current = c.set_range(next_key)
+                next = c.next()
+                if next:
+                    next_key, data = next
+            except db.DBNotFoundError, e:
+                next = None
+            # what happens when the cursor is closed and re-opened between
+            # each access, does this mean that the lookup will be done again
+            # or is the location preserved somehow?
+            # in the first case it is better to collect a list of results
+            # and then yield over this list
+            c.close()
+
+            if current:
+                key, data = current
+                if key and key.startswith(prefix):
+                    yield key, data
+
+            if next_key and not next_key.startswith(prefix):
+                next = None
+
+    def close(self):
+        self.__open = False
+
+        for index in self.__indices:
+            self.__indices[index].close()
+
+class BDBOptimized(Store):
+    """An alternative BDB store implementing the index structure proposed in:
+    http://sw.deri.org/2005/02/dexa/yars.pdf
+
+    Index structures:
+    key -> int and int -> key mappings for variable-to-id and id-to-variable
+    Triple indices: spoc, pocs, ocsp, cspo, cpso, ospc
+
+    This store is both transaction and context-aware.
+    """
+
+    context_aware = True
+
+    # hmm - this was false, but if false cannot be used as sink for n3 parser
+    # I set it to true - but this probably breaks something. [GAG 2011/08/20]
+    formula_aware = True
+
+    # TODO: transaction support
+    transaction_aware = True
+
+    def __init__(self, configuration=None, identifier=None):
+        self.__open = False
+        self.__identifier = identifier
+        self.configuration = configuration
+        self.__locks = 5000
+        self.__db_env = None
+        self.__id_mapper = None
+        self.__quad_index = None
+        self.__namespace_index = None
+        # Store.__init__ calls open if there is a configuration
+        super(BDBOptimized, self).__init__(configuration)
+
+    def __get_identifier(self):
+        return self.__identifier
+    identifier = property(__get_identifier)
+
+    def _init_db_environment(self, homeDir, create=True):
+        # NOTE: the identifier is appended to the path as the location for
+        # the db; this provides proper isolation for stores which have the
+        # same path but different identifiers
+        if SUPPORT_MULTIPLE_STORE_ENVIRON:
+            fullDir = join(homeDir, self.identifier)
+        else:
+            fullDir = homeDir
+
+        envsetflags = db.DB_CDB_ALLDB
+        envflags = db.DB_INIT_MPOOL | db.DB_INIT_LOCK | db.DB_THREAD | db.DB_INIT_TXN | db.DB_RECOVER
+        if not exists(fullDir):
+            if create:
+                makedirs(fullDir)
+                self.create(fullDir)
+            else:
+                return NO_STORE
+
+        db_env = db.DBEnv()
+        db_env.set_cachesize(0, 1024*1024*50) # TODO
+
+        # enable deadlock-detection
+        db_env.set_lk_detect(db.DB_LOCK_MAXLOCKS)
+
+        # increase the number of locks; this is correlated to the size
+        # (number of triples) that can be added/removed within a single
+        # transaction
+        db_env.set_lk_max_locks(self.__locks)
+        db_env.set_lk_max_lockers(self.__locks)
+        db_env.set_lk_max_objects(self.__locks)
+
+        #db_env.set_lg_max(1024*1024)
+        #db_env.set_flags(envsetflags, 1)
+        db_env.open(fullDir, envflags | db.DB_CREATE, 0)
+        return db_env
+
+    def is_open(self):
+        return self.__open
+
+    def open(self, path, create=True):
+        homeDir = path
+
+        if self.__identifier is None:
+            self.__identifier = URIRef(pathname2url(http://github.com/abspath(homeDir)))
+
+        self.__db_env = self._init_db_environment(homeDir, create)
+        self.__open = True
+
+        self.__id_mapper = IDMap(self.__db_env, self.node_pickler)
+        self.__quad_index = QuadIndex(self.__db_env, self.__id_mapper)
+        self.__namespace_index = NamespaceIndex(self.__db_env)
+
+    def triples(self, (subject, predicate, object), context=None):
+        for result in self.__quad_index.triples((subject, predicate, object, context)):
+            yield result
+
+    def contexts(self, triple=None):
+        return self.__quad_index.contexts(triple=triple)
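+
+    # Note: add() and remove() below delegate to the QuadIndex; a minimal
+    # sketch of the resulting behaviour (names are hypothetical):
+    #
+    #   store.add((s, p, o), context=c)      # writes 6 index entries
+    #   store.remove((None, None, None), c)  # drops every quad in c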
+ """ + assert self.__open, "The Store must be open." + Store.add(self, (subject, predicate, object), context, quoted) + + self.__quad_index.insert((subject, predicate, object, context)) + + def remove(self, (subject, predicate, object), context, txn=None): + """ + Remove the matching triples and/or context from the store. Variables + can be unbound by using None. + """ + + assert self.__open, "The Store must be open." + Store.remove(self, (subject, predicate, object), context) + + self.__quad_index.remove((subject, predicate, object, context)) + + def bind(self, prefix, namespace): + return self.__namespace_index.bind(prefix, namespace) + + def namespace(self, prefix): + return self.__namespace_index.namespace(prefix) + + def prefix(self, namespace): + return self.__namespace_index.prefix(namespace) + + def namespaces(self): + for r in self.__namespace_index.namespaces(): + yield r + + def __len__(self, context=None): + return self.__quad_index.__len__(context) + + def close(self, commit_pending_transaction=True): + self.__open = False + self.__id_mapper.close() + self.__quad_index.close() + self.__namespace_index.close() + self.__db_env.close() + diff --git a/test/test_bdboptimized.py b/test/test_bdboptimized.py new file mode 100644 index 000000000..2f01dc005 --- /dev/null +++ b/test/test_bdboptimized.py @@ -0,0 +1,17 @@ +import logging + +_logger = logging.getLogger(__name__) + +from test import test_graph +from test import test_context + + +class BDBOptimizedGraphTestCase(test_graph.GraphTestCase): + store_name = "BDBOptimized" + non_core = True + bsddb = True + +class BDBOptimizedStoreTestCase(test_context.ContextTestCase): + store = "BDBOptimized" + non_core = True + bsddb = True