#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
Code to load data into the EIAO DW R20. The data is extracted from
the EIAO triplestore.
"""

# Copyright 2005, 2006 EIAO Consortium
# This program is distributed under the terms of the GNU General
# Public License.
#
# This file is part of the European Internet Accessibility Observatory
# (EIAO)
#
# EIAO is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# EIAO is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with EIAO; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston,
# MA 02110-1301 USA

import calendar
import datetime
import getopt
import getpass
import urllib
import os
import re
import string
import sys
import time
import SOAPpy
import math
import CharacterSet
import urlparse
from dateutil import relativedelta #https://moin.conectiva.com.br/DateUtil?action=AttachFile&do=get&target=python-dateutil-1.0.tar.bz2 and http://niemeyer.net/
import psycopg # Version 1.1.19 (Version 2 does not work)
from barriercomputation_resultidmap import barriercomputation_resultidmap
from eiaocat_catidmap import eiaocat_catidmap
#from fifocache import FIFOCache
from location_nuts3idmap import location_nuts3idmap
#from housekeeping import HouseKeeping
from rfc3066codes import countrycodes, languagecodes
#from RDFreaderwriter import RDFreaderwriter
from RDFreader3 import RDFreader3
from DataWarehouseTable import DataWarehouseTable
from RDFTraverser import *
import sc

__author__ = "Christian Thomsen,Jens Frøkjær, Tom Oddershede, Morten Goodwin Olsen"
__maintainer__ = "Christian Thomsen"
__version__ = "0.9929.1"
__all__ = ['DW20Loader']

# When True, the loader wraps its DW cursor in hackcur so that every
# statement is echoed to stdout before it is executed.
debug = False#True

# Global variables, incl. precompiled reg. exp.s
allmonths = ("January", "February", "March", "April", "May", "June", "July", \
             "August", "September", "October", "November", "December")
alldays = ("Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday")
# Month numbers belonging to each quarter; index i pairs with quarternames[i].
allquarters = ((1, 2, 3), (4, 5, 6), (7, 8, 9), (10, 11, 12))
quarternames = ('Q1', 'Q2', 'Q3', 'Q4')
domainlevelsRE = re.compile(r'[.*\.]?([^\.]*)\.([^\.]{2,6})$') # Find the characters before the last dot and the 2 - 6 characters after the last dot
charsetRE = re.compile(r'.*charset=(.*)\b') # Find the text after 'charset=' and before any delimiter
rfc3066codesRE = re.compile('^([a-z]{2,3})([_|-]([a-z]{2}))?$', re.IGNORECASE) # Find 2 or 3 letters. If there is a dash or an underscore,
                                                                               # then find the two letters after it.
serverRE = re.compile(r'^([^/]*)/(\(.*\)|\S*)') # Find the server name in front of the slash and the version number after the slash
spliturlRE = re.compile(r'^([^:]*)://([^/]*)/?(.*)$') # Find the part before the :// (the protocol), the part before the / (the domain) and
                                                      # the part after the / (the path on the server)

# RDF namespace prefixes that are concatenated with property names
# to form the predicates looked up in the triplestore.
eiao = "http://www.eiao.net/rdf/2.0#"
eiaostat = "http://www.eiao.net/rdf/"
purl10 = "http://purl.org/dc/elements/1.0/"
purl11 = "http://purl.org/dc/elements/1.1/"
wai = "http://www.w3.org/WAI/ER/EARL/nmg-strawman#"

def hackexecute(data,data2=None):
    """Debug stand-in for a cursor's execute(): only prints the statement.

    If data2 is truthy, it is interpolated into data with the % operator
    before printing. Nothing is sent to any database.
    """
    if data2:
        print data%(data2)
    else:
        print data

class hackcur:
    """Debug wrapper around a DB cursor that echoes statements to stdout.

    Only execute() and fetchone() are forwarded to the wrapped cursor.
    """
    def __init__(self,tempcur):
        # tempcur is the real cursor that all calls are delegated to.
        self.tempcur = tempcur
    def execute(self,data,data2=None):
        """Prints the statement and then executes it on the wrapped cursor."""
        if data2:
            # The printed text is formatted with Python's % operator and is
            # only an approximation of what the driver sends.
            print data%(data2)
            self.tempcur.execute(data,data2)
        else:
            print data
            self.tempcur.execute(data)
    def fetchone(self):
        """Returns the next row from the wrapped cursor."""
        return self.tempcur.fetchone()

class DW20Loader:
    """ The class to load EIAO DW, R2.0.
"""

    SUBJECT_CACHE_LIMIT = 4000000 # Max number of cached ((pageversion_id, line, col), subject_id) pairs

    # NOTE(review): _sites and _queryPrefixes use mutable default values
    # (set() and []), which are shared between all calls that rely on the
    # defaults. The constructor stores _sites without copying it.
    def __init__(self, _user, _database, _password, _rdfdb = 'rdf', _rdfmodel = None, _fromtestrun = -1,
                 _totestrun = -1, _sites = set(), _queryTiming = True, _queryPrefixes = [],_server=None,host='localhost'):
        """Constructor.

        Arguments:
        _user            The username to use when connecting to the PostgreSQL server
        _database        The database to use on the PostgreSQL server
        _password        The password to use when connecting to the PostgreSQL server
        _rdfdb           [optional] The triplestore database name handed to the RDF
                         reader. It is also recorded as the source database of
                         the ETL run. Default is 'rdf'.
        _rdfmodel        [optional] The triplestore model to extract data from. If not given, the system
                         configuration decides.
        _fromtestrun     [optional] The lowest testrun number to include. -1 means
                         that there is no lower limit.
        _totestrun       [optional] The highest testrun number to include. -1 includes every testrun
        _sites           [optional] A collection of web sites. If it is not empty, only
                         sites contained in it are loaded.
        _queryTiming     [optional] Sets whether queries towards the PostgreSQL database are timed.
                         Default is True.
        _queryPrefixes   [optional] If query timing is enabled, this argument can give a list of prefixes
                         of queries. A query starting with a prefix in this list, will thus only be counted
                         under that prefix. If for example SELECT is given in the list, all SELECT queries
                         are counted together such that what follows later in the query is disregarded.
        _server          [optional] An object offering getLoading() and setLoading().
                         If given, the constructor issues an initial RDF read, waits
                         (polling every 5 seconds) until getLoading() is false and
                         then calls setLoading(True).
        host             [optional] Accepted but not used by the visible constructor
                         code: the DW connection takes its host from the system
                         configuration and the RDF reader is given 'localhost'.
"""
        # NOTE(review): the original indentation of this file was lost; the
        # nesting below is reconstructed from the statements themselves.
        self.__ETLSourceModel = _rdfmodel
        self.__ETLSourceDatabase = _rdfdb
        self.__starttime = time.time()
        # The crawler log is opened in append mode; it is not closed here.
        self.crawlerlog = open(sc.SystemConfiguration().loglocation + '/crawler.log','a')
        # The host comes from the system configuration, not from the 'host' argument.
        self.__dwconn = psycopg.connect(user=_user, database=_database, password=_password,host=sc.SystemConfiguration().dwhost)
        if debug:
            # Echo every statement to stdout via the hackcur wrapper.
            self.__curtemp = self.__dwconn.cursor()
            self.__cur = hackcur(self.__curtemp)
        else:
            self.__cur = self.__dwconn.cursor()
        #self.__cur.execute = hackexecute
        self.__cur.execute("SET search_path TO datastaging")
        #self.__rdf_rw = RDFreaderwriter(_rdfmodel, dbdatabase=_rdfdb)
        self.__rdf_rw = RDFreader3(_rdfmodel,dbdatabase=_rdfdb,host='localhost')#host)
        self.__rdf_rw.addCacheData()

        #Speed optimization
        self.__urlDic = {}               # domain -> (title, nuts, nace, source) row
        self.__ResourceMediaTypes = {}   # pagesurvey -> list of media type IDs

        # Running statistics updated while scenarios and pages are loaded.
        self.__minCWAM = 1.0
        self.__maxCWAM = 0.0
        self.__ScenarioCount = 0
        self.__TidyPages = 0
        self.__UnparsablePages = 0

        #Starting to cache. Only if the ETL-server is available
        self.__rdftime = 0.0   # Accumulated seconds spent in RDF reads
        if _server is not None:
            self.__rdfrep(None, None, None)
            while _server.getLoading():
                time.sleep(5)
                print 'Wating for next site to be finished loading before I can start'
            _server.setLoading(True)
        #Finished caching

        # Separate connection to the URL repository; it is used for
        # site meta data and download counts.
        self.__urldwconn = psycopg.connect(user="eiaourlrep", database="eiaourlrep")
        self.__urlcur = self.__urldwconn.cursor()
        self.__totestrun = _totestrun
        self.__fromtestrun = _fromtestrun
        self.__sites = _sites

        # Query timing capabilities
        self.__querycnt = {}     # query (or prefix + '...') -> number of executions
        self.__querytimes = {}   # query (or prefix + '...') -> accumulated seconds
        if _queryTiming:
            self.execute = self.executeWithLogging
            self.__queryPrefixes = [q.strip().upper() for q in _queryPrefixes]
        else:
            # self.__queryPrefixes is deliberately not needed here since
            # executeWithLogging is then not installed as self.execute.
            self.execute = self.__cur.execute
        #self.execute = hackexecute

        #Create required table instances
        # NOTE(review): the meaning of the optional third and fourth arguments
        # is defined by DataWarehouseTable, which is not visible in this file.
        self.__yearTable = DataWarehouseTable("Year",self.__cur)
        self.__quarterTable = DataWarehouseTable("Quarter",self.__cur)
        self.__weekTable = DataWarehouseTable("Week",self.__cur)
        self.__monthTable = DataWarehouseTable("Month",self.__cur)
        self.__dateTable = DataWarehouseTable("Date",self.__cur)
        self.__testrunTable = DataWarehouseTable("TestRun",self.__cur)
        self.__topleveldomainTable = DataWarehouseTable("TopLevelDomain",self.__cur)
        self.__secondleveldomainTable = DataWarehouseTable("SecondLevelDomain",self.__cur)
        self.__siteTable = DataWarehouseTable("Site",self.__cur)
        self.__resourceTable = DataWarehouseTable("Resource",self.__cur)
        self.__serverTable = DataWarehouseTable("Server",self.__cur)
        self.__languageTable = DataWarehouseTable("Language",self.__cur)
        self.__languagefamilyTable = DataWarehouseTable("LanguageFamily",self.__cur)
        self.__languageusageTable = DataWarehouseTable("LanguageUsage",self.__cur, False)
        self.__resourceversionTable = DataWarehouseTable("ResourceVersion",self.__cur)
        self.__subjectTable = DataWarehouseTable("Subject", self.__cur, True, True)
        self.__scenarioTable = DataWarehouseTable("Scenario", self.__cur,True,False)
        # NOTE(review): this instance is replaced by the second
        # "ScenarioCoverage" assignment further down.
        self.__scenariocoverageTable = DataWarehouseTable("ScenarioCoverage", self.__cur, False)
        self.__mediatypeTable = DataWarehouseTable("MediaType", self.__cur)
        self.__resourcemediatypeusageTable = DataWarehouseTable("ResourceMediaTypeUsage", self.__cur,False)
        self.__scenariomediatypeusageTable = DataWarehouseTable("ScenarioMediaTypeUsage", self.__cur,False)
        self.__barriercomputationversionTable = DataWarehouseTable("BarrierComputationVersion",self.__cur)
        self.__barriercomputationTable = DataWarehouseTable("BarrierComputation",self.__cur)
        self.__etlversionTable = DataWarehouseTable("ETLVersion",self.__cur)
        self.__naceTable = DataWarehouseTable("Nace",self.__cur)
        self.__nutsTable = DataWarehouseTable("NutsLevel3",self.__cur)
        self.__etlrunTable = DataWarehouseTable("ETLRun", self.__cur, False)
        self.__mimetypeTable = DataWarehouseTable("MimeType", self.__cur)
        self.__technologyfindingTable = DataWarehouseTable("TechnologyFinding", self.__cur, False, True)
        self.__scenariocoverageTable = DataWarehouseTable("ScenarioCoverage", self.__cur, False, True)
        self.__testresultTable = DataWarehouseTable("TestResult", self.__cur, False, True)

        #Handle ETL Run
self.__ETLRun_id = self.__addETLRun() def __readRDF(self, reader, subject, predicate, object,readonlyonce): """Calls reader.readRDF(subj, pred, obj) and returns the result. The spent time is measured and added to self.__rdftime""" starttime = time.time() res = reader.readRDF(subject, predicate, object,supercached=True,readonlyonce=False) endtime = time.time() self.__rdftime += (endtime - starttime) return res def __rdfrep(self, subject, predicate, object,readonlyonce=False): """Reads RDF from the RDF repository by calling self.__readRDF""" return self.__readRDF(self.__rdf_rw, subject, predicate, object,readonlyonce=False) def __downloadNumber(self, url,testrunid): site = spliturlRE.search(url).groups()[1] self.__urlcur.execute("SELECT count(distinct scenarioid) from page WHERE domain=%(site)s and testrunid=%(testrunid)s;", locals()) res = self.__urlcur.fetchone()[0] return res def __exhaustiveScan(self, url,testrunid): site = spliturlRE.search(url).groups()[1] self.__urlcur.execute("SELECT isexhaustive from SiteExhaustiveScan WHERE domain=%(site)s and testrunid=%(testrunid)s;", locals()) res = self.__urlcur.fetchone() if res: return res else: return False def __urlHelper(self, url, column): nurl = spliturlRE.search(url).groups()[1] if nurl not in self.__urlDic: self.__urlcur.execute("SELECT title, nuts, nace, source FROM site WHERE domain=%(nurl)s", locals()) res = self.__urlcur.fetchone() if res==None: print "Warning. 
Could not fetch column '%s' for '%s', using 'Unknown'" % (column, nurl) return "Unknown" self.__urlDic[nurl] = res; return self.__urlDic[nurl][column] def __urlTitle(self, url): """Returns the title of the url""" return self.__urlHelper(url, 0) def __urlNuts(self, url): """Returns the nuts of the url""" return self.__urlHelper(url, 1) def __urlNace(self, url): """Returns the nace of the url""" return self.__urlHelper(url, 2) def __urlSource(self, url): """Returns the source of the uri""" return self.__urlHelper(url, 3) def __isNan(self, x): """Tests if a number is nan""" return str(x).lower().find('nan')>=0 def executeWithLogging(self, query, values = None): """Executes and times the query with values inserted """ # If the query starts with something listed in self.__queryPrefixes # (set by __init__), the considered part of the query (in the timing, # not in the execution of the SQL, of course) is the # prefix. Otherwise, everything is counted just as it is typed. This # is currently enough for our purposes and we avoid more advanced # parsing code. # We use this modified query here and leave query unchanged for psycopg mod_query = query.strip().upper() for qp in self.__queryPrefixes: if mod_query.startswith(qp): mod_query = qp + '...' cnt = self.__querycnt.get(mod_query, 0) cnt += 1 self.__querycnt[mod_query] = cnt starttime = time.time() self.__cur.execute(query, values) endtime = time.time() msecs = self.__querytimes.get(mod_query, 0.0) msecs += endtime - starttime self.__querytimes[mod_query] = msecs def getQueryStats(self): """Returns the timing results for all timed queries. 
The result is a list of 4-tuples of the form (query, count, average time, total time) """ res = [] queries = self.__querycnt.keys() if len(queries) > 0: for query in queries: cnt = self.__querycnt[query] tot = self.__querytimes[query] avg = tot / cnt res.append((query, cnt, avg, tot)) return res def __addDateAndTimeData(self, datestring): """Returns (DateID, MinuteID) for a given date string. Inserts the data in the DW if needed. """ # A date may have one of the following formats: # # (1) "Sun, 06 Nov 1994 08:49:37 GMT" (RFC 822/1123), HTTP/1.0 and HTTP/1.0 # (2) "Sunday, 06-Nov-94 08:49:37 GMT" (RFC 850/1036), HTTP/1.0 # (3) "Sun Nov 6 08:49:37 1994". (ANSI C asctime), this format may not be generated in HTTP/1.0 or HTTP/1.1 # (4) "2005-09-01T00:33:48.542697", for assessment times, not generated by HTTP. # (5) None. We just return (-1, -1) which represents the unknown date and time # # We don't know the format apriori so we have to find it now to be able to parse the date. # We look at the fourth character in datestring (date[3]) which either is a comma, a whitespace, a digit, or a letter # (see the three examples above). Based on this, we know which format the date has and parse it. if datestring is None: return (-1, -1) # Handles case (5) argDatestring = datestring # datestring will be changed later. argDatestring may go into the cache ch = datestring[3] if ch == ',': fmt = '%a, %d %b %Y %H:%M:%S GMT' # Handles case (1) elif ch == ' ': fmt = '%a %b %d %H:%M:%S %Y' # Handles case (3) elif ch.isdigit(): # Case 4. Here we have to make a hack and cut off the dot and the part following the dot. try: datestring = datestring[0:datestring.find('.')] except ValueError: print "Warning. 
Could not parse date:", datestring fmt = '%Y-%m-%dT%H:%M:%S' # Handles (modified) case (4) else: fmt = '%A, %d-%b-%y %H:%M:%S GMT' # Handles case (2) try: datetuple \ = (_year, _month, _day, _hour, _minute, _second, _weekday, _dayinyear, _daylightsaving) \ = time.strptime(datestring, fmt) except ValueError: print "WARNING. Could not parse date:", datestring return (-1, -1) if _year < 1970: # We don't handle this. print "WARNING. Dates before year 1970 not supported. Cannot handle date", datestring return (-1, -1) # First consider years, months, and days and then hours and minutes. We cannot do it in one run since # from 2004-01-01 23:59:00 till 2004-01-02 00:01:00 the difference is only two minutes (and zero days). # However, the day number is different. epochdate = datetime.date(1970, 1, 1) date = datetime.date(_year, _month, _day) daysdiff = relativedelta.relativedelta(date, epochdate) minsdiff = relativedelta.relativedelta(datetime.datetime.utcfromtimestamp(time.mktime(datetuple)),\ datetime.datetime(1970, 1, 1)) # Now find all the values to insert into the Date and Time dimensions' sources YearID = daysdiff.years Year = _year QuarterID = None QuarterName = None for i in range(4): if _month in allquarters[i]: QuarterID = i + 1 + YearID * 4 QuarterName = '%s %d' % (quarternames[i], _year) QuarterNumberInYear = i break isocalendar = date.isocalendar() WeekID = YearID * 53 + isocalendar[1] # Find the ID for the year where the week number belongs to. We # consider the difference between year 1970 and the year where the # week belongs to (held in isocalendar[0]). This is enough since 1/1 # 1970 (which is the first date we allow) belongs to week 1 of year # 1970. 
# Remaining part of DW20Loader.__addDateAndTimeData (week, month and day values).
        # NOTE: locals() is handed to the table objects below. The local
        # variable names in this method are thus significant.
        WeekYear = isocalendar[0]
        WeekYearID = relativedelta.relativedelta(datetime.date(WeekYear, 1, 1), epochdate).years
        WeekNumberInYear = isocalendar[1]
        MonthID = YearID * 12 + daysdiff.months
        MonthNumberInYear = _month
        MonthName = allmonths[_month - 1]
        DaysInMonth = calendar.monthrange(_year, _month)[1]
        MonthNumberAfterEIAOStart = 12 * (Year - 2006) + MonthNumberInYear - 1 # Start (0) is set to Jan. 2006
        # 366 and 31 are upper bounds for days per year and days per month,
        # so the IDs of different dates do not collide.
        DateID = YearID * 366 + MonthNumberInYear * 31 + daysdiff.days
        DayNumberInMonth = date.day
        DayNumberInWeek = date.isoweekday() # 1-based
        DayName = alldays[date.weekday()] # 0-based
        HourID = minsdiff.hours
        MinuteID = _hour * 60 + minsdiff.minutes

        # Now check if the relevant tuples exist in the database. Otherwise insert them.
        self.__yearTable.selectOrInsert(locals())
        # The ISO week may belong to another year than the date itself, so
        # the year of the week is checked (and if needed inserted) explicitly.
        self.execute("SELECT YearID FROM Year WHERE YearID = %(WeekYearID)d", locals())
        if self.__cur.fetchone() is None:
            self.execute("INSERT INTO Year(YearID, Year) VALUES(%(WeekYearID)d, %(WeekYear)s)", locals())
        self.__quarterTable.selectOrInsert(locals())
        self.__weekTable.selectOrInsert(locals())
        self.__monthTable.selectOrInsert(locals())
        rtemp = self.__dateTable.select(locals())
        if rtemp is None:
            # pgdate is only defined (and thus only in locals()) for the insert.
            pgdate = '%.4d-%.2d-%.2d' % (Year, MonthNumberInYear, DayNumberInMonth)
            self.__dateTable.insert(locals())
        return (DateID, MinuteID)

    def __extractTestRunNumber(self, testrun):
        """Extracts and returns the testnumber from a testrun name

        The part after the last underscore is returned as an int. A
        ValueError is raised if that part is not an integer.
        """
        # testrun['object'] is like 'http://www.eiao.net/rdf/1.0#testRun_testRun_230'
        # So we find the testrun number after the last underscore.
        return int(testrun.split('_')[-1])

    def __addTestRunData(self, testrun):
        """Finds or inserts data into Testrun and returns the ID"""
        # For now, the testrun is also the ID.
# Body of DW20Loader.__addTestRunData (its header precedes this point).
        # NOTE(review): the original indentation of this file was lost; the
        # nesting in this part is reconstructed from the statements themselves.
        testrunnumber = self.__extractTestRunNumber(testrun)
        self.__testrunTable.selectOrInsert(locals())
        return testrunnumber

    def __addDomainData(self, website):
        """Finds or inserts data into Domain, SecondLevelDomain, and TopLevelDomain and returns the DomainID"""
        # NOTE: locals() is handed to the table objects, so the local
        # variable names in this method are significant.
        # Cut off protocol part if it is there
        website = urlparse.urlparse(website)[1]
        #website = website.split('://', 1)[-1]
        # Cut off last slash if it is there
        #if website.endswith('/'):
        #    website = website[:-1]
        # An AttributeError is raised here if the host name does not match
        # domainlevelsRE (e.g., if it contains no dot).
        domainlevels = domainlevelsRE.search(website).groups()
        topleveldomain = domainlevels[1]
        secondleveldomain = '.'.join(domainlevels)

        # Deal with TopLevelDomain
        toplevel_id = self.__topleveldomainTable.selectOrInsert(locals())

        # Deal with SecondLevelDomain
        secondlevel_id = self.__secondleveldomainTable.selectOrInsert(locals())
        return secondlevel_id

    def __addSiteData(self, website):
        """Finds or inserts data into Site and SiteCategorisation and returns the SiteID"""
        # NOTE: locals() is handed to the table objects, so the local
        # variable names in this method are significant.
        secondleveldomain_id = self.__addDomainData(website)
        # Cut off protocol part if it is there
        site = website.split('://', 1)[-1]
        # Cut off last slash if it is there
        if site.endswith('/'):
            site = site[:-1]

        # Deal with Site
        site_id = self.__siteTable.select(locals())
        if site_id is None:
            # The site is not there. We have to extract all the required information from the RDF rep.
            # and store it in the db.
            try:
                sitetitle = self.__urlTitle(website)
            except IndexError:
                # NOTE(review): sitetitle stays undefined on this path and is
                # then missing from the locals() given to insert() below.
                print website
            siteaddedby = self.__urlSource(website)
            nuts = self.__urlNuts(website)
            nuts3_id = location_nuts3idmap.get(nuts)
            #if len(nuts) > 5:
            #    nuts = nuts[-5:]
            #nuts3_id = self.__nutsTable.select(locals())
            if nuts3_id is None:
                nuts3_id = -1
                print 'Warning. Unknown NutsCode: %s' % nuts
            NaceCode = self.__urlNace(website)
            nace_id = self.__naceTable.select(locals())
            if nace_id is None:
                nace_id = -1
                print 'Warning. Unknown NaceCode: %s' % NaceCode
            site_id = self.__siteTable.insert(locals())
        return site_id

    def __addPageData(self, url, site_id):
        """Finds or inserts data into Page and returns the ID"""
        # Prepare URL. Put the domain in lower case
        url = urllib.unquote(url)
        urlparts = spliturlRE.search(url).groups()
        # The URL is rebuilt as protocol://domain/path. A slash is thus
        # always present after the domain, also if the path is empty.
        url = urlparts[0] + '://' + urlparts[1].lower() + '/' + urlparts[2]
        return self.__resourceTable.selectOrInsert(locals())

    def __addServerData(self, server):
        """Finds or inserts data into Server and returns the ID

        -1 is returned if server is None or if no server product can be
        identified in the string.
        """
        # NOTE: locals() is handed to the table object, so the local
        # variable names in this method are significant.
        if server is None:
            return -1 # Unknown
        # We add each unique server identification string to the DW.
        server_id = self.__serverTable.select(locals())
        if server_id is None:
            tmp = serverRE.search(server)
            if tmp is None:
                # No "name/version" form. Look for some well-known product names.
                if server.find('Apache') != -1:
                    servername = 'Apache'
                    serverversion = 'Unknown'
                elif server.find('Microsoft-IIS') != -1:
                    servername = 'Microsoft-IIS'
                    serverversion = 'Unknown'
                elif server.find('Lotus-Domino') != -1:
                    servername = 'Lotus-Domino'
                    serverversion = 'Unknown'
                else:
                    print "Warning. Cannot identify server product in %s" % server
                    return -1 # ID for 'Unknown'
            else:
                (servername, serverversion) = tmp.groups()

            # Now find/guess the operating system family the server runs on
            osmap = {'Unknown':-1, 'Unix':0, 'Windows':1, 'Other':2}
            unixes = ['Unix', 'Red Hat', 'Fedora', 'Linux', 'linux', 'FreeBSD', 'freebsd', 'Solaris', 'MacOS X', 'Darwin',]
            windowses = ['Windows', 'Win32', 'Win64', 'Microsoft']
            operatingsystemfamily_id = None
            # The loop variable 'os' hides the os module inside this method.
            for os in unixes:
                if server.find(os) != -1:
                    operatingsystemfamily_id = osmap['Unix']
                    break
            if operatingsystemfamily_id is None:
                for os in windowses:
                    if server.find(os) != -1:
                        operatingsystemfamily_id = osmap['Windows']
                        break
            if operatingsystemfamily_id is None:
                print "Warning. Could not decide operating system family for server %s" % server
                operatingsystemfamily_id = osmap['Unknown']
            server_id = self.__serverTable.insert(locals())
        return server_id

    def __addLanguageData(self, langcode):
        """Finds or inserts data into Language and LanguageFamily and returns the LanguageID

        -1 is returned if langcode is None, cannot be parsed, or holds an
        unknown language.
        """
        # NOTE: locals() is handed to the table objects, so the local
        # variable names in this method are significant.
        if langcode is None:
            # The value "Unknown" is preloaded with ID = -1.
            return -1
        tmp = rfc3066codesRE.search(langcode)
        if tmp is None:
            print "Warning. Could not parse language code %s" % langcode
            return -1
        # suffix is the separator and the country code (e.g., '-GB'), country
        # is the country code alone. Both are None if no country is given.
        (lang, suffix, country) = tmp.groups()
        lang = lang.lower()
        if suffix is None:
            suffix = ''
        else:
            suffix = suffix.replace('_', '-').upper() # To store abbreviations consistently
        if country is not None:
            country = country.upper()
        try:
            languagename = languagecodes[lang]
        except KeyError:
            print "Warning. Unknown language: %s in %s" % (lang, langcode)
            return -1
        lang_suffix = lang + suffix
        language_id = self.__languageTable.select(locals())
        if language_id is not None:
            return language_id
        #Else, the language is not there and we have to add it
        countryname = ''
        if suffix != '':
            try:
                # suffix[1:] skips the leading dash
                countryname = countrycodes[suffix[1:]]
            except KeyError:
                print "Warning. Unknown country %s in %s" % (country, langcode)
                countryname = "Unknown"
        languagefamily_id = self.__languagefamilyTable.selectOrInsert(locals())
        if suffix != '':
            fulllanguagename = '%s (%s)' % (languagename, countryname)
        else:
            fulllanguagename = '%s' % (languagename)
        return self.__languageTable.insert(locals())

    def __addPageVersionData(self, range, site_id, testrun_id):
        """Finds or inserts data into PageVersion and returns the ID"""
        # NOTE: locals() is handed to the table objects later in this method,
        # so the local variable names are significant. The parameter 'range'
        # hides the builtin of the same name inside this method.
        pagesurvey = self.__rdfrep(range, eiao + 'pageSurvey', None)
        #print 'Pagesurvey:',pagesurvey
        testsubject = self.__rdfrep(subject = pagesurvey, predicate = wai+'testsubject',object=None,readonlyonce=True)
        #print 'Testsubject:',testsubject,type(testsubject)
        # Missing values are treated as 0. String values are converted to int.
        tidyused = self.__rdfrep(subject = testsubject, predicate = eiao+'tidy',object=None,readonlyonce=True) or 0
        if type(tidyused)==type(''):
            tidyused = int(tidyused)
        if tidyused:
            self.__TidyPages += 1
        parsingfailed = self.__rdfrep(subject = testsubject, predicate = eiao+'parsingFailed',object=None,readonlyonce=True) or 0
        if type(parsingfailed)==type(''):
            parsingfailed = int(parsingfailed)
        if parsingfailed:
            self.__UnparsablePages += 1
        try:
            actualsize = int(self.__rdfrep(subject = testsubject, predicate = eiao+'calculatedSize',object=None,readonlyonce=True))
        except TypeError:
            # int(None), i.e., no calculated size was found
            actualsize = -1
        header = self.__rdfrep(pagesurvey, eiao + 'header', None,readonlyonce=True)

        # Extract all needed informations first and then do the insertions.
        size = -1
        contenttype = 'Unknown'
        encoding = 'Unknown'
        copylocation = self.__rdfrep(pagesurvey, eiao + 'webcacheurl', None,readonlyonce=True)
        server_id = -1
        language_id = -1
        authoringtool = -1

        # Now find the URL for the page. There may be a fullURL property for the pagesurvey.
        # (This only holds for some of them. This property was added after the first data
        # was collected and to keep that valid, it is not a requirement that a pagesurvey
        # has a fullURL). If there isn't a fullURL, we can get use range's page property.
# This may, however, only hold domain.org/dir instead of, e.g., domain.org/dir/index.html page = None tmp = self.__rdfrep(pagesurvey, eiao + 'fullURL', None)#,readonlyonce=True) if tmp: page = tmp else: page = self.__rdfrep(range, eiao + 'page', None,readonlyonce=True) page_id = self.__addPageData(page, site_id) doctype = self.__rdfrep(pagesurvey, eiao + 'doctype', None,readonlyonce=True) authoringtool = self.__rdfrep(pagesurvey, eiao + 'authoringtool', None,readonlyonce=True) if authoringtool: authoringtool = ''.join([i for i in authoringtool if i.lower() in string.lowercase + ' ']) if not authoringtool == str(authoringtool): print 'Error in authoringtool',authoringtool minute_id = -1 date_id = -1 # Now see if this page version already is there. We disinguish page versions by considering their copy locations. resourceversion_id = self.__resourceversionTable.select(locals()) if resourceversion_id is not None: return resourceversion_id # It wasn't there. Add it now... #headerinfos = self.__rdfrep(header, None, None) try: size = int(self.__rdfrep(header,eiao + 'headercontent-length',None,readonlyonce=True)) except: pass#size = -1 try: contentStr = self.__rdfrep(header,eiao + 'headercontent-type',None,readonlyonce=True) contenttype = contentStr.split(';', 1)[0] tmp = charsetRE.search(contentStr) if tmp: encoding = tmp.groups()[0] or 'Unknown' else: encoding = 'iso-8859-1' except: pass#contenttype = 'Unknown' try: encoding = self.__rdfrep(header,eiao + 'headerchar-set',None,readonlyonce=True) or 'Unknown' except: pass try: server = self.__rdfrep(header,eiao + 'headerserver',None,readonlyonce=True) or None#'Unkown' server_id = self.__addServerData(server) except: server_id = -1 try: lastmodified = self.__rdfrep(header,eiao + 'headerlast-modified',None,readonlyonce=True) except: lastmodified = -1 #for e in headerinfos: #if e['predicate'] == eiao + 'headercontent-length': # size = int(e['object']) #elif e['predicate'] == eiao + 'headercontent-type': # contentStr = 
e['object'].lower() # contenttype = contentStr.split(';', 1)[0] # Ignore parameters # # See if charset is given explicitly. # tmp = charsetRE.search(contentStr) # if tmp: # encoding = tmp.groups()[0] # elif contenttype.startswith('text/'): # encoding = 'iso-8859-1' # This is HTTP's default for text/... types #elif e['predicate'] == eiao + 'headerchar-set': # encoding = e['object'] #elif e['predicate'] == eiao + 'headerserver': # server = e['object'] # server_id = self.__addServerData(server) #elif e['predicate'] == eiao + 'headerlast-modified': # lastmodified = e['object'] # (date_id, minute_id) = self.__addDateAndTimeData(lastmodified) encoding = CharacterSet.CharacterSet.findPreferredAlias(encoding) resourceversion_id = self.__resourceversionTable.insert(locals()) #Add language informations to the many-to-many relation MetaData = getMetaDataFromPageSurvey(self.__rdf_rw.readRDF, pagesurvey,"language") LangList = [] for mdata in MetaData: language_id = self.__addLanguageData(mdata[1]) if language_id not in LangList: self.__languageusageTable.insert(locals()) LangList.append(language_id) if MetaData == []: language_id = -2 #No languages listed self.__languageusageTable.insert(locals()) #Add mediatype informations to the many-to-many relation MetaData = getMetaDataFromPageSurvey(self.__rdf_rw.readRDF, pagesurvey,"mediaType") MediatypeList = [] for mdata in MetaData: mediatype_id = self.__addMediaTypeData(mdata[1]) if mediatype_id not in MediatypeList: self.__resourcemediatypeusageTable.insert(locals()) MediatypeList.append(mediatype_id) if MetaData == []: mediatype_id = -2 #No media types listed self.__resourcemediatypeusageTable.insert(locals()) self.__ResourceMediaTypes[pagesurvey] = MediatypeList return resourceversion_id def __addTechnologyFinding(self, subject_id, line, col, type, value, pagesurvey, testrun_id): """Adds technology data to the DW""" #Find the inclusiontype #(1) Internally linked #(2) Externally linked #(3) Embedded if type.lower() == "producer": 
InclusionTypeID = 6 elif type.lower() == "version": InclusionTypeID = 5 elif type.lower() == "creator": InclusionTypeID = 4 elif type.lower() == "technology": InclusionTypeID = 3 elif type.lower() == "externallinks": InclusionTypeID = 2 elif type.lower() == "internallinks": InclusionTypeID = 1 else: #Unknown Inclusion Type InclusionTypeID = -1 print subject_id,type.lower(),value if value != '' and value.lower() != "unknown": try: if type.lower() in ('producer','version','creator'): (MimeType,MimeSubtype) = type.lower(),value else: (MimeType,MimeSubtype) = value.split('/') mimetype_id = self.__mimetypeTable.selectOrInsert(locals()) except ValueError: print "Warning: Unknown mimeType %s. Using unknown/unknown" % value mimetype_id = -1 else: mimetype_id = -1; downloaded = self.__rdfrep(pagesurvey, eiao + 'downloaded', None,readonlyonce=True) try: (date_id, minute_id) = self.__addDateAndTimeData(downloaded) except IndexError: (data_id, minute_id) = (-1,-1) self.__technologyfindingTable.insert(locals()) def __addSubjectData(self, pageversion_id, line, col): """Adds Subject data to the DW""" return self.__subjectTable.selectOrInsert(locals()) def __includeTestRun(self, testrunnumber): """Decides if the given int argument represents a testnumber that should be loaded. 
This is decided by means of the fromtestrun and totestrun arguments given to the constructor"""
        # A limit of -1 (or anything below) disables the corresponding check.
        if self.__fromtestrun > -1:
            if testrunnumber < self.__fromtestrun:
                return False
        if self.__totestrun > -1:
            if testrunnumber > self.__totestrun:
                return False
        # Otherwise we are within the limits
        return True

    #def obj(self, arg):
    #    """Returns arg['object']"""
    #    return arg['object']

    def __addMediaTypeData(self, mediatype):
        """ Adds Mediatype data to the DW """
        return self.__mediatypeTable.selectOrInsert(locals())

    def __addScenarioData(self, scenario, site_id, testrun_id):
        """Adds Scenario data to the DW

        Returns the pair (scenarioresult, scen_id) where scenarioresult is
        the scenario's barrierIndicator value as a float.
        """
        scen_id = self.__scenarioTable.insert(locals())
        scenarioresult = float(self.__rdfrep(scenario, eiao + 'barrierIndicator', None,readonlyonce=True))
        # The values are formatted into the SQL text here. Note that %f
        # writes the result with six decimals. The statement is sent directly
        # to the cursor and is thus not included in the query timing.
        self.__cur.execute("INSERT INTO matviews.scenario(ScenarioID, TestRunID, DisabilityGroupID, CWAM) VALUES ( "+\
                           "%d, %d, %d, %f)" % (scen_id, testrun_id, 0, scenarioresult))

        #Values for materialized view
        if scenarioresult > self.__maxCWAM:
            self.__maxCWAM = scenarioresult
        if scenarioresult < self.__minCWAM:
            self.__minCWAM = scenarioresult
        self.__ScenarioCount = self.__ScenarioCount +1
        return (scenarioresult,scen_id)

    def __addScenarioMediaType(self, scenario, scen_id):
        """Adds data to the scenaroMediaType table"""
        # NOTE: locals() is handed to the table object further down, so the
        # local variable names in this method are significant.
        range = self.__rdfrep(scenario, eiao + 'rangeLocation', None)
        HandledMediaTypes = []
        if 1==1: #for range in ranges:#map(self.obj, ranges):
            #print 'Range:',range,scenario
            # NOTE(review): __addPageVersionData uses the result of the same
            # read directly (without list(...)[0]) as the key for
            # self.__ResourceMediaTypes - confirm which form is right.
            pagesurvey = list(self.__rdfrep(range, eiao + 'pageSurvey', None))[0]
            #Add media type informations to the many-to-many relation
            try:
                MetaData = self.__ResourceMediaTypes[pagesurvey]
            except KeyError:
                # Not cached yet. Find the media type IDs and cache them.
                MT = getMetaDataFromPageSurvey(self.__rdf_rw.readRDF, pagesurvey,"mediaType")#,readonlyonce=True)
                MetaData = []
                for mdata in MT:
                    mediatype_id = self.__addMediaTypeData(mdata[1])
                    MetaData.append(mediatype_id)
                self.__ResourceMediaTypes[pagesurvey] = MetaData
            for mediatype_id in MetaData:
                if mediatype_id not in HandledMediaTypes:
# Remaining part of DW20Loader.__addScenarioMediaType (insert each media type once).
                    self.__scenariomediatypeusageTable.insert(locals())
                    HandledMediaTypes.append(mediatype_id)
        if HandledMediaTypes == []:
            mediatype_id = -2 #No media types listed
            self.__scenariomediatypeusageTable.insert(locals())

    def __addBarrierComputationVersion(self, barrcompversionname):
        """Adds BarrierComputationVersion data to the DW

        Returns the ID of the (existing or inserted) version.
        """
        # NOTE: locals() is handed to the table objects, so the local
        # variable names in this method are significant.
        res = self.__barriercomputationversionTable.select(locals())
        if res is not None:
            return res
        # It is not in the DB. So insert it into both.
        # The name is split at its last dot. A ValueError is raised if there
        # is no dot or if the part after it is not an integer.
        (barrcomp, version) = barrcompversionname.rsplit('.', 1)
        version = int(version)
        barrcomp_id = self.__barriercomputationTable.select(locals())
        if barrcomp_id is None:
            print "Warning: Unknown barrier computation: %s" % barrcomp
            barrcomp_id = -1
        #False negative and false positive is not supported in R2
        falseNegativeProb = 0.0
        falsePositiveProb = 0.0
        return self.__barriercomputationversionTable.insert(locals())

    def __addETLRun(self):
        """Adds the ETL data to the DW

        Returns the result of inserting the ETL run.
        """
        # NOTE: locals() is handed to the table objects, so the local
        # variable names in this method are significant.
        #Get version informations
        version = __version__.split('.')
        ETLMajor = int(version[0])
        ETLMinor = int(version[1])
        ETLBuild = int(version[2])
        ETLVersion_id = self.__etlversionTable.selectOrInsert(locals())
        ETLSourceModel = self.__ETLSourceModel
        ETLSourceDB = self.__ETLSourceDatabase
        #Get current time and convert it into ANSI C asctime
        # (this is one of the formats handled by __addDateAndTimeData)
        t = time.localtime(time.time())
        etltime = time.strftime("%a %b %d %H:%M:%S %Y", t)
        Date_id, Minute_id = self.__addDateAndTimeData(etltime)
        #This will be updated when the ETL Run is finished
        ETLTimeUsage = -1
        return self.__etlrunTable.insert(locals())

    def __handleTestRuns(self):
        """The method that extracts test runs from the triplestore.
        Based on this data, the remaining data to load is extracted."""
        factcnt = 0
        #Handle ELT-Run
        ETLRun_id = self.__ETLRun_id
        # We write to data to insert in the fact table in a temporary file.
        # At the end of this procedure we use COPY FROM to load it.
        # This is faster than executeing many INSERTs for data we don't
        # read later on during the ETL process.
#testruns = [self.__rdfrep(eiao + 'runs', eiao + 'testRun', None)] testrun = self.__rdfrep(eiao + 'runs', eiao + 'testRun', None,readonlyonce=True) #for tr in testruns:#map(self.obj, testruns): if 1==1: print testrun testrunnumber = self.__extractTestRunNumber(testrun) if not self.__includeTestRun(testrunnumber): return #testrun = '200809' testrun_id = self.__addTestRunData(testrun) #testrun_id = 200810 sitesurvey = self.__rdfrep(testrun, eiao + 'siteSurvey', None,readonlyonce=True) # There is always just one sitesurvey #for ss in sitesurveys:#map(self.obj, sitesurveys): if 1==1: handledPageVersions = set() # Used to avoid duplicated page scenarios. website = self.__rdfrep(sitesurvey, eiao + 'webSite', None,readonlyonce=True) #Hack #import urlparse #thissite = urlparse.urlparse(website)[1] thissite = website if thissite.startswith('http://'): thissite = thissite[7:] elif thissite.startswith('https://'): thissite = thissite[8:] #sepcon = psycopg.connect(user='eiaodw', database='pdfdw', password='eiaodw',host='eiao1.eiao.net') sepcon = psycopg.connect(user=sc.SystemConfiguration().dwuser, database=sc.SystemConfiguration().dwdatabase, password=sc.SystemConfiguration().dwpassword,host=sc.SystemConfiguration().dwhost) sepcur = sepcon.cursor() #print 'Currently handling website:',website print 'Currently handling website:',website sepcur.execute("SELECT True from datastaging.site natural join datastaging.resource natural join datastaging.resourceversion where testrunid=%(testrun_id)s and site=%(thissite)s;",locals()) try: shouldvisit = not sepcur.fetchall()[0][0] except: shouldvisit = True #End of hack shouldvisit = True if not shouldvisit: print 'Alraedy in the DW' return if len(self.__sites) > 0 and website not in self.__sites: # If the load has been restricted to certain sites, self.__sites will not be empty. # If it is not empty and the current site is not one of the given sites, then ignore the site. 
return # else we should consider the site self.crawlerlog.write('\n(ETL) Currently handling website:'+str(website)+'\n') siteresult = float(self.__rdfrep(sitesurvey, eiao + 'barrierIndicator', None,readonlyonce=True)) stddev = math.sqrt(float(self.__rdfrep(sitesurvey, eiao + 'variance', None,readonlyonce=True))) errorMargin = float(self.__rdfrep(sitesurvey, eiao + 'errorMargin', None,readonlyonce=True)) unavailablepages = float(self.__rdfrep(sitesurvey, eiao + 'unavailablePages', None,readonlyonce=True)) site_id = self.__addSiteData(website) scenarios = self.__rdfrep(sitesurvey, eiao + 'scenario', None) scenarioresults = [] numberfail = 0 numberpass = 0 for scen in scenarios:#map(self.obj, scenarios): # Now we only consider pagescenarios. Keyuse scenarios is not used in R2 # and are therefore disregarded if present. if self.__rdfrep(scen, eiao + 'typeofscenario', None,readonlyonce=True) != 'pagescenario': continue scenarioresult,scen_id = self.__addScenarioData(scen, site_id, testrun_id) scenarioresults.append(scenarioresult) range = self.__rdfrep(scen, eiao + 'rangeLocation', None)#,readonlyonce=True) HandledResourcesInThisScenario = [] if 1==1: #for range in ranges:#map(self.obj, ranges): #print range,scen pageversion_id = self.__addPageVersionData(range, site_id, testrun_id) #Adds data to the scenariocoverage table if pageversion_id not in HandledResourcesInThisScenario: self.__scenariocoverageTable.insert(locals()) HandledResourcesInThisScenario.append(pageversion_id) # Avoid duplicated page scearios. A duplicate arises if two key use scen. cover the same page. if pageversion_id in handledPageVersions: # Skip this range - we have already dealt with it. 
continue else: handledPageVersions.add(pageversion_id) #print 'Range:',range pagesurvey = self.__rdfrep(range, eiao + 'pageSurvey', None) resourceurl = self.__rdfrep(pagesurvey, eiao + 'fullURL', None) print ' ',urllib.unquote(resourceurl), ' ('+str(self.__ScenarioCount)+')' self.crawlerlog.write('(ETL) '+str(urllib.unquote(resourceurl))+' ('+str(self.__ScenarioCount)+')\n') sys.stdout.flush() metadata = self.__rdfrep(pagesurvey, eiao + 'metadata', None,readonlyonce=True) #Now all meta-data must be added as a subject and placed in the technologyfactfinding table for mdata in metadata:#map(self.obj, metadata): location = self.__rdfrep(mdata, eiao + 'singleLocation', None,readonlyonce=True) line = int(self.__rdfrep(location, eiao + 'line', None,readonlyonce=True)) col = int(self.__rdfrep(location, eiao + 'column', None,readonlyonce=True)) subject_id = self.__addSubjectData(pageversion_id, line, col) type = self.__rdfrep(subject = mdata, predicate = eiao+'type',object=None,readonlyonce=True) type = type[type.rfind('#')+1:] if type.lower() not in ("mediatype","language"): value = self.__rdfrep(subject = mdata, predicate = eiao+'value',object=None,readonlyonce=True) self.__addTechnologyFinding(subject_id, line, col, type, value, pagesurvey, testrun_id) #Now the testresults must be added to the fact table assertions = self.__rdfrep(pagesurvey, wai + 'asserts', None,readonlyonce=True) for ass in assertions:#map(self.obj, assertions): barrcomprep = self.__rdfrep(ass, wai + 'requirement', None,readonlyonce=True) #Note that the title of the UWEM test is the UWEM test ID. Lets not spend time extracting something we already know. 
barrcomp = barrcomprep#self.__rdfrep(barrcomprep, purl11 + 'title', None) barrcompversion_id = self.__addBarrierComputationVersion(barrcomp) location = self.__rdfrep(ass, eiao + 'singleLocation', None,readonlyonce=True) line = int(self.__rdfrep(location, eiao + 'line', None,readonlyonce=True)) col = int(self.__rdfrep(location, eiao + 'column', None,readonlyonce=True)) subject_id = self.__addSubjectData(pageversion_id, line, col) result_id = -1 # Unknown resultrep = self.__rdfrep(ass, wai + 'result', None,readonlyonce=True) result = self.__rdfrep(resultrep, wai + 'validity', None,readonlyonce=True) barrierIndicator = self.__rdfrep(subject = resultrep, predicate = eiao+'barrierIndicator' ,object=None) if barrierIndicator: barrierIndicator = float(barrierIndicator) else: barrierIndicator = -1 if result[-4:] == 'pass': #print 'Pass result' result_id = 0 numberpass += 1 elif result[-4:] == 'fail': #key = barrcomp.rsplit('.', 1)[0] key = barrcomp numberfail += 1 #print 'Fail result' #ry: # result_id = barriercomputation_resultidmap.get(key,None) or barriercomputation_resultidmap.get(key[:-2],None) if result_id == None: result_id = -1 print "Warning: Could not lookup result for %s. Assuming cannotTell" % key elif result[-10:] == 'cannotTell': pass # result_id is already -1 else: print "Warning. Unsupported result type (%s). 
Assuming cannotTell" % result downloaded = self.__rdfrep(pagesurvey, eiao + 'downloaded', None) (date_id, minute_id) = self.__addDateAndTimeData(downloaded) self.__testresultTable.insert(locals()) factcnt += 1 if (factcnt % 5000) == 0: print "%d facts so far" % factcnt self.__addScenarioMediaType(scen, scen_id) #Insert into materialized view self.__cur.execute("SELECT SiteID FROM matviews.site WHERE SiteID = %d AND TestRunID = %d and DisabilityGroupID = %d" % (site_id, testrun_id, 0)) matresult = self.__cur.fetchone() self.crawlerlog.write('\n\n') self.__DownloadCount = self.__downloadNumber(website,testrun_id) self.__ExhaustiveScan = self.__exhaustiveScan(website,testrun_id) if self.__DownloadCount > 6000: self.__ExhaustiveScan = False else: self.__ExhaustiveScan = True self.__PageAverage = sum(scenarioresults)/len(scenarioresults) stddev = (1./(float(self.__ScenarioCount)-1)* sum([(i-self.__PageAverage)**2 for i in scenarioresults]) )**0.5 siteresult = float(numberfail)/(numberfail+numberpass) print '\n\nNumber of samples:',self.__ScenarioCount print 'Number of downloaded pages:',self.__DownloadCount print 'Number of pages using tidy:',self.__TidyPages print 'Number of pages unparsable:',self.__UnparsablePages print 'Exhaustive scan:',self.__ExhaustiveScan print 'Score:',siteresult print 'Average page:',self.__PageAverage print 'Stddev:',stddev #NOTE: The exhaustive scan is boolean - not integer. Please fixme. Inserting boolean values through psycopg does not work well. 
http://grokbase.com/profile/id:QLOnqzHZP6q51fKj-PJtrCuVIarOBSncoVAXTsRDskg self.__cur.execute("INSERT INTO matviews.site(SiteID, TestRunID, DisabilityGroupID, CWAM, Stddev, ErrorMargin, MinCWAM, MaxCWAM, ScenarioCount, DownloadCount, UnavailablePages, PageAverage, ExhaustiveScan,UnparsablePages,TidyPages) " +\ "VALUES (%d, %d, %d, %f, %f, %f, %f, %f, %d, %d, %d, %f, %d, %d, %d)" % (site_id, testrun_id, 0, siteresult, stddev, errorMargin, self.__minCWAM, self.__maxCWAM, self.__ScenarioCount, self.__DownloadCount, unavailablepages, self.__PageAverage, int(self.__ExhaustiveScan), self.__UnparsablePages, self.__TidyPages)) #Now bulk insert the data into the respective tables self.__technologyfindingTable.writefile() self.__technologyfindingTable.fileclose() self.__subjectTable.writefile() self.__subjectTable.fileclose() self.__scenarioTable.writefile() self.__scenarioTable.fileclose() self.__testresultTable.writefile() self.__testresultTable.fileclose() self.__scenariocoverageTable.writefile() self.__scenariocoverageTable.fileclose() return factcnt def __checkBridgeData(self): """Calls the stored procedure bridgeDataValid and prints a warning if this returns false""" self.execute("SELECT bridgeDataValid()") res = self.__cur.fetchone()[0] if not res: print "Warning. Data in bridge tables not valid." print "Run datastaging.bridgeDataValid() to find the problems" def __checkConsistency(self): """Checks that no subject both passes and fails a specific test. Calls the stored procedure NumberOfAmbigousTests and prints a warning if the number of ambigous tests is greater than 0.""" self.execute("SELECT NumberOfAmbiguousTests()") res = self.__cur.fetchone()[0] if res > 0: print "Warning. The data has %d ambigous result(s)"%res return False return True def start(self): """Starts the load and will then invoke ANALYZE and finally print statistics about the executed SQL queries""" factcnt = self.__handleTestRuns() or 0 self.__dwconn.commit() #print "Doing ANALYZE..." 
#self.__cur.execute("ANALYZE") #print "ANALYZE done" #self.__checkBridgeData() #self.__checkConsistency() #Update the ETL timeusage ETLTimeUsage = int(time.time()-self.__starttime) ETLRun_id = self.__ETLRun_id self.__etlrunTable.update(locals()) self.__dwconn.commit() self.__cur.close() self.__dwconn.close() self.__urlcur.close() self.__urldwconn.close() #print " cnt avg tot query" #querystats = self.getQueryStats() #for (query, cnt, avg, tot) in querystats: # print "%7d %9.2f %8.0f %s" % (cnt, avg, tot, query) #print endtime = time.time() print "Time spent in total: %.2f secs. Time spent on reading RDF: %.2f secs." % (endtime - self.__starttime, self.__rdftime) #Prevents the ETL from hanging #No longer needed #self.__rdf_rw.stop() return factcnt def usage(): print "Usage:" print "python dw20load.py [OPTION]" print " -h --help Prints this message" print " -d --database=DATABASE Specifies the PostgreSQL database for the DW" print " -u --user=USER Specifies the PostgreSQL user" print " -p --password=PASSWORD Specifies the PostgreSQL password" print " -r --rdf-database=RDFDB Specifies the MySQL RDF database to use" print " -m --rdf-model=RDFMODEL Specifies the RDF model to use" print " -f --fromtestrun=N Specifies the first test run number to include" print " -t --totestrun=N Specifies the last test run number to include" print " -s --site=SITE Specifies that only the given site should" print " be included. May be used many times. Must be" print " identical to the website given in the RDF repos." print " -e ETLServer to connect to. (e.g. http://localhost:8890/)." 
print def main(argv): try: opts, args = getopt.getopt(argv, "hd:u:p:r:m:f:t:s:e:o:", ["help", "database", "user", "password", "rdf-database", "rdf-model", "fromtestrun", "totestrun", "site",'etlserver','host']) except getopt.GetoptError: usage(); sys.exit(1) database = user = password = rdfmodel = server = None rdfdatabase = 'rdf' fromtestrun = totestrun = -1 sites = set() host = 'localhost' #localhost as default for opt, arg in opts: if opt in ("-h", "--help"): usage() sys.exit() elif opt in ("-d", "--database"): database = arg elif opt in ("-u", "--user"): user = arg elif opt in ("-p", "--password"): password = arg elif opt in ("-r", "--rdf-database"): rdfdatabase = arg elif opt in ("-m", "--rdf-model"): rdfmodel = arg elif opt in ("-f", "--fromtestrun"): fromtestrun = arg elif opt in ("-t", "--totestrun"): totestrun = arg elif opt in ("-s", "--site"): sites.add(arg) elif opt in ("-o", "--host"): host = arg elif opt in ('-e',): server = arg server = SOAPpy.SOAPProxy(arg) print 'The ETL server is:',server else: print "Unknown parameter: %s" % opt if user is None: user = raw_input("Username: ") if database is None: database = raw_input("Database: ") if password is None: password = getpass.getpass() # rdfmodel may be None. In that case the system configuration decides. 
loader = DW20Loader(user, database, password, rdfdatabase, rdfmodel, fromtestrun, totestrun, sites, True,_server=server, host=host) print time.asctime(), "Starting to load data from triplestore into data warehouse" factcnt = loader.start() print time.asctime(), "Finished loading data from triplestore" print 'Added %d facts' % factcnt if server is not None: server.setLoading(False) #import sc #sc = sc.SystemConfiguration() #server,port = urlparse.urlsplit(sc.dbcleaner)[1].split(':') #s = SOAPpy.SOAPProxy(''.join(['http://',server,':',port,'/'])) #s.cleandb(rdfdatabase) if __name__ == "__main__": import os os.system('renice -19 ' + str(os.getpid())) import gc gc.set_threshold(100000,10,10) #temp #import hotshot, hotshot.stats #prof = hotshot.Profile("stones.prof") #prof.runcall(main,sys.argv[1:]) #prof.close() #stats = hotshot.stats.load("stones.prof") #stats.strip_dirs() #stats.sort_stats('time', 'calls') #stats.print_stats(20) #end temp main(sys.argv[1:])