""" Copyright 2005, 2006 EIAO Consoritum This program is distributed under the terms of the GNU General Public License. This file is part of the European Internet Accessibility Observatory (EIAO) EIAO is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. EIAO is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with EIAO; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA """ __author__ = 'Morten Goodwin Olsen' __maintainer__ = 'Nils Ulltveit-Moe' __version__ = 0.1 import string #import rdfsql import Redland import RDF from RDF import RedlandError import urllib from RDFgeneratorerror import * from pytriplestore import * from asynchRDFWriter import * from DBPool import DBServer class RDFreaderwriter: """Class for writing RDF to the local TripleStore """ def __init__(self, rdfmodel = None, systemconfiguration = None,dbdatabase=None,createpool=True): """Initial method for setting up connection to TripleStore Keyword arguments: rdfmodel -- [Optional] Model to operate on. Retrieved from system configuration if None systemconfiguration -- [Optional] Instance of the system configuration class (to ensure that only one instance of the system configuration is present). New instance created if None. dbdatabase -- [Optional] Which database to connect to. Retrieves this information from the System Configruation if left empty. createpool -- [Optional] Create a DB pool. True as default. """ #import pdb #pdb.set_trace() if not systemconfiguration: from sc import SystemConfiguration self.sc = SystemConfiguration() else: self.sc = systemconfiguration if rdfmodel: self.rdfmodel=rdfmodel else: self.rdfmodel=self.sc.rdfmodel if dbdatabase: self.sc.dbdatabase = dbdatabase #self.db = rdfsql.rs_connect(self.sc.dbdatabase, self.sc.dbusername, self.sc.dbpassword, self.rdfmodel) self.dt = PyTripleStore(host='localhost',user=self.sc.dbusername, passwd=self.sc.dbpassword, db=self.sc.dbdatabase,model=self.rdfmodel) #self.dt = PyTripleStore(host='localhost',user='rdf', passwd='rdf', db='test',model=self.rdfmodel) self.rdfbegin = """ """ self.rdfend = "" self.cache = [] self.supercache = [] self.asyncwriters = [] if createpool: #Optinal to create DB pool. Not needed when only a synhrounus connection is needed from e.g. housekeeping. self.DBServer = DBServer(self.writeRDF,self.dt.host,self.dt.user,self.dt.passwd,self.dt.db,self.dt.model) else: self.DBServer = None def stop(self): #import pdb #pdb.set_trace() self.sc.stop() if self.DBServer: self.DBServer.stop() self.DBServer.join() #def __del__(self): # import pdb # self.DBServer.join() def readRDF(self, subject,predicate,object, xml=False,cached=False,supercached=False): """Reading RDF from repository Keyword argument: subject -- Triples subject, None if any predicate -- Triples predicate, None if any object -- Triples object, None if any xml -- [Optional] Return as RDF XML. False is empty. cached -- [Optional] If the RDF should be read from cached results. This increases the reading speed. However, live results cannot be guaranteed. False if empty. supercached -- [Optional] Caches the entire RDF Graph. """ if supercached: #print 'Getting supercache values' if not self.supercache: print "Getting entire graph" self.supercache = self.readRDF(None,None,None) print "subject only cache" self.onlysubject = self.dt.onlysubject #self.onlysubject = dict([(a['subject'],a) for a in self.supercache]) print "predicate only cache" self.onlypredicate = self.dt.onlypredicate #self.onlyopredicate = dict([(a['predicate'],a) for a in self.supercache]) print "object only cache" self.onlyobject = self.dt.onlyobject #self.onlyobject = dict([(a['object'],a) for a in self.supercache]) #self.DBServer = DBServer(lambda:None, 'localhost','rdf','rdf','test','test') print "subject and predicate cache" self.onlysubjectpredicate = self.dt.onlysubjectpredicate #self.onlysubjectpredicate = dict([((a['subject'],a['predicate']),a) for a in self.supercache]) print 'Number of triples:',(len(self.supercache)),len(self.onlysubject),len(self.onlypredicate),len(self.onlyobject),len(self.onlysubjectpredicate) # #Read entire graph #import pdb #pdb.set_trace() if not predicate and not object: try: return self.onlysubject[subject] except KeyError: return [] #elif not subject and not object: # try: # return self.onlypredicate[predicate] # except KeyError: # return [] #elif not predicate and not subject: # try: # return self.onlyobject[object] # except KeyError: # return [] elif not object and subject and predicate: try: return self.onlysubjectpredicate[(subject,predicate)] except KeyError: return [] else: print 'Getting from all (inefficient, write cache for this)!',subject,predicate,object try: return [a for a in self.supercache if (a['subject']==subject or subject is None) and (a['predicate']==predicate or predicate is None) and (a['object']==object or object is None)] except KeyError: return [] statements = self.dt.gettriples(subject,predicate,object,self.rdfmodel,cached=cached) #triples = rdfsql.rs_find_triples(self.db, subject, predicate, object, rdfsql.ObjAny, 0, self.rdfmodel) #statements = [] #triple = rdfsql.rs_next_triple(triples) #while triple: # statements.append(triple) # del triple # triple = rdfsql.rs_next_triple(triples) #rdfsql.rs_free_result(triples) #del triples statements.sort() if xml: return self.getXMLFromTriples(statements) else: return statements def writeTriplesFast(self, triples): '''Writes a dictionary of Triples fast to 3Store Keyword arguments: triples -- Triples as a dictionary of triples (not XML/RDF) Returns None ''' for t in triples: #print '.', self.writeRDFTriple(t['subject'],t['predicate'],t['object'],t['literal']) def getXMLFromTriples(self, triples,redland=False): '''Returns XML from triples Keyword arguments: triples -- A list of triples from a triple result set redland -- [Optional] If triples is on redland format already. False as default Returns XML as a string ''' mo=RDF.Model() #Convert triples to a format understandable by the serializer for triple in triples: #raise str(triple) if(redland==False): #import pdb #pdb.set_trace() if(triple[("literal")] == 1): statement=RDF.Statement(RDF.Uri(triple[("subject")]), RDF.Uri(triple[("predicate")]), RDF.Node(triple[("object")])) else: statement=RDF.Statement(RDF.Uri(triple[("subject")]), RDF.Uri(triple[("predicate")]), RDF.Uri(triple[("object")])) mo.add_statement(statement) else: #print triple mo.add_statement(triple) #Do the serialization rdfxml=RDF.RDFXMLAbbrevSerializer() rdfxml.set_namespace('eiao',RDF.Uri('http://www.eiao.net/rdf/2.0#')) rdfxml.set_namespace('dc',RDF.Uri('http://purl.org/dc/elements/1.1/')) rdfxml.set_namespace('base',RDF.Uri('http://www.eiao.net/rdf/2.0#')) rdfxml.set_namespace('dct', RDF.Uri('http://purl.org/dc/terms/')) rdfxml.set_namespace('earl',RDF.Uri('http://www.w3.org/WAI/ER/EARL/nmg-strawman#')) out=rdfxml.serialize_model_to_string(mo) return out def writeRDFTriple(self, subject, predicate, object, objectliteral = False): """Inserts one RDF statement Keyword arguments: subject -- Triples subject, None if any predicate -- Triples predicate, None if any object -- Triples object, None if any objectliteral -- [Optional] True if object is of type literal. Default object is treated as URI. """ if type(object)==type(u''): object = object.encode('utf-8') if objectliteral: self.dt.writetriple(subject,predicate,object,1) else: self.dt.writetriple(subject,predicate,object,0) #if objectliteral: # rdfsql.rs_assert_triple(self.db, subject, predicate, object, rdfsql.ObjLiteral) #else: # rdfsql.rs_assert_triple(self.db, subject, predicate, object, rdfsql.ObjURI) def flushDB(self): """Flushing the current DB""" rdfsql.rs_flush(self.db) def commit(self,async=False): """Write cached RDF to 3Store """ for rdf in self.cache: self.writeRDF(rdf,addnamespace=False,performcache=False,async=async) self.cache = [] def rollback(self): """Performs a rollback, and removes all cached RDF """ self.cache = [] def writeRDF(self, rdf, addnamespace=True,performcache=False, async=False): """Writing RDF to the the to TripleStore Keyword arguments: rdf -- RDF as XML addnamespace -- [Optional] If namespace etc. should be added to the RDF. True as default. This should only be true if the rdf/xml does not contain xmlns-namespaces already. performcache -- [Optional] If the result should be cached. commit() needs to be run to perform the actual writing. No caching as default. async -- [Optional] If writing to the Database should be done in a separate noen blocking thread. Synchrounuos as default. returns object for asynchrounous writing if async=True, otherwise returns None """ #Debug. Removing cache to avoid confusion in use. #performcache = False if not rdf: raise EmptyRDFError(rdf,'') emptyrdf = rdf for i in string.whitespace: emptyrdf = emptyrdf.replace(i,'') if not emptyrdf: raise EmptyRDFError(rdf,'') else: del emptyrdf if addnamespace: rdf = ''.join([self.rdfbegin, rdf, self.rdfend]) #model = RDF.Model() #pars = RDF.RDFXMLParser() #parse XML string into the RDF model #if not performcache: # try: # pars.parse_string_into_model(model, rdf, base_uri="http://www.eiao.net/rdf/1.0#") # except Exception, e: # #Bug workaround. RedlandError could not be excepted. # if str(e.__class__) == 'RDF.RedlandError': # raise IllegalRDFError(rdf, str(e)) # else: # raise e if performcache: self.cache.append(rdf) else: #import pdb #pdb.set_trace() #stat = RDF.Statement(subject = None, predicate = None, object = None) #statements = model.find_statements(stat) #allstatements = [stat for stat in statements] if async: self.DBServer.writeRDF(rdf) #a = asynchwriter(rdf,self.writeRDFTriple,self.dt.host,self.dt.user,self.dt.passwd,self.dt.db,self.dt.model) #a = asynchwriter(allstatements,self.writeRDFTriple,self.dt.host,self.dt.user, self.dt.passwd,self.dt.db,self.dt.model) #a.start() #print 'All statements to write for one scenario is:',len(allstatements) #TODO: Next lines only for testing. Remove quickly #import traceback #try: # 21/0 #except Exception,e: # print traceback.extract_stack() #print 'Finished writing async' #self.asyncwriters.append(a) else: model = RDF.Model() pars = RDF.RDFXMLParser() try: pars.parse_string_into_model(model, rdf, base_uri="http://www.eiao.net/rdf/2.0#") except Exception, e: #Bug workaround. RedlandError could not be excepted. if str(e.__class__) == 'RDF.RedlandError': raise IllegalRDFError(rdf, str(e)) else: raise e stat = RDF.Statement(subject = None, predicate = None, object = None) statements = model.find_statements(stat) allstatements = [stat for stat in statements] for statement in allstatements: #print '.', if statement.subject.is_blank(): subject = str(statement.subject._get_blank_identifier()) else: subject = str(statement.subject.uri) predicate = str(statement.predicate.uri) if statement.object.is_blank(): object_type = rdfsql.ObjURI object = str(statement.object._get_blank_identifier()) elif statement.object.is_literal(): object_type = rdfsql.ObjLiteral if type(statement.object.literal_value['string'])==type(u''): object = statement.object.literal_value['string'].encode('utf-8') else: object = str(statement.object.literal_value['string']) else: object = str(statement.object.uri) object_type = rdfsql.ObjURI if type(object)==type(u''): object = object.encode('utf-8') self.writeRDFTriple(subject,predicate,object,object_type) #rdfsql.rs_assert_triple(self.db, subject, predicate, object, object_type) if __name__ == "__main__": rw = RDFreaderwriter(rdfmodel='EIAO_test_715',dbdatabase='timetest',createpool=False) noe = rw.readRDF(None,None,None,supercached=True) print noe rw.stop()