""" PyTripleStore

Module for reading / writing / deleting from the 3Store database.
Replaces C3Store.
"""
#
# Copyright 2005, 2006 EIAO Consoritum
# This program is distributed under the terms of the GNU General
# Public License.
#
# This file is part of the European Internet Accessibility Observatory
# (EIAO)
#
# EIAO is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# EIAO is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with EIAO; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston,
# MA  02110-1301  USA
#

# NOTE: the star imports are kept in their original order on purpose; later
# ones win on name clashes.
import MySQLdb
from _mysql_exceptions import *
from RDFgeneratorerror import *
import md5
from threading import *
import pdb
import sys
import time

__author__ = 'Morten Goodwin Olsen'
__maintainer__ = 'Morten Goodwin Olsen'
__version__ = 0.1

# Size of the 64-bit hash space. Hashes are stored as signed 64-bit integers,
# so unsigned values >= maxhash // 2 are shifted down by maxhash.
maxhash = 2**64


class PyTripleStore:
    """Python Triple Store replacing the C3Store.

    Data lives in four MySQL tables, as used by the queries below:
      models(hash, model)      -- model name per model hash
      resources(hash, uri)     -- URI text per URI hash
      literals(hash, literal)  -- literal text per literal hash
      triples(model, subject, predicate, object, literal, inferred)
                               -- hashes of each part; literal is 1 when the
                                  object is stored in `literals`, else 0
    All hashes are produced by gethashvalue().
    """

    def __init__(self, host, user, passwd, db, model):
        """Creates a connection to the MySQL database.

        Keyword arguments:
        host -- Host to connect to. E.g. localhost
        user -- Username for connection
        passwd -- Password for connection
        db -- Database for connection
        model -- RDF model used whenever a method is called without one
        """
        self.host = host
        self.user = user
        self.passwd = passwd
        self.db = db
        self.model = model
        self.connection = MySQLdb.connect(host=host, user=user,
                                          passwd=passwd, db=db)
        self.cursor = self.connection.cursor()
        # hash -> URI of subjects/predicates already written by writetriple(),
        # used to skip redundant inserts. Grows without bound.
        self.knownhash = {}
        # Result cache consulted by gettriples(cached=True). Nothing currently
        # fills it; writetriple() clears it.
        self.knowntriples = {}
        # Serializes writetriple() calls on the shared connection.
        self.cond = Condition()

    def __cc(self):
        """Get a fresh cursor, reconnecting if that fails."""
        try:
            self.cursor = self.connection.cursor()
        except Exception:
            # Narrower than a bare except so KeyboardInterrupt/SystemExit are
            # not turned into a reconnect.
            self.connection = MySQLdb.connect(host=self.host, user=self.user,
                                              passwd=self.passwd, db=self.db)
            self.cursor = self.connection.cursor()

    def __del__(self):
        # Intentionally does not close the connection; call dropconnection()
        # explicitly when done.
        pass

    def executeselect(self, selectstatement):
        """Executes a raw select statement.

        The statement is sent verbatim, so it must not contain untrusted text.

        Keyword arguments:
        selectstatement -- Select statement to execute

        Returns all rows as tuples (empty if none).
        """
        self.cursor.execute(selectstatement)
        return self.cursor.fetchall()

    def getmodelhash(self, model=None):
        """Returns the hash value of a model as registered in `models`.

        Keyword arguments:
        model -- [Optional] Model as a string. Current model if left empty.

        Returns the hash of the given / current model.
        Raises IndexError if the model is not registered (the writers rely on
        this to detect new models).
        """
        self.__cc()
        if not model:
            model = self.model
        self.cursor.execute(
            "select hash from models where model = %s limit 1", (model,))
        return self.cursor.fetchall()[0][0]

    def gethashvalue(self, uri):
        """Gets the hash value of a URI, literal or model name.

        The first 64 bits of the MD5 digest, reinterpreted as a signed 64-bit
        integer so they fit a signed BIGINT column.

        Keyword arguments:
        uri -- String to hash

        Returns the hash value as a signed 64-bit integer.
        """
        value = int(md5.new(uri).hexdigest()[:16], 16)
        # Unsigned -> signed two's complement. Values from 2**63 upward are
        # negative; using '>' here would leave 2**63 itself out of range.
        if value >= maxhash // 2:
            value -= maxhash
        return value

    def _insertignoringduplicate(self, statement, params):
        """Execute an insert, ignoring IntegrityError (row already present)."""
        try:
            self.cursor.execute(statement, params)
        except IntegrityError:
            pass

    def _getorcreatemodelhash(self, model):
        """Return the hash of `model`, registering it in `models` if missing."""
        try:
            return self.getmodelhash(model)
        except IndexError:
            # Model has not been created yet. An IntegrityError on insert
            # presumably means someone else registered it meanwhile.
            modelhash = self.gethashvalue(model)
            self._insertignoringduplicate(
                "insert into models(hash, model) values(%s, %s)",
                (modelhash, model))
            return modelhash

    def _buildwhere(self, modelhash, subhash, prehash, objhash):
        """Build a `triples` WHERE clause for a (model, s, p, o) hash pattern.

        A None subhash/prehash/objhash means "match any".
        Returns (clause, params) suitable for cursor.execute().
        """
        clauses = ['model = %s']
        params = [modelhash]
        for column, value in (('predicate', prehash),
                              ('object', objhash),
                              ('subject', subhash)):
            if value is not None:
                clauses.append(column + ' = %s')
                params.append(value)
        return ' and '.join(clauses), tuple(params)

    def writetriplesfast(self, triples, model=None):
        """Write many triples in one transaction.

        Keyword arguments:
        triples -- Sequence of (subject, predicate, object, literal) sequences.
                   E.g. [('http://subject.com', 'http://predicate.com',
                          'http://object.com', 0)]
                   literal is true when object is a literal, false for a URI.
        model -- [Optional] Which model to write to. Writes to standard model
                 if left empty.

        Rows already present are skipped (INSERT IGNORE). If an insert fails,
        the transaction is rolled back and the exception re-raised.

        Returns None
        """
        if not model:
            model = self.model
        modelhash = self._getorcreatemodelhash(model)

        # Sets remove duplicate rows before they are sent to the server.
        triplerows = set()
        resourcerows = set()
        literalrows = set()
        for t in triples:
            subject, predicate, obj, literal = t[0], t[1], t[2], t[3]
            subhash = self.gethashvalue(subject)
            prehash = self.gethashvalue(predicate)
            objhash = self.gethashvalue(obj)
            triplerows.add((modelhash, subhash, prehash, objhash, literal, 0))
            resourcerows.add((subhash, subject))
            resourcerows.add((prehash, predicate))
            if literal:
                literalrows.add((objhash, obj))
            else:
                resourcerows.add((objhash, obj))

        self.cursor.execute('Set autocommit=0;')
        self.cursor.execute('START TRANSACTION;')
        try:
            # Parameter markers let the driver quote values, so quotes or
            # backslashes in URIs/literals cannot break the statement.
            if triplerows:
                self.cursor.executemany(
                    "insert IGNORE into triples(model, subject, predicate, "
                    "object, literal, inferred) "
                    "values (%s, %s, %s, %s, %s, %s)",
                    list(triplerows))
            if resourcerows:
                self.cursor.executemany(
                    "insert IGNORE into resources(hash, uri) values (%s, %s)",
                    list(resourcerows))
            if literalrows:
                self.cursor.executemany(
                    "insert IGNORE into literals(hash, literal) "
                    "values (%s, %s)",
                    list(literalrows))
        except Exception:
            # Do not leave a half-written transaction open for a later COMMIT.
            self.connection.rollback()
            raise
        self.cursor.execute('COMMIT;')

    def writetriple(self, subject, predicate, object, objecttype, model=None):
        """Write only one triple.

        Keyword arguments:
        subject -- The subject to write
        predicate -- The predicate to write
        object -- The object to write
        objecttype -- True/1 if object is a literal, false/0 if it is a URI
        model -- [Optional] The model to write, use standard model if left
                 empty.

        Exceptions from the triples insert (e.g. IntegrityError) propagate.
        The internal lock is released on every path.

        Returns None
        """
        if not model:
            model = self.model
        subhash = self.gethashvalue(subject)
        prehash = self.gethashvalue(predicate)
        objhash = self.gethashvalue(object)
        self.cond.acquire()
        try:
            if not self.connection:
                self.connection = MySQLdb.connect(host=self.host,
                                                  user=self.user,
                                                  passwd=self.passwd,
                                                  db=self.db)
                self.cursor = self.connection.cursor()
            if not self.cursor:
                self.cursor = self.connection.cursor()
            modelhash = self._getorcreatemodelhash(model)

            # int() keeps the original '%d' semantics (True -> 1).
            self.cursor.execute(
                "insert into triples(model, subject, predicate, object, "
                "literal, inferred) values(%s, %s, %s, %s, %s, 0)",
                (modelhash, subhash, prehash, objhash, int(objecttype)))

            if objecttype:
                # This is a literal
                statement = "insert into literals(hash, literal) values(%s, %s)"
            else:
                # This is a URI
                statement = "insert into resources(hash, uri) values(%s, %s)"
            self._insertignoringduplicate(statement, (objhash, object))

            # knownhash is keyed by hash, so both checks must use the hash.
            if subhash not in self.knownhash:
                self._insertignoringduplicate(
                    "insert into resources(hash, uri) values(%s, %s)",
                    (subhash, subject))
                self.knownhash[subhash] = subject
            if prehash not in self.knownhash:
                self._insertignoringduplicate(
                    "insert into resources(hash, uri) values(%s, %s)",
                    (prehash, predicate))
                self.knownhash[prehash] = predicate
        finally:
            # Without this, any exception above would leave the lock held and
            # deadlock every later writetriple() call.
            self.cond.release()
        self.knowntriples = {}

    def gettriples(self, subject, predicate, object, model=None, cached=False,
                   supercache=False):
        """Retrieve triples matching a pattern.

        Keyword arguments:
        subject -- Subject to be part of the triples retrieved. None if any.
        predicate -- Predicate to be part of the triples retrieved. None if any.
        object -- Object to be part of the triples retrieved. None if any.
        model -- [Optional] Model to be part of the triples retrieved.
                 Current model if left empty.
        cached -- [Optional] Return a previously cached result for the same
                  (subject, predicate, object, model) if one exists. Nothing
                  in this class currently fills that cache, and cached results
                  cannot be guaranteed current while the model is written to.
        supercache -- [Optional] Cache the entire matching RDF graph in
                      self.onlysubject (subject -> list of triples) and
                      self.onlysubjectpredicate ((subject, predicate) ->
                      list of triples) instead of returning it. Forced on
                      when subject, predicate and object are all empty.

        Prints a progress marker to stdout every 1000 rows.

        Returns a list of dictionaries with the keys 'subject', 'predicate',
        'object' and 'literal' (1 if the object is a literal, else 0). In
        supercache mode the return value is [{}] and the triples are only
        available through the cache attributes.
        """
        if not subject and not object and not predicate:
            supercache = True
        if not model:
            model = self.model
        cachekey = (subject, predicate, object, model)
        if cached and cachekey in self.knowntriples:
            return self.knowntriples[cachekey]

        # Computed directly rather than looked up in `models`; this matches
        # the hash the writers register for new models.
        modelhash = self.gethashvalue(model)
        subhash = prehash = objhash = None
        if subject:
            subhash = self.gethashvalue(subject)
        if object:
            objhash = self.gethashvalue(object)
        if predicate:
            prehash = self.gethashvalue(predicate)
        wherestate, params = self._buildwhere(modelhash, subhash, prehash,
                                              objhash)

        # URI objects and literal objects live in different tables.
        self.__cc()
        self.cursor.execute(
            'select distinct model,a.uri,b.uri,c.uri,triples.literal '
            'from triples,resources a, resources b, resources c '
            'where a.hash=subject and b.hash=predicate and triples.literal =0 '
            'and c.hash=object and ' + wherestate, params)
        rows = list(self.cursor.fetchall())
        self.__cc()
        self.cursor.execute(
            'select distinct model,a.uri,b.uri,c.literal,triples.literal '
            'from triples,resources a, resources b, literals c '
            'where a.hash=subject and b.hash=predicate and triples.literal =1 '
            'and c.hash=object and ' + wherestate, params)
        rows.extend(self.cursor.fetchall())

        if supercache:
            self.onlysubject = {}
            self.onlypredicate = {}
            self.onlyobject = {}
            self.onlysubjectpredicate = {}
            alltriples = [{}]
        else:
            alltriples = []

        uniquerows = set(rows)
        total = len(uniquerows)
        num = 0
        for row in uniquerows:
            num += 1
            if not num % 10000:
                sys.stdout.write('%d of %d loaded ' % (num, total))
            elif not num % 1000:
                sys.stdout.write('. ')
            triple = {'subject': row[1], 'predicate': row[2],
                      'object': row[3], 'literal': row[4]}
            if supercache:
                # The same dict object is shared by both indexes.
                self.onlysubject.setdefault(row[1], []).append(triple)
                self.onlysubjectpredicate.setdefault(
                    (row[1], row[2]), []).append(triple)
            else:
                alltriples.append(triple)
        return alltriples

    def deletetriple(self, subject, predicate, object, model=None):
        """Deletes the triples matching a pattern.

        Keyword arguments:
        subject -- Subject of the triple to be deleted (empty = any)
        predicate -- Predicate of the triple to be deleted (empty = any)
        object -- Object of the triple to be deleted (empty = any)
        model -- [Optional] Model of triple to be deleted

        Does nothing if subject, predicate and object are all empty.
        Raises IndexError (from getmodelhash) if the model is not registered.

        Returns None
        """
        if not model:
            model = self.model
        modelhash = self.getmodelhash(model)
        subhash = prehash = objhash = None
        if subject:
            subhash = self.gethashvalue(subject)
        if object:
            objhash = self.gethashvalue(object)
        if predicate:
            prehash = self.gethashvalue(predicate)
        if subhash is None and prehash is None and objhash is None:
            # Refuse to wipe an entire model when no pattern was given.
            return
        wherestate, params = self._buildwhere(modelhash, subhash, prehash,
                                              objhash)
        self.cursor.execute('delete from triples where ' + wherestate, params)

    def dropconnection(self):
        """Drops the connection to the database

        Return None
        """
        if self.connection:
            self.connection.close()