"""
Code to handle the different tables, that is all calls to the data warehouse
"""
# Copyright 2005, 2006 EIAO Consoritum
# This program is distributed under the terms of the GNU General
# Public License.
#
# This file is part of the European Internet Accessibility Observatory
# (EIAO)
#
# EIAO is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# EIAO is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with EIAO; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston,
# MA 02110-1301 USA

__author__ = "Tom Oddershede"
__maintainer__ = "Tom Oddershede"
__version__ = 0.3
__all__ = ['DataWarehouseTable']

import os

from fifocache import FIFOCache


class DataWarehouseTable:
    """
    The class to handle the different tables from R2.0 datawarehouse

    One instance wraps one table.  Rows are identified by the "key"
    attributes listed in the internal table-key dictionary; the values for
    those attributes are read from the ``argument`` dictionaries passed to
    select(), insert() and selectOrInsert().
    """

    def __init__(self, _tablename, _cur, _cache=True, _filewrite=False):
        """
        Constructor

        Arguments:
        _tablename  The name of the table from the datastaging schema
        _cur        The database connection (used through execute(),
                    fetchone() and copy_from())
        _cache      A boolean that determines whether the results should
                    be cached
        _filewrite  A boolean that determines whether the results should be
                    written directly to the database, or whether they should
                    be written to a temporary file and then bulk loaded in
                    the end.
        """
        self.__tablename = _tablename
        self.__cur = _cur
        self.__doCache = _cache
        self.__cache = FIFOCache(100000)
        self.__filewrite = False
        self.__seqCount = None

        # Always initialised so that the bulk-load bookkeeping never hits a
        # missing attribute, whatever combination of flags was given.
        self.__insertCache = {}
        self.__factcount = 0

        # Create the temporary file
        if _filewrite:
            self.__tempfile = os.tmpfile()
            self.__filewrite = True

        # A list of tables, that should not ask the database for an ID
        self.__noID = ["scenariocoverage", "languageusage",
                       "scenariomediatypeusage", "technologyfinding",
                       "year", "quarter", "week", "month", "date",
                       "testrun", "testresult", "resourcemediatypeusage"]

        # A list of tables that only ask the database for a new ID every
        # 10000th time.  This requires that a sequence exists with the name
        # {tablename}ID_seq_for_ETL_tool
        self.__speedSeq = ["subject"]

        # A dictionary that holds the attributes that determine a row in
        # the given table
        self.__tableKeys = {
            "year": ["YearID"],
            "month": ["MonthID"],
            "quarter": ["QuarterID"],
            "week": ["WeekID"],
            "date": ["DateID"],
            "testrun": ["TestRunID"],
            "topleveldomain": ["TopLevelDomain"],
            "secondleveldomain": ["SecondLevelDomain"],
            "site": ["Site"],
            "resource": ["URL"],
            "server": ["CompleteServerName"],
            "language": ["LanguageAbbreviation"],
            "languagefamily": ["LanguageFamilyAbbreviation"],
            "resourceversion": ["ResourceID", "CopyLocation"],
            "barriercomputationversion": ["BarrierComputationVersionName"],
            "barriercomputation": ["BarrierComputationName"],
            "etlrun": ["ETLRunID"],
            "etlversion": ["Major", "Minor", "Build"],
            "mediatype": ["MediaType"],
            "subject": ["ResourceVersionID", "Line", "Col"],
            "mimetype": ["MimeType", "MimeSubtype"],
            "inclusiontype": ["InclusionType"],
            "nutslevel3": ["Nuts3Code"],
            "nace": ["NaceCode"],
            "operatingsystemfamily": ["OperatingSystemFamily"],
            "hour": ["Hour"],
            "minute": ["Minute"],
            "scenariotype": ["ScenarioType"],
            "result": ["ResultType"],
            "scenario": ["r_id"],
            "mediatypeusage": ["ScenarioID", "MediaTypeID"],
        }

        # A dictionary that maps an attribute to a variable: the name of the
        # entry in the argument dictionary and the format character used for
        # it in the generated WHERE clause
        self.__tableVar = {
            "YearID": ("YearID", "d"),
            "MonthID": ("MonthID", "d"),
            "QuarterID": ("QuarterID", "d"),
            "WeekID": ("WeekID", "d"),
            "DateID": ("DateID", "d"),
            "TestRunID": ("testrunnumber", "d"),
            "TopLevelDomain": ("topleveldomain", "s"),
            "SecondLevelDomain": ("secondleveldomain", "s"),
            "Site": ("site", "s"),
            "URL": ("url", "s"),
            "CompleteServerName": ("server", "s"),
            "LanguageAbbreviation": ("lang_suffix", "s"),
            "LanguageFamilyAbbreviation": ("lang", "s"),
            "ResourceID": ("page_id", "d"),
            "CopyLocation": ("copylocation", "s"),
            "BarrierComputationVersionName": ("barrcompversionname", "s"),
            "BarrierComputationName": ("barrcomp", "s"),
            "Major": ("ETLMajor", "d"),
            "Minor": ("ETLMinor", "d"),
            "ETLRunID": ("ETLRunID", "d"),
            "Build": ("ETLBuild", "d"),
            "MediaType": ("mediatype", "s"),
            "ResourceVersionID": ("pageversion_id", "d"),
            "Line": ("line", "s"),
            "Col": ("col", "d"),
            "MimeType": ("MimeType", "s"),
            "MimeSubtype": ("MimeSubtype", "s"),
            "InclusionType": ("InclusionType", "s"),
            "Nuts3Code": ("nuts", "s"),
            "NaceCode": ("NaceCode", "s"),
            "OperatingSystemFamily": ("OperatingSystemFamily", "s"),
            "Hour": ("Hour", "s"),
            "Minute": ("Minute", "s"),
            "ScenarioType": ("ScenarioType", "s"),
            "ResultType": ("ResultType", "s"),
            "r_id": ("r_id", "d"),
            "ScenarioID": ("scen_id", "d"),
            "MediaTypeID": ("mediatype_id", "d"),
        }

    def __cacheKey(self, keys, argument):
        """Builds the cache key for a row: the key values joined with ':'

        Raises KeyError if argument lacks one of the key variables.
        """
        return ":".join([str(argument[self.__tableVar[x][0]]) for x in keys])

    def select(self, argument):
        """
        Returns the ID of the row from the table that matches the given
        select criteria

        Arguments:
        argument    a dictionary containing the values that should be
                    inserted. See the insert function for more information

        Returns the ID of the matching row, None if no row matches, or -1
        if no key attributes are defined for the table.
        """
        try:
            keys = self.__tableKeys[self.__tablename.lower()]
        except KeyError:
            print("ERROR: Could not read the table %s. It may be insert-only" % (self.__tablename))
            return -1

        cachekey = None
        if self.__doCache:
            cachekey = self.__cacheKey(keys, argument)

            # If filewrite and cache then we need to look into the not
            # written cache
            if self.__filewrite:
                r_id = self.__insertCache.get(cachekey)
                if r_id is not None:
                    return r_id

            # Cache: If the value is contained in the cache then it is
            # returned
            r_id = self.__cache.lookup(cachekey)
            if r_id is not None:
                return r_id

        # Create the SELECT statement
        where = " AND ".join([x + " = %(" + self.__tableVar[x][0] + ")" + self.__tableVar[x][1]
                              for x in keys])
        sql = "SELECT %sID FROM %s WHERE %s" % (self.__tablename, self.__tablename, where)
        self.__cur.execute(sql, argument)
        row = self.__cur.fetchone()

        # If no row is found then None is returned
        if row is None:
            return None

        # Add the newly found value to the cache
        if self.__doCache:
            self.__cache.add(cachekey, row[0])
        return row[0]

    def selectOrInsert(self, argument):
        """Returns the ID of the row from the table that matches the given
        select criteria. If the row does not exist then a new row is
        inserted into the table and the ID of this row is returned

        Arguments:
        argument    a dictionary containing the values that should be
                    inserted. See the insert function for more information
        """
        r_id = self.select(argument)
        if r_id is None:
            return self.insert(argument)
        return r_id

    def update(self, argument):
        """Updates the table. This is used in e.g., the ETL run table, that
        should be updated at the end of a run

        Only the ETLRun table can be updated; for any other table an error
        message is printed and nothing is executed.
        """
        if self.__tablename.lower() == "etlrun":
            self.__cur.execute("UPDATE ETLRun SET TimeUsage = %(ETLTimeUsage)d WHERE ETLRunID = %(ETLRun_id)d", argument)
        else:
            print("ERROR: The table %s cannot be updated" % (self.__tablename))

    def insert(self, argument):
        """Inserts a new row into the table and the ID of this row is
        returned

        Arguments:
        argument    a dictionary containing the values that should be
                    inserted. For example if the table is SecondLevelDomain
                    then the dictionary should contain secondleveldomain and
                    toplevel_id. This argument is often locals() from
                    calling function. The entry "r_id" is set to the new ID
                    when one is drawn from a sequence.

        Returns the new ID, None for tables that do not get an ID from the
        database (except TestRun, which returns the test run number), or -1
        if the table has no insert statement.
        """
        table = self.__tablename.lower()
        r_id = None

        # Gets the new ID from a sequence
        if table not in self.__noID:
            if table in self.__speedSeq:
                r_id = self.__nextSeq()
            else:
                fetchIdSql = "SELECT nextval('%sID_seq')" % (self.__tablename)
                self.__cur.execute(fetchIdSql)
                r_id = self.__cur.fetchone()[0]
            argument["r_id"] = r_id

        # The different insert statements
        if table == "secondleveldomain":
            self.__cur.execute(
                "INSERT INTO SecondLevelDomain(SecondLevelDomainID, SecondLevelDomain, TopLevelDomainID) "
                "VALUES(%(r_id)d, %(secondleveldomain)s, %(toplevel_id)d)", argument)
        elif table == "topleveldomain":
            self.__cur.execute(
                "INSERT INTO TopLevelDomain(TopLevelDomainID, TopLevelDomain) VALUES(%(r_id)d, %(topleveldomain)s)", argument)
        elif table == "site":
            self.__cur.execute(
                "INSERT INTO Site(SiteID, Site, SiteTitle, AddedBy, NutsLevel3ID, SecondLevelDomainID,NaceID) "
                "VALUES(%(r_id)d, %(site)s ,%(sitetitle)s, %(siteaddedby)s, %(nuts3_id)d, %(secondleveldomain_id)d,%(nace_id)d)", argument)
        elif table == "resource":
            self.__cur.execute(
                "INSERT INTO Resource(ResourceID, URL, SiteID) VALUES (%(r_id)d, %(url)s, %(site_id)d)", argument)
        elif table == "year":
            self.__cur.execute(
                "INSERT INTO Year(YearID, Year) VALUES(%(YearID)d, %(Year)s)", argument)
        elif table == "quarter":
            self.__cur.execute(
                "INSERT INTO Quarter(QuarterID, QuarterName, QuarterNumberInYear, YearID) "
                "VALUES(%(QuarterID)d, %(QuarterName)s, %(QuarterNumberInYear)d, %(YearID)d)", argument)
        elif table == "week":
            self.__cur.execute(
                "INSERT INTO Week(WeekID, WeekNumberInYear, YearID) VALUES(%(WeekID)d, %(WeekNumberInYear)d, %(WeekYearID)d)", argument)
        elif table == "month":
            self.__cur.execute(
                "INSERT INTO Month(MonthID, MonthNumberInYear, MonthName, DaysInMonth, MonthNumberAfterEIAOStart, QuarterID) "
                "VALUES (%(MonthID)d, %(MonthNumberInYear)d, %(MonthName)s, %(DaysInMonth)d, "
                "%(MonthNumberAfterEIAOStart)d, %(QuarterID)d)", argument)
        elif table == "date":
            self.__cur.execute(
                "INSERT INTO Date(DateID, DayNumberInMonth, DayNumberInWeek, DayName, Date, MonthID, WeekID) "
                "VALUES(%(DateID)d, %(DayNumberInMonth)d, %(DayNumberInWeek)d, %(DayName)s, %(pgdate)s, %(MonthID)d, %(WeekID)d)", argument)
        elif table == "testrun":
            # The test run number doubles as the ID of the row
            r_id = argument["testrunnumber"]
            self.__cur.execute(
                "INSERT INTO TestRun(TestRunID, TestRunNumber) VALUES (%(testrunnumber)d, %(testrunnumber)d)", argument)
        elif table == "server":
            self.__cur.execute(
                "INSERT INTO Server(ServerID, Server, ServerVersion, CompleteServerName, OperatingSystemFamilyID) "
                "VALUES (%(r_id)d, %(servername)s, %(serverversion)s, %(server)s, %(operatingsystemfamily_id)d)", argument)
        elif table == "languagefamily":
            self.__cur.execute(
                "INSERT INTO LanguageFamily(LanguageFamilyID, LanguageFamily, LanguageFamilyAbbreviation) "
                "VALUES (%(r_id)d, %(languagename)s, %(lang)s)", argument)
        elif table == "language":
            self.__cur.execute(
                "INSERT INTO Language(LanguageID, Language, LanguageAbbreviation, LanguageFamilyID) VALUES "
                "(%(r_id)d, %(fulllanguagename)s, %(lang_suffix)s, %(languagefamily_id)d)", argument)
        elif table == "resourceversion":
            self.__cur.execute(
                "INSERT INTO ResourceVersion(ResourceVersionID, Doctype, ContentLength, ContentType, Encoding, ActualSize "
                ",CopyLocation, AuthoringTool, TestRunID, ServerID, ResourceID, MinuteID, DateID) VALUES "
                "(%(r_id)d, %(doctype)s, %(size)d, %(contenttype)s, %(encoding)s, %(actualsize)d ,%(copylocation)s, %(authoringtool)s, "
                "%(testrun_id)d, %(server_id)d, %(page_id)d, %(minute_id)d, %(date_id)d)", argument)
        elif table == "barriercomputationversion":
            self.__cur.execute(
                "INSERT INTO BarrierComputationVersion(BarrierComputationVersionID, BarrierComputationVersionName, Version, "
                "BarrierComputationID, FalseNegativeProb, FalsePositiveProb) VALUES(%(r_id)d, %(barrcompversionname)s, "
                "%(version)d, %(barrcomp_id)d, %(falseNegativeProb)f, %(falsePositiveProb)f)", argument)
        elif table == "etlversion":
            self.__cur.execute(
                "INSERT INTO ETLVersion(ETLVersionID, Major, Minor, Build) "
                "VALUES(%(r_id)d, %(ETLMajor)d, %(ETLMinor)d, %(ETLBuild)d)", argument)
        elif table == "etlrun":
            self.__cur.execute(
                "INSERT INTO ETLRun(ETLRunID, SourceModel, SourceDB, TimeUsage, ETLVersionID, MinuteID, DateID) "
                "VALUES(%(r_id)d, %(ETLSourceModel)s, %(ETLSourceDB)s, %(ETLTimeUsage)d, %(ETLVersion_id)d, "
                "%(Minute_id)d, %(Date_id)d)", argument)
        elif table == "mediatype":
            self.__cur.execute(
                "INSERT INTO MediaType(MediaTypeID, MediaType) VALUES(%(r_id)d, %(mediatype)s)", argument)
        elif table == "subject":
            if self.__filewrite:
                self.__chkcount()
                self.__tempfile.write('%d\t%d\t%d\t%d\n' % (argument["r_id"], argument["line"],
                                                            argument["col"], argument["pageversion_id"]))
            else:
                self.__cur.execute(
                    "INSERT INTO Subject(SubjectID, Line, Col, ResourceVersionID) "
                    "VALUES(%(r_id)d, %(line)d, %(col)d, %(pageversion_id)d)", argument)
        elif table == "scenario":
            if self.__filewrite:
                self.__chkcount()
                self.__tempfile.write('%d\t%d\t%d\n' % (argument["r_id"], argument["site_id"],
                                                        argument["testrun_id"]))
            else:
                self.__cur.execute(
                    "INSERT INTO Scenario(ScenarioID, SiteID, TestRunID) VALUES(%(r_id)d, %(site_id)d, %(testrun_id)d)", argument)
        elif table == "technologyfinding":
            if self.__filewrite:
                # NOTE(review): the column order written here differs from
                # the column list of the INSERT below - confirm it matches
                # the column order of the table used by copy_from.
                self.__tempfile.write('%d\t%d\t%d\t%d\t%d\t%d\n' % (argument["mimetype_id"], argument["InclusionTypeID"],
                                                                    argument["subject_id"], argument["date_id"],
                                                                    argument["minute_id"], argument["testrun_id"]))
            else:
                self.__cur.execute(
                    "INSERT INTO TechnologyFinding(SubjectID, MimeTypeID, InclusionTypeID, DateID, MinuteID, TestRunID) "
                    "VALUES (%(subject_id)d, %(mimetype_id)d, %(InclusionTypeID)d, %(date_id)d, %(minute_id)d, %(testrun_id)d)", argument)
        elif table == "testresult":
            if self.__filewrite:
                self.__tempfile.write('%f\t%d\t%d\t%d\t%d\t%d\t%d\t%d\n' % (argument["barrierIndicator"], argument["subject_id"],
                                                                            argument["minute_id"], argument["date_id"],
                                                                            argument["result_id"], argument["barrcompversion_id"],
                                                                            argument["ETLRun_id"], argument["testrun_id"]))
            else:
                # Uses the same argument names as the file-write branch
                # above, with one balanced VALUES list.
                self.__cur.execute(
                    "INSERT INTO TestResult(Probability, SubjectID, MinuteID, DateID, ResultID, BarrierComputationVersionID, ETLRunID, "
                    "TestRunID) VALUES (%(barrierIndicator)f, %(subject_id)d, %(minute_id)d, %(date_id)d, %(result_id)d, "
                    "%(barrcompversion_id)d, %(ETLRun_id)d, %(testrun_id)d)", argument)
        elif table == "scenariocoverage":
            if self.__filewrite:
                self.__tempfile.write('%d\t%d\n' % (argument["scen_id"], argument["pageversion_id"]))
            else:
                self.__cur.execute(
                    "INSERT INTO ScenarioCoverage(ScenarioID, ResourceVersionID) VALUES(%(scen_id)d, %(pageversion_id)d)", argument)
        elif table == "mimetype":
            self.__cur.execute(
                "INSERT INTO MimeType(MimeTypeID, MimeType, MimeSubtype) VALUES (%(r_id)d, %(MimeType)s, %(MimeSubtype)s)", argument)
        elif table == "inclusiontype":
            # NOTE(review): the row is stored with the caller supplied
            # inclusiontype_id while the sequence value in r_id is what gets
            # returned and cached - confirm that this is intended.
            self.__cur.execute(
                "INSERT INTO InclusionType(InclusionTypeID, InclusionType) VALUES (%(inclusiontype_id)d, %(InclusionType)s)", argument)
        elif table == "resourcemediatypeusage":
            self.__cur.execute(
                "INSERT INTO ResourceMediaTypeUsage(ResourceVersionID, MediaTypeID) VALUES (%(resourceversion_id)d, %(mediatype_id)s)", argument)
        elif table == "scenariomediatypeusage":
            self.__cur.execute(
                "INSERT INTO ScenarioMediaTypeUsage(ScenarioID, MediaTypeID) VALUES (%(scen_id)d, %(mediatype_id)s)", argument)
        elif table == "languageusage":
            self.__cur.execute(
                "INSERT INTO LanguageUsage(ResourceVersionID, LanguageID) VALUES (%(resourceversion_id)d, %(language_id)s)", argument)
        else:
            print("ERROR: could not insert into the table %s. It may be read-only" % self.__tablename)
            return -1

        if self.__doCache:
            # Tables without key attributes (the insert-only tables) can
            # never be looked up by select(), so there is nothing to cache.
            keys = self.__tableKeys.get(table)
            if keys is not None:
                cachekey = self.__cacheKey(keys, argument)

                # If filewrite is enabled then we need to store the
                # temporary results in memory
                if self.__filewrite:
                    self.__insertCache[cachekey] = r_id
                    self.__factcount += 1

                # Caches the new row
                self.__cache.add(cachekey, r_id)
        return r_id

    def __fetchSeqRange(self):
        """Gets the next range of IDs"""
        fetchIdSql = "SELECT nextval('%sID_seq_for_ETL_tool')" % (self.__tablename)
        self.__cur.execute(fetchIdSql)
        return int(self.__cur.fetchone()[0])

    def __nextSeq(self):
        """Returns the next free SubjectID"""
        if self.__seqCount is None:
            self.__seqCount = self.__fetchSeqRange()
            self.__firstSeq = self.__seqCount
        res = self.__seqCount
        self.__seqCount += 1
        if (self.__seqCount - self.__firstSeq) % 10000 == 0:
            # We have used the current bite of the sequence. Get a new one.
            self.__seqCount = self.__fetchSeqRange()
        return res

    def __chkcount(self):
        """Checks if there are too many results in memory, then the
        temporary file is bulk loaded"""
        if self.__factcount >= 400000:
            self.__factcount = 0
            self.writefile()
            self.fileclose()
            # Start over with an empty file and an empty not-written cache
            self.__tempfile = os.tmpfile()
            self.__insertCache = {}

    def writefile(self):
        """Copies a tab-separated file into the DW."""
        if self.__filewrite:
            print("Copying %s information into table..." % self.__tablename)
            self.__tempfile.flush()
            self.__tempfile.seek(0)
            self.__cur.copy_from(self.__tempfile, self.__tablename)

    def fileclose(self):
        """Closes the file"""
        if self.__filewrite:
            self.__tempfile.close()