# -*- coding: UTF-8 -*- """The URL repository module """ # Copyright 2005, 2006 EIAO Consoritum # This program is distributed under the terms of the GNU General # Public License. # # This file is part of the European Internet Accessibility Observatory # (EIAO) # # EIAO is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # EIAO is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with EIAO; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, # MA 02110-1301 USA __owner__ = "Morten Goodwin Olsen, Terje Gjosater" __maintainer__ = "Nils Ulltveit-Moe" __version__ = 0.1 import urlparse import urllib import psycopg2 import time import random import sys import traceback #from timeprofile import * from URLChecker import * import memcache import md5 import urllib2 import logger #profiler = timeprofile() from threading import Semaphore from parsexmlinclusionexclusion import * class URLRepUtils3: """URL repository utilities class """ def __init__(self,sco,deletemodel=False): """initialise the class Keyword arguments: sco -- System Configuration. deletemodel -- [Optional] If all tables in the URL repository model should be delted. """ self.scenarioids = set() psycopg2.threadsafety = 3 #0 Threads may not share the module. #1 Threads may share the module, but not connections. #2 Threads may share the module and connections. #3 Threads may share the module, connections and cursors. #In other words, setting thread safety equal to 3 makes mutexes etc. 
for the cursor unnecessary self.logger = logger.HarvestManLogger() self.sco = sco self.mc = memcache.Client([self.sco.memcache], debug=0) self.loglocation = self.sco.loglocation self.rdfprefix = self.sco.rdfprefix self.maxurlsfromsite = int(self.sco.maxurlsfromsite) self.nvalues = {} self.unavailable = {} self.uncheckable = {} self.numunavailable = 0 self.numconstimeout = 0 self.B2Wuncheckable = {} self.active = {} self.username = self.sco.urlrepusername self.password = self.sco.urlreppassword self.database = self.sco.urlrepdatabase self.host = self.sco.urlrephost self.__cc() self.deactivated = [] self.numsamples = {} self.aliases = {} self.already = set() self.domain = ''#None if deletemodel: self.removeURLRepository() #from housekeeping self.numpages = 0 self.currentTestRun = 0 self.currentSiteSurvey = 0 self.currentScenario = 0 self.currentPageSurvey = 0 self.currentTestLocationID = 0 self.currentRangeLocation = 0 #Adaptive Sampling Values self.barrierindicator = 0 self.variance = 0 self.errormargin = 0 #self.updatewamcontainerids() #State URLs to keep the state of HarvestMan even hen HarvestMan hangs self.stateMutex=Semaphore(1) self.mutexInsert = Semaphore(1) self.allurlsfromsiteMutex = Semaphore(1) self.writeScenarioMutex = Semaphore(1) self.createInclusionExclusionMutex = Semaphore(1) self.stateurls = [] self.allurlscrawled = False self.allurlsfromsite = [] self.nomoreurls = True #self.allurlsfromonesite = self.getAllURLsFromSite('www.nyc.estemb.org')# #[]#self.getProcessingState('www.nyc.estemb.org') #self.__cc() #domain = 'www.nyc.estemb.org' #self.cur.execute('select distinct url from page where domain=%(domain)s and isdownloaded=\'f\';',locals()) #self.allurlsfromonesite = [urllib2.unquote(i[0]) for i in self.cur.fetchall()] #self.writecon = psycopg.connect(host=self.host,user=self.username,database=self.database,password=self.password) #self.writecon.autocommit(False) #self.writecur = self.writecon.cursor() self.__ccwrite() #self.performance = 
open('/var/log/eiao/time.log','a') self.rules = [] def __ccwrite(self): """Internal function that checks if a connection is up. Use only for connections that should write to the urlrep. """ try: self.writecur.execute('select 1;') except: try: self.writecon.rollback() except: pass sys.exc_clear() try: self.writecon.close() except: pass self.writecon = psycopg2.connect(host=self.host,user=self.username,database=self.database,password=self.password) #self.writecon.autocommit(False) self.writecur = self.writecon.cursor() def __cc(self,closed=False): """Internal function that checks if a connection is up. If not, it reestablishes the connection. Returns None """ if closed: self.con = psycopg2.connect(host=self.host,user=self.username,database=self.database,password=self.password) #self.con.autocommit(False) self.cur = self.con.cursor() return try: self.cur = self.con.cursor() self.cur.execute('select 1;') except (psycopg2.OperationalError,psycopg2.InterfaceError,NameError,AttributeError): sys.exc_clear() self.con = psycopg2.connect(host=self.host,user=self.username,database=self.database,password=self.password) self.cur = self.con.cursor() def performInclusionExclusionCheck(self,url,site): #-- 0, +, http://www\.example\.com/site1/admin/private/.*,0, 1234 #-- 1, +, http://www\.example\.com/site1/admin/public/.*, 1, 1234 #-- 2, -, http://www\.example\.com/site1/admin/.*, 2, 1234 #-- 3, +, http://www\.example\.com/site1/.*, 3, 1234 url = url.lower() if not self.rules: #The rules have not yet been created #self.createInclusionExclusion(urlparse.urlparse(url)[1]) self.createInclusionExclusion(site) for inclusionexlusion,rule in self.rules: if inclusionexlusion=='-': #Exlusion if rule.match(url): print 'Exclusion:',url return False #Since the order goes from the most detailed and down, any exclusion means that there has not yet been an inclusion. 
else: #Inclusion if rule.match(url): #Since the order goes from the most detailed and down, any inclusion means that there has not yet been an exclusion. print 'Inclusion:',url return True return False #No exlusion or inclusion matched the URL. Beacuse of this, the URL is automatically exluced. def close(self): """Closing all connections""" self.__cc() self.con.commit() self.con.close() def createInclusionExclusion(self,domain): self.createInclusionExclusionMutex.acquire() self.__cc() #self.cur.execute('select patterntype,regexp from site natual join inclusionexclusion where site=%(domain)s order by orderid;',locals()) self.cur.execute('select patterntype,regexp from site,inclusionexclusion where site.domain=%(domain)s and site.siteid=inclusionexclusion.siteid order by orderid;',locals()) try: self.rules = [(i[0],re.compile(i[1])) for i in self.cur.fetchall()] except: self.rules = []#None if not self.rules: #No rules exists. Create rule of domain being included. if not domain.startswith('http'): domain = 'http[s]?://'+domain if not domain.endswith('/'): domain = domain+'/' print 'Creating rule:',domain + '.*' self.rules.append(('+',re.compile(domain + '.*'))) #Get all inclusiong and inclusion #Select list of ordered list patterns from the domain #Compile each expression #Add each expression on a list #Two issues. #Default rule that you apply. #Default policy. Firewall chain rule. None of the rules match. Exluce anything else. #If not match, the exclude. 
self.createInclusionExclusionMutex.release() def getAllStartURLsFromSite(self,domain): self.__cc() self.cur.execute('select distinct url,random() from page where domain=%(domain)s and testrunid=0 order by random() limit 50;',locals()) urls = list(set([urllib2.unquote(i[0]) for i in self.cur.fetchall() if not i[0].endswith('css') and not i[0].endswith('exe')])) return urls def getAllURLsFromSite(self,domain): return [] self.logger.info('Testrun:',self.currentTestRun) self.__cc()#closed=True) testrun = self.currentTestRun self.logger.info('Getting previous URLs') sys.stdout.flush() self.cur.execute('select max(testrunid) from page where domain=%(domain)s and testrunid<%(testrun)s;',locals()) res = self.cur.fetchall() if not res: #No earlier testrun exists. This is the first crawl towards the site. return [] secondlargesttestrun = res[0][0] maxurl = self.maxurlsfromsite self.logger.info('Getting acutal URLs') sys.stdout.flush() self.cur.execute('select distinct url from page where domain=%(domain)s and testrunid=%(secondlargesttestrun)s and url not in (select distinct url from page where domain=%(domain)s and testrunid=%(testrun)s) limit %(maxurl)s;',locals()) ret = [urllib2.unquote(i[0]) for i in self.cur.fetchall()] if not ret: self.nomoreurls = True self.logger.info('Finished getting previous URLs:',len(ret)) sys.stdout.flush() return ret def updateExhaustiveScan(self,site,testrunid,exhaustivescan): self.__cc() if exhaustivescan: self.cur.execute('update SiteExhaustiveScan set isexhaustive=true where domain=%(site)s and testrunid=%(testrunid)s and exists (select True from SiteExhaustiveScan where domain=%(site)s and testrunid=%(testrunid)s);',locals()) self.cur.execute('insert into SiteExhaustiveScan (domain,testrunid,isexhaustive) select %(site)s,%(testrunid)s,true where not exists (select True from SiteExhaustiveScan where domain=%(site)s and testrunid=%(testrunid)s);',locals()) else: self.cur.execute('update SiteExhaustiveScan set isexhaustive=false where 
domain=%(site)s and testrunid=%(testrunid)s and exists (select True from SiteExhaustiveScan where domain=%(site)s and testrunid=%(testrunid)s);',locals()) self.cur.execute('insert into SiteExhaustiveScan (domain,testrunid,isexhaustive) select %(site)s,%(testrunid)s,false where not exists (select True from SiteExhaustiveScan where domain=%(site)s and testrunid=%(testrunid)s);',locals()) def __del__(self): """Destructor Storing values for N """ self.__cc() self.con.commit() self.con.close() def getNextURL(self,url): self.allurlsfromsiteMutex.acquire() if self.allurlscrawled: self.allurlsfromsiteMutex.release() return None if not self.allurlsfromsite: if url.startswith('http'): url = urlparse.urlparse(url)[1] self.logger.info('Getting all URLs based on:',url) self.allurlsfromsite = self.getAllURLsFromSite(url) if not self.allurlsfromsite: self.logger.info('All URLs crawled') self.allurlscrawled = True if self.allurlsfromsite: ret = self.allurlsfromsite[0] self.allurlsfromsite.remove(ret) self.allurlsfromsiteMutex.release() return ret self.allurlsfromsiteMutex.release() return None def setCurrentBarrierIndicator(self, barrierindicator): """Setting the current barrier indicator KeyWord arguments: barrierindicator -- Barrier Indicator """ self.barrierindicator = barrierindicator def getCurrentBarrierIndicator(self): """Returns the current barrier indicator""" return self.barrierindicator def setCurrentVariance(self,variance): """Setting the current variance Keyword arguments: variance -- Variance """ self.variance = variance def getCurrentVariance(self): """Returns the current variance""" return self.variance def setCurrentErrorMargin(self, errormargin): """Setting the current error margin' Keyword arguments: errormargin -- Error margin """ self.errormargin = errormargin def getCurrentErrorMargin(self): """ Returns the current error margin""" return self.errormargin def getCurrentURLCount(self): """ Returns the current URL count""" return len(self.nvalues.keys()) def 
updatewamcontainerids(self): """Internal function for updating the WAM container IDs.""" #Deprecated return self.__cc() self.cur.execute('select distinct name,id from wamcontainer;') self.wamcontainers = dict(self.cur.fetchall()) def getjavascriptcheck(self, URL=None): """Function for retrieving a list of URLs submitted for JavaScript checking Keyword arguments: URL --[Optional] URL of the Site to be checked. If left empty retrieves all URLs scheduled for checking' """ self.__cc() if not URL: self.cur.execute('select distinct domain from site where jsfilter=True;') return ['http://'+domain[0] for domain in list(self.cur.fetchall())] else: URL=self.cleanupURL(URL, domainonly=True) self.cur.execute('select distinct jsfilter from site where domain=%(URL)s',locals()) try: return bool(self.cur.fetchall()[0][0]) except IndexError: sys.exc_clear() #Record does not exist. Site is not scheduled for JavaScript checking return False def submitforjavascriptcheck(self, URL, shouldcheck=True): """Function for scheduling a URL for javascript chekcing Keyword arguments: URL -- URL of site to be checked shouldcheck -- If the site should be checked or not. True as default. """ URL = self.cleanupURL(URL, domainonly=True) if shouldcheck: self.cur.execute('update site set jsfilter=True where domain=%(URL)s',locals()) else: self.cur.execute('update site set jsfilter=False where domain=%(URL)s',locals()) def insertInclusionExclusion(self,rules,domain): """Adding inxclusion and inclusion rules for a site Keyword arguments: rules -- List of rules in as following: [(id,exlusion or inclusion as '+' or '-',regexp),...] domain -- domain of the bellonging rule """ #create table inclusionexclusion( #patternid serial, -- Dumb ID to increase performance #patterntype varchar(1), -- + for inclusion, - for exlusion #regexp text, -- actual regular expression #orderid smallint, -- order of the regular expressions #siteid -- ID of the corresponding site. 
self.cur.execute('select siteid from site where domain=%(domains)s;',locals()) siteid = self.cur.fetchall()[0][0] for ruleid, inclusionorexclusion, regexp in rules: self.cur.execute('insert into inclusionexclusion (patterntype,regexp,orderid,siteid) values(%(inclusionorexclusion)s,%(regexp)s,%(ruleid)s,%(siteid)s);',locals()) def addSiteURL(self, URL, title, source, nace, continent, country, nuts3=None,inclusionexclusion=None): """Add an URL to the URL repository Keyword arguments; URL -- URL to the website title -- Title of the web site source -- The person that added the URL nace -- nace category continent -- Continent of the URL country -- Country of the URL nuts3 -- [Optional] Nuts-3 region integer Examples: >>> urlrep.removeURLRepository() >>> urlrep.addSiteURL('www.example1.com/','Example URL1',"Morten Goodwin Olsen","85.42",'EU','NO') 'www.example1.com/' >>> sites = urlrep.getAllSites() >>> import urllib >>> sites[0]==urllib.quote('www.example1.com') True """ if nace.strip()=='Capgemini' or nace.strip()=='Gapgemeni': nace,source = source,nace if not URL.startswith('http'): print 'Ignoring URL:',URL return preURL = str(URL) domain=str(self.cleanupURL(URL, domainonly=True)) rules = getRulesFromXML(inclusionexclusion) if rules: self.insertInclusionExclusion(rules,domain) title = str(title.decode('iso-8859-1').encode('utf-8')) source=str(source.decode('iso-8859-1').encode('utf-8')) coverage=self.addCoverage(continent, country, nuts3) category=self.addCategory(nace) if category=='Unknown': category='' self.cur.execute('insert into eiaourlrep.Site (domain,title,source,nuts,nace) select %(domain)s,%(title)s,%(source)s,%(coverage)s,%(category)s where not exists (select True from eiaourlrep.Site where domain=%(domain)s);',locals()) self.unavailable[URL] = (0,True) self.uncheckable[URL] = (0,True) self.B2Wuncheckable[URL] = (0,True) self.active[URL] = (True,True) self.addPageURL(preURL,domain) return URL def addPageURL(self, URL, siteURL, n='1', active='1'): """Add a 
page url, for use by the crawler to add new pages to a site. Keyword arguments; URL -- URL of the page siteURL -- URL to the website n -- n-value for the page active -- active state of the page Examples: >>> urlrep.removeURLRepository() >>> urlrep.addSiteURL('www.example1.com/','Example URL1',"Morten Goodwin Olsen",'84.11','EU','NO') 'www.example1.com/' >>> urlrep.addPageURL('www.example1.com/øæå','www.example1.com/') 'http://www.example1.com/%C3%B8%C3%A6%C3%A5' """ self.__cc() URL=self.cleanupURL(URL, domainonly=False) domain=self.cleanupURL(siteURL, domainonly=True) if domain not in URL: raise PageNotInSiteError(URL,siteURL) try: self.cur.execute("insert into page(domain,url,scenarioid,pageindex) select %(domain)s,%(URL)s,1,1 where not exists (select True from page where domain=%(domain)s and url=%(URL)s);",locals()) except Exception, e: sys.exc_clear() self.logger.debug(str(e)+'\n') self.con.commit() return URL def addCoverage(self, continent, country, nuts3=None): """Add a coverage only if it does not already exist Keyword arguments; continent -- Continent of the URL country -- Country of the URL nuts3 -- [Optional] Nuts-3 region integer Returns ID of Coverage """ checkLegalContinent(continent) #checkLegalCountry(country) if not nuts3: nuts3=country #else: #checkLegalNuts3(nuts3) askurl = '-'.join([i for i in [continent, country, nuts3] if i!=None]) return askurl def addCategory(self, nace): """Add a category only if it does not already exist Keyword arguments; nace -- nace category Returns ID of Category """ checkLegalNace(nace) return nace def addCategoryToSite(self, site, nace): """Add a category only if it does not already exist Keyword arguments; site -- Site to be added a new category for nace -- nace category """ checkLegalNace(nace) if self.rw.readRDF(None,'http://www.eiao.net/rdf/URL',site): self.rw.writeRDFTriple(site, 'http://www.eiao.net/rdf/category', nace) #initialisation of the repository def removeURLRepository(self): """Removes the contents of 
the URL repository Returns None """ self.__cc() shouldremove = raw_input('This removes the entire URL repository. Are you sure you want to do that= (y/n)').lower() if shouldremove in ('y','yes'): #self.cur.execute('truncate table eiaourlrep.WAMContainer;') try: #self.cur.execute('truncate table eiaourlrep.site,eiaourlrep.page;') #self.con.commit() pass except: self.logger.info('Could not remove old data. Please investigage') self.con.rollback() else: self.logger.info('Not truncating') self.nvalues = {} self.unavailable = {} self.uncheckable = {} self.B2Wuncheckable = {} self.active = {} def createURLRepository(self): """Creates a new repository with some initial values """ # self.addInitialCoverages() # self.addInitialCategories() self.addInitialURLs() # def addInitialCoverages(self): # """Adds the geographical coverages supported in the second release # """ # for continent in ["EU"]: # for country in ["DA","EN","DE","NO","PL"]: # self.addCoverage(continent,country) # def addInitialCategories(self): # """Adds the site categories supported in the second release # """ # for nace in ['47','47,9','47.91' ,'53','53.1','53.10','58','58.1','58.13','60','60.1','60.2', # '60.10','60.20','64','64.1','64.11','79','79.1','79.11','79.12','84','84.1','84.11','84.12', # '85','85.4','85.42','91','91.0','91.01','91.02','94','94.9','94.92']: # self.addCategory(nace) def addInitialURLs(self,urllist=None): """Read URLs with all necessary information from a list, and add it Note that this function updates all entries in the database Keyword arguments: urllist -- [Optional] Filename of URL to read. Uses default file if empty Returns None """ #Rewritten this function. Now supperts if one URL has has Nuts3, while another has not. 
if urllist: file = open(urllist,'r') else: file = open(self.urllist,'r') firstline=file.readline() i=0 if 1==1:#firstline.startswith(",,,,,"): #for line in file.xreadlines(): import csv for params in csv.reader(file): self.logger.info(params) #if line != "\n": if [some for some in params if some]: i=i+1 #line = line.replace(', ','').replace('"GE"','"DE"').replace('http:// ','http://').replace("'","") #params=line.split('","') #using generic addinitialurl-function instead of addInitialSiteURL, which is now removed #if len(params)==5: #if params[0].startswith('http'): if 1==1: #if not 'Gapgemeni' in params: # url = params[0].strip().strip('"') # title = params[1].strip().strip('"') # source = 'Capgemini' # nace = params[2].strip().strip('"') # continent = params[3].strip().strip('"') # country = params[4].strip().strip('"') #else: if 1==1: url = params[0].strip().strip('"') title = params[1].strip().strip('"') source = params[2].strip().strip('"') nace = params[3].strip().strip('"').replace(',','.') continent = params[4].strip().strip('"') country = params[5].strip().strip('"') nuts = params[6].strip().strip('"').replace(',','.') try: inclusionexclusion = params[7].strip().strip('"').replace(',','.') except IndexError: inclusionexclusion = None if not url.startswith('http'): url = 'http://'+url if not nace: nace = 'Unknown' if not continent: continent = 'EU' print i,'url',url,'title',title,'source',source,'nace',nace,'continent',continent,'country',country self.addSiteURL(url,title,source,nace,continent,country,nuts,inclusionexclusion) #self.addSiteURL(params[0].strip().strip('"'),params[1].strip().strip('"'),params[2].strip().strip('"'),params[3].strip().strip('"'),params[4].strip().strip('"'),params[5].strip().strip('"')) #elif len(params) > 6: # self.addSiteURL(params[0].strip().strip('"'),params[1].strip().strip('"'),params[2].strip().strip('"'),params[3].strip().strip('"'),params[4].strip().strip('"'),params[5].strip().strip('"'),params[6].strip().strip('"')) 
file.close() self.con.commit() #administrative functions def activateURL(self, URL): """Activate an URL Keyword arguments: URL -- The URL to be activated """ #Remember to also reset unavailability and uncheckability counts! self.__cc() URL=self.cleanupURL(URL) if URL in self.deactivated: self.deactivated.remove(URL) self.cur.execute('update page set isactive=True where url=%(URL)s;',locals()) self.cur.execute('update page set unavailable=0 where url=%(URL)s;',locals()) self.con.commit() def deactivateURL(self, URL): """Deactivate an URL Keyword arguments: URL -- The URL to be deactivated """ self.__cc() URL = self.cleanupURL(URL) self.deactivated.append(URL) try: self.cur.execute('update page set isactive=False where url=%(URL)s;',locals()) self.con.commit() except: sys.exc_clear() self.logger.debug('Error deactivating URL:'+URL+'\n') def getActivationState(self,URL): """Get activationstate for an URL Keyword arguments: URL -- The URL to be checked """ self.__cc() URL = self.cleanupURL(URL,domainonly=True) a = self.cur.execute('select distinct isactive from page where domain=%(URL)s;',locals()) return bool(self.cur.fetchall()[0][0]) def getDeactivatedURLs(self): """Get a list of all deactivated URLs """ if not self.deactivated: self.__cc() try: self.cur.execute('select distinct url from page where isactive=False;') self.deactivated = [a[0] for a in self.cur.fetchall()] except (psycopg2.InterfaceError,psycopg2.OperationalError): sys.exc_clear() return [] return self.deactivated def getUnavailableURLs(self,URL): """Building list of unavailable URLs Keyword arguments: URL -- URL to build unavailable dictionary from """ #Unavailable no longer exists - This makes the RDF graph bias. This needs to be ether reimplemented in the crawler or the RDF graph / ETL changed (The former is preferable). 
return 0 if not self.unavailable: self.__cc(); domain=str(self.cleanupURL(URL,domainonly=True)) self.cur.execute('select distinct url,unavailable from page where domain=%(domain)s;',locals()) try: self.unavailable = dict(self.cur.fetchall()) except psycopg2.ProgrammingError,e: if e.__str__()=='no results to fetch': self.logger.info('Not getting any results to fetch for url:',URL) #This is a workaround for psycopg2. http://osdir.com/ml/python.db.psycopg.devel/2004-08/msg00057.html else: raise(e) except ValueError: pass if not self.unavailable: self.unavailable = {URL:0} #Forcing the dictionary none-empty - avoiding same query to the URL reposityr over and over if unavailable is not present def buildNDictionary(self,URL): """Building the N-dictionary in memory Keyword arguments: URL -- URL to build N-dictionary from """ #if not self.nvalues: # self.__cc(); # domain=str(self.cleanupURL(URL,domainonly=True)) # self.cur.execute('select distinct url,nvalue from page where domain=%(domain)s;',locals()) # self.nvalues = dict(self.cur.fetchall()) #if not self.unavailable: # self.__cc(); # domain=str(self.cleanupURL(URL,domainonly=True)) # self.cur.execute('select distinct url,unavailable from page where domain=%(domain)s;',locals()) # self.unavailable = dict(self.cur.fetchall()) #def writeScenario(self,domain,url,scenarioid,pageindex): # """Function to store the scenario information of the pages used by the crawler # # KeyWord arguments: # domain -- Domain of the site # url -- URL of the page # scenarioid -- Scenario ID of the page # pageindex -- Index of the page # """ # self.cur.execute('insert into page(domain,url,scenarioid,pageindex) select %(domain)s,%(url)s,%(scenarioid)d,%(pageindex)d where not exists (select True from page where domain=%(domain)s and url=%(url)s and scenarioid=%(scenarioid)s and pageindex=%(pageindex)d);',locals()) def addSubDomain(self,domain,subdomain): """Function for adding connection between domain and subdomain Keyword arguments: domain -- 
Main domain subdomain -- Subdomain connected to the domain """ self.cur.execute('insert into Domains(domain,alias) select %(domain)s,%(subdomain)s where not exists (select True from Domains where domain=%(domain)s and alias=%(subdomain)s);',locals()) self.con.commit() def setSiteTimeoutPresent(self, domain): """Domain that timeout appears Keyword argument: domain -- Domain of the site """ self.cur.executemany('update site set timeout=1 where domain=%(domain)s;',locals()) self.con.commit() def getLastLastModified(self,url): return 0 try: if url[4]==':': url = urllib.quote(url) self.cur.execute('select timestamplastmodified from page where url=%(url)s;',locals()) if not self.cur.rowcount: return False res=self.cur.fetchall() self.logger.info('Last-modified in urlrep:',url,res) if not res: return False else: return res[0][0] except Exception, e: self.logger.debug('Error getting last-modified ('+str(e)+') '+url+' '+str(type(url))+'\n') traceback.print_exc() sys.exc_clear() self.__cc() return False def insert(self,sql,variables): pre = time.time() self.__ccwrite() self.writecur.executemany(sql,variables) self.writecon.commit() #self.performance.write('Inserting scenario:' + str(time.time()-pre) + '\n') #self.performance.flush() def writeNuts(self,nutslist): self.writeScenarioMutex.acquire() self.insert('insert into nutscategorisation(domain,nuts) select %(domain)s,%(nuts)s;',nutslist) self.writeScenarioMutex.release() def writeAllScenarios(self, scenariolist): """Function for writing a list of scenarios Keyword arguments: scenariolist -- list of HarvestManAutoUrlCollection """ #Currently this function assumes that the it is a list of dictionaries as with domain,url,scenarioid,pageindex #E.g. [{'domain':'domain','url':'url','scenarioid':0,'pageindex':0},{'domain':'domain','url':'url','scenarioid':0,'pageindex':1}..] 
# NOTE(review): the statements down to the final ``release()`` are the tail of
# a method whose ``def`` line and docstring lie above this chunk.  The nesting
# shown here was reconstructed from whitespace-collapsed source -- confirm it
# against the original file before relying on it.
    # Serialise scenario writes; the matching release() is the last statement
    # of the method.
    self.writeScenarioMutex.acquire()
    try:
        # The bare numbered prints (1, 2, 3, 4.1 ...) are progress markers
        # written to stdout.
        print 1
        # Adopt the domain of the first scenario when none has been set yet.
        if not self.domain:
            if scenariolist:
                self.domain = str(scenariolist[0]['domain'])
        self.domain = self.domain.rstrip('/')
        # Keep only the scenarios whose URL passes the inclusion/exclusion check.
        scenariolist = [i for i in scenariolist if self.performInclusionExclusionCheck(i['url'],self.domain)]
        print 2
        # Fill in the per-row values the insert statement below expects.
        for s in scenariolist:
            s['thistime'] = time.time()
            try:
                s['url'] = urllib.quote(s['url'].decode('utf-8','ignore'))
            except:
                # Fallback when quoting fails: drop every non-ASCII character.
                s['url'] = urllib.unquote(urllib.quote(s['url'].decode('utf-8','ignore').encode('ASCII','ignore')))
            s['domain'] = str(self.domain)
            s['testrunid'] = str(self.currentTestRun)
            # NOTE(review): s['url'] was already quoted above and '%' is not in
            # ``safe`` here, so existing escapes are escaped again -- confirm
            # that this is the intended file name.
            s['filename'] = urllib.quote(s['url'].decode('utf-8','ignore'),safe='/')
            # typeofpage: 1 for URLs ending in 'pdf', 0 for everything else.
            if s['url'].endswith('pdf'):
                s['typeofpage'] = 1
            else:
                s['typeofpage'] = 0
        realscenariolist = []
        #Removing scenarios that only have .css (wrong scenarios).
        #FIXME: This is only a bad workaround, and should be fixed in the crawler.
        print 3
        # A row is kept when its scenario id has at least one row with a
        # non-empty URL that does not end in 'css'.  The whole list is
        # rescanned for every row.
        for scen in scenariolist:
            scenid = scen['scenarioid']
            if [s for s in scenariolist if s['scenarioid']==scenid and not s['url'].endswith('css') and s['url']]:
                realscenariolist.append(scen)
        #print 4,realscenariolist
        if realscenariolist:
            #There have actually been downloads
            print 4.1
            # NOTE(review): ``domain`` and ``maxinsert`` are not read again in
            # the visible remainder of the method.
            domain = str(scenariolist[0]['domain'])
            maxinsert = 10
            error = True
            numerror = 0
            print 4.2
            #self.__cc()
            print 4.3
            # Try the insert at most twice.
            while error and numerror<2:
                try:
                    print 5
                    # Insert each row unless the page is already registered for
                    # this domain/url/scenario/test run.
                    self.insert('insert into page(domain,url,scenarioid,pageindex,timestampdownloaded,timestamplastmodified,isdownloaded,testrunid,pagehash,filename,typeofpage) select %(domain)s,%(url)s,%(scenarioid)s,%(pageindex)s,%(thistime)s,%(timestamplastmodified)s,\'t\',%(testrunid)s,%(pagehash)s,%(filename)s,%(typeofpage)s where not exists (select True from page where domain=%(domain)s and url=%(url)s and scenarioid=%(scenarioid)s and pageindex=%(pageindex)s and testrunid=%(testrunid)s) and not exists (select True from page where domain=%(domain)s and url=%(url)s and scenarioid=%(scenarioid)s and testrunid=%(testrunid)s);',realscenariolist)
                    error = False
                except Exception, e:
                    print 5.1
                    numerror += 1
                    # Back off for 2-4 seconds before the next attempt.
                    time.sleep(random.uniform(2,4))
                    #Sometimes the connection to postgres is closed. If this happens wait a bit and restart the connection. Note that downloading pages goes on in parallel to this.
                    print 'Something went wrong with writing scenarios. Connection closed? Restarting connections and trying again (',e,')'
                    import traceback
                    # NOTE(review): print_stack() prints the current call stack,
                    # not the traceback of ``e``.
                    traceback.print_stack()
                    #sys.exc_clear()
                    #self.__cc(closed=True)
                    print 5.2
            print 6
            # Register every written page in memcache under the md5 of its URL
            # and append one line per page to crawler.log.
            f = open(self.loglocation+'crawler.log','a')
            for s in realscenariolist:
                self.mc.add(md5.new(s['url']).hexdigest(), (s['url'],s['scenarioid'],s['domain'],s['pageindex']))
                f.write('(Crawler) Writing scenario: '+str(s['scenarioid'])+' ' + str(s['domain']) + ' ' +str(s['url']) +' '+ str(s['testrunid'])+ '\n')
                self.scenarioids.add(s['scenarioid'])
            f.close()
            print 7
            # ``s`` is the last row left over from the loop above.
            self.writeProcessingState(s['domain'],self.stateurls)
    except Exception,e:
        import traceback
        print 'Exception:',e, traceback.extract_stack()
        traceback.print_stack()
    #finally:
    #    self.writeScenarioMutex.release()
    # Reached after the try block and after any handled Exception; an
    # exception that is not an Exception subclass would skip this release.
    self.writeScenarioMutex.release()

def pushStateURL(self,url):
    """Log that an URL is being added to the processing state

    The append to self.stateurls is commented out, so this method only
    writes two log entries, takes and releases stateMutex, and flushes
    stdout.

    Keyword arguments:
    url -- The URL being processed
    """
    self.logger.info('Starting pushing to state...',url)
    self.stateMutex.acquire()
    #self.stateurls.append(url)
    self.stateMutex.release()
    self.logger.info('Pushing to state...',url)
    sys.stdout.flush()

def popStateURL(self,url):
    """Remove an URL from the processing state

    Removes the first occurrence of url from self.stateurls while holding
    stateMutex; does nothing if the URL is not in the list.

    Keyword arguments:
    url -- The URL to remove
    """
    self.logger.info('Stopping popping from state...',url)
    self.stateMutex.acquire()
    if url in self.stateurls:
        self.stateurls.remove(url)
    self.stateMutex.release()
    self.logger.info('Popping from state...',url)
    sys.stdout.flush()

def getProcessingState(self,domain):
    """Get the stored processing state of a domain

    Keyword arguments:
    domain -- Domain, or an URL starting with 'http' from which the
              network location is taken

    Returns the value stored in memcache under the domain, or an empty
    list if nothing (or an empty value) is stored.
    """
    if domain.startswith('http'):
        domain = urlparse.urlparse(domain)[1]
    ret = self.mc.get(domain)
    if not ret:
        #This means that the Crawler has started a completely new crawl. If an exhaustive scan is supposed to work - even with last-modified, all known URLs must be given to the crawler.
        #self.__cc()
        #self.cur.execute('select distinct url from page where domain=%(domain)s;',locals())
        #return [urllib2.unquote(i[0]) for i in self.cur.fetchall()]
        return []
    return ret

def writeProcessingState(self,domain,urls):
    """Store the processing state of a domain in memcache

    Keyword arguments:
    domain -- Key under which the state is stored
    urls -- The value to store
    """
    self.mc.set(domain,urls)

#def storeNDictionary(self):
#    #
#    #profiler.mark('storendict')
#    #
#    #pre = time.time()
#    self.__cc()
#    domain = ''
#    alldomains = []
#    for URL,n in self.nvalues.items():
#        domain = self.cleanupURL(URL,domainonly=True)
#        alldomains.append({'domain':str(domain),'URL':str(URL),'n':n})
#    self.__cc()
#    if alldomains:
#        #If N-values have been changed
#        self.con = psycopg.connect(host=self.host,user=self.username,database=self.database,password=self.password)
#        self.con.autocommit()
#        self.cur = self.con.cursor()
#        try:
#            self.cur.executemany("insert into page(domain,url) select %(domain)s,%(URL)s where not exists (select True from page where domain=%(domain)s and url=%(URL)s);", alldomains)
#            #
#            self.con.commit()
#        except:
#            sys.stderr.write('Error writing to URL repository ' + str(alldomains) + '\n')
#        try:
#            self.cur.executemany("update page set nvalue=%(n)s where url=%(URL)s;",alldomains)
#        except:
#            sys.stderr.write('Error updating the URL repository ' + str(alldomains) + '\n')
#    self.__cc()
#    self.synchronisesamples('http://'+domain)

#def getN(self,URL):
#    """Get the N-value of an URL
#
#    N is the number of paths from current node (or equal to the highest N for the site if it is unknown)
#    Keyword arguments:
#    URL -- URL of the site to get the N for
#
#    Returns N for the URL as integer or None if no N is detected
#    """
#    if not self.nvalues:
#        self.buildNDictionary(URL)
#    return self.nvalues.get(URL,None)

#def setN(self,URL, n):
#    """Set the N-value of an URL
#    Keyword arguments:
#    URL -- URL of the site to set the N for
#    n -- The n-value to set
#    """
#    self.__cc()
#    URL=self.cleanupURL(URL)
#    if not (type(n) == type(1) or type(n) == type(2324390071)):
#        raise InvalidNError(n)
#    if n <= 0:
#        raise InvalidNError(n)
#    self.nvalues[URL] = n

#Crawler functions
def getRandomSite(self):
    """Get a random active site from the URL repository

    Returns the domain of a randomly chosen site for which
    getActivationState() is true, or None if no site is active.

    Raises ValueError if the repository contains no sites at all.
    """
    self.__cc()
    self.cur.execute('select distinct domain from eiaourlrep.Site;')
    candidates = [row[0] for row in self.cur.fetchall()]
    if not candidates:
        raise ValueError('No sites registered in the URL repository')
    # Visit every site at most once in random order.  Re-drawing until an
    # active site turns up never terminates when all sites are inactive.
    random.shuffle(candidates)
    for domain in candidates:
        if self.getActivationState(domain):
            return domain
    return None

def getAllSites(self):
    """Get a list of all sites in the URL repository

    Returns a list with the distinct domains of the site table.
    """
    self.__cc()
    self.cur.execute('select distinct domain from site;')
    return [row[0] for row in self.cur.fetchall()]

def getAllActiveSitesIterator(self):
    """An iterator over all active sites in the URL repository

    This is a generator: the query is executed when the first item is
    requested, and all matching domains are fetched before the first one
    is yielded.
    """
    self.__cc()
    self.cur.execute('select distinct domain from site where isactive=True;')
    activesites = [row[0] for row in self.cur.fetchall()]
    for site in activesites:
        yield site

#def getSeedURLsFromSite(self, URL):
#    """Returns a list of seed URLs from a site
#
#    Keyword arguments:
#    URL -- URL of site to get seed URL from.
#
#    Returns URL of a seed page
#    """
#    if self.nvalues.keys():
#        sam = ''
#        while not sam or sam.endswith('css'):
#            sam = random.sample(self.nvalues.keys(),1)[0]
#        return sam
#    self.__cc()
#    domain=self.cleanupURL(URL,domainonly=True)
#    self.cur.execute('select url from page where domain=%(domain)s and isactive=True order by RANDOM() limit 1;',locals())
#    pages = [a[0] for a in self.cur.fetchall() if not a[0].endswith('css')]
#    if not pages:
#        #No seed pages registered yet. Use input URL as a seed page.
#        return URL
#    else:
#        return random.sample(pages,1)[0]

def getUnavailablecount(self,URL):
    """Get the number of times the URL has been consecutively unavailable since last time it was available.

    The count is read from the in-memory self.unavailable dictionary.  When
    that dictionary is empty, getUnavailableURLs(URL) is called first.

    Keyword arguments:
    URL -- The URL to get unavailablecount for

    Returns the count registered for the URL, or 0 if the URL is unknown.
    """
    if not self.unavailable:
        self.getUnavailableURLs(URL)
    return self.unavailable.get(URL,0)
    #self.__cc()
    #URL=self.cleanupURL(URL)
    #try:
    #    self.cur.execute('select unavailable from page where url=%(URL)s limit 1;',locals())
    #    result = self.cur.fetchall()
    #except:
    #    sys.stderr.write('Error getting unavailability for URL:'+str(URL) + '\n')
    #    result = 0
    #if not result:
    #    return 0
    #else:
    #    return result[0][0]

def incrementUnavailablecount(self, URL):
    """Increment the number of times the URL has been unavailable since last time it was available.

    The URL is deactivated when getUnavailablecount() reports more than 10
    for it.  A failing database update is logged and otherwise ignored.

    Keyword arguments:
    URL -- The URL to increment unavailablecount for
    """
    #Next line keeps the complete unavailablecount for a domain
    self.numunavailable += 1
    self.__cc()
    URL = self.cleanupURL(URL)
    try:
        self.cur.execute('Update page set unavailable=(select distinct unavailable+1 from page where url=%(URL)s) where url=%(URL)s;', {'URL': URL})
        self.con.commit()
    except Exception:
        # No function-local "import sys" here: it would make ``sys`` a local
        # name and turn the next line into an UnboundLocalError.
        sys.exc_clear()
        self.logger.debug('Error updating unavailablecount')
    # NOTE(review): this reads the in-memory self.unavailable dictionary, which
    # this method does not update -- confirm the threshold can be reached.
    if self.getUnavailablecount(URL) > 10:
        self.deactivateURL(URL)

#def resetUnavailablecount(self, URL):
##    """Reset the number of times the URL has been consecutively unavailable
#    #
#    Keyword arguments:
#    URL -- The URL to reset unavailablecount for
#    #
#    Returns None
#    """
#    self.__cc()
#    URL = self.cleanupURL(URL)
#    try:
#        self.con = psycopg.connect(host=self.host,user=self.username,database=self.database,password=self.password)
#        self.con.autocommit()
#        self.cur = self.con.cursor()
#        self.cur.execute('update page set unavailable=0 where url=%(URL)s;',locals())
#        self.con.commit()
#    except:
#        sys.stderr.write('Error resetting unavailability for URL:'+str(URL)+'\n')

def getUncheckabilitycount(self,URL,WAMcontainer = None):
    """Get the number of times the URL has been uncheckable

    The lookup is switched off: the method returns 0 for every input.

    Keyword arguments:
    URL -- The URL to get uncheckabilitycount for
    WAMcontainer -- [Optional] The WAM container for uncheckability. For all WAM containers if left empty.

    Returns 0.
    """
    return 0
    # Former implementation, unreachable behind the return above and kept
    # for reference only.
    # NOTE(review): it sums the stored values, but the increment/reset
    # methods store (count, flag) tuples.
    #if (WAMcontainer is not None) and (not type(WAMcontainer)==type(1)):
    #    WAMcontainer = self.wamcontainers[WAMcontainer]
    #if not self.uncheckable:
    #    self.buildNDictionary(URL)
    #allwamcontainers = []
    #if not WAMcontainer:
    #    allwamcontainers = self.wamcontainers.keys()
    #else:
    #    allwamcontainers = [WAMcontainer]
    #return sum([self.uncheckable.get((URL,w),0) for w in allwamcontainers])

def incrementUncheckabilitycount(self, URL,WAMcontainer):
    """Increment the number of times the URL has been consecutively uncheckable

    The count is kept in self.uncheckable as (count, flag) keyed by
    (cleaned URL, WAM container id); the flag is set to False here.

    Keyword arguments:
    URL -- The URL to increment uncheckabilitycount for
    WAMcontainer -- ID or string of the WAM containers
    """
    # A non-integer container is a name; translate it to its id.
    if not isinstance(WAMcontainer, int):
        WAMcontainer = self.wamcontainers[WAMcontainer]
    if not self.uncheckable:
        self.buildNDictionary(URL)
    URL = self.cleanupURL(URL)
    key = (URL, WAMcontainer)
    previous = self.uncheckable.get(key, (0, False))[0]
    self.uncheckable[key] = (previous + 1, False)
    return None

def resetUncheckabilitycount(self, URL, WAMContainer):
    """Reset the number of times the URL has been consecutively uncheckable

    Stores (0, True) in self.uncheckable under (cleaned URL, WAM container id).

    Keyword arguments:
    URL -- The URL to reset uncheckabilitycount for
    WAMContainer -- ID or string of the WAM containers
    """
    # A non-integer container is a name; translate it to its id.
    if not isinstance(WAMContainer, int):
        WAMContainer = self.wamcontainers[WAMContainer]
    URL = self.cleanupURL(URL)
    if not self.uncheckable:
        self.buildNDictionary(URL)
    self.uncheckable[(URL, WAMContainer)] = (0, True)

def getSchedulingError(self,site):
    """Get the number of times the site has failed after repeated rescheduling.

    Keyword arguments:
    site -- The site to get SchedulingError for

    Returns the schedulingerror value of the first matching row, or 0 if
    the site is unknown or the query fails.
    """
    self.__cc()
    site = self.cleanupURL(site, True)
    try:
        self.cur.execute('select distinct schedulingerror from site where domain=%(site)s;', {'site': site})
        rows = self.cur.fetchall()
    except Exception:
        sys.exc_clear()
        self.logger.debug('Error getting schedulingerror for site:'+str(site) + '\n')
        return 0
    if not rows:
        return 0
    return rows[0][0]

def incrSchedulingError(self, site):
    """Increment the number of times the site has failed after repeated rescheduling.

    Database errors are not caught here and propagate to the caller.

    Keyword arguments:
    site -- The site to increment SchedulingError for
    """
    self.__cc()
    # Reduce the argument to its domain, as getSchedulingError and
    # resetSchedulingError do, so all three address the same row.
    site = self.cleanupURL(site, True)
    self.cur.execute('Update site set schedulingerror=(select distinct schedulingerror+1 from site where domain=%(site)s) where domain=%(site)s;', {'site': site})
    self.con.commit()

def resetSchedulingError(self, site):
    """Reset the number of times the site has failed after repeated rescheduling.

    A failing update is logged and otherwise ignored.

    Keyword arguments:
    site -- The site to reset SchedulingError for

    Returns None
    """
    self.__cc()
    site = self.cleanupURL(site, True)
    try:
        self.cur.execute('update site set schedulingerror=0 where domain=%(site)s;', {'site': site})
        self.con.commit()
    except Exception:
        sys.exc_clear()
        self.logger.debug('Error resetting schedulingerror for site:'+str(site)+'\n')

def cleanupURL(self, URL,domainonly=False):
    """Simple cleaning and normalisation of URL

    A missing scheme is replaced by 'http://' and an empty path by '/'.
    Everything after the first seven characters of the result is
    percent-quoted with '/' and '%' left untouched, so existing escapes
    survive but characters such as ':' and '?' are escaped.

    Keyword arguments:
    URL -- The URL to be cleaned
    domainonly -- [Optional] Return only the network location of the URL

    Returns a normalised URL

    Examples:
    >>> urlrep.cleanupURL('www.test.com')
    'http://www.test.com/'
    >>> urlrep.cleanupURL('http://www.test.com')
    'http://www.test.com/'
    >>> urlrep.cleanupURL('www.test.com/test')
    'http://www.test.com/test'
    >>> urlrep.cleanupURL('http://www.test.com/test',domainonly=True)
    'www.test.com'
    """
    # The scheme test is made on the unquoted URL, so a quoted 'http%3A//'
    # prefix counts as a scheme.
    up = urlparse.urlparse(urllib.unquote(URL))
    if not up[0]:
        URL = "http://"+URL
        up = urlparse.urlparse(URL)
    if domainonly:
        URL = up[1]
    elif not up[2]:
        up = (up[0],up[1],'/',up[3],up[4],up[5])
        URL = urlparse.urlunparse(up)
    # The seven-character split matches the length of 'http://'; it is
    # applied to domain-only results as well.
    return str(URL[:7]) + str(urllib.quote(URL[7:],safe='/%'))

def getAliasForDomain(self,site):
    """Retrieves aliases for a domain

    Only the in-memory self.aliases dictionary is consulted; the database
    lookup is commented out.

    Keyword arguments:
    site -- Site to get aliases of

    Returns the entry registered for the domain, or None if there is none.
    """
    domain = self.cleanupURL(site,domainonly=True)
    alias = self.aliases.get(domain,None)
    #if not alias:
    #    self.__cc()
    #    self.cur.execute('select distinct alias from domains where domain=%(domain)s;',locals())
    #    alias = [d[0] for d in self.cur.fetchall()]+[domain]
    #    self.aliases[domain]=alias
    return alias

def synchronisesamplecount(self,site):
    """Writes samplecount to the URL repository

    The in-memory count of the site (0 if none) is written to the site
    table.  A failing update is logged and otherwise ignored.

    Keyword arguments:
    site -- The site whose sample count should be stored
    """
    domain = self.cleanupURL(site,domainonly=True)
    self.__cc()
    samplecount = self.numsamples.get(domain, 0)
    try:
        # psycopg2 placeholders must be %s / %(name)s, also for integers.
        self.cur.execute('update site set samplecount=%(samplecount)s where domain=%(domain)s;', {'samplecount': samplecount, 'domain': domain})
        self.con.commit()
    except Exception:
        sys.exc_clear()
        self.logger.debug('Error setting samplecount for site:'+str(site)+'\n')

def getsamplecount(self,site=None):
    """Retrieves the number of samples for a given site

    Keyword arguments:
    site -- [Optional] Domain as string. Uses the first domain of the
            in-memory sample dictionary if left empty

    Returns number of samples as integer.  0 is returned when no samples
    are held in memory at all, or when the database has no count for a
    site that is not held in memory.
    """
    if not self.numsamples:
        #No samples exist
        return 0
    if not site:
        domain = list(self.numsamples)[0]
    else:
        domain = self.cleanupURL(site, domainonly=True)
    if domain in self.numsamples:
        return self.numsamples[domain]
    self.__cc()
    self.cur.execute('select samplecount from site where domain=%(domain)s;', {'domain': domain})
    try:
        return int(self.cur.fetchall()[0][0])
    except (IndexError, TypeError):
        sys.exc_clear()
        #The samplecount for this site has never been counted before
        #(no row, or a NULL samplecount)
        return 0

def addSample(self,site,numofsamples=1):
    """Increment the number of samples retrieved for one site

    Keyword arguments:
    site -- The site this sample belongs to
    numofsamples -- [Optional] The number of samples you need to increment. 1 as default.
    """
    domain = self.cleanupURL(site, domainonly=True)
    self.numsamples[domain] = self.numsamples.get(domain,0) + numofsamples

#def synchronisesamples(self,site):
#    """ Writing the number of samples to the DB
#
#    Keyword arguments:
#    site -- The site these samples belong to
#    """
#    self.__cc()
#    domain = self.cleanupURL(site, domainonly=True)
#    numofsamples = self.numsamples.get(domain,0)
#    self.con = psycopg.connect(host=self.host,user=self.username,database=self.database,password=self.password)
#    self.con.autocommit()
#    self.cur = self.con.cursor()
#    self.cur.execute('update site set sampleCount=%(numofsamples)s where domain=%(domain)s;',locals())
#    self.con.commit()

#get current and next stuff moved here from housekeeping
def getCurrentRangeLocation(self):
    """Retrieving the current range location

    Returns rdfprefix + 'RangeLocation_' + the current counter.
    """
    return ''.join([self.rdfprefix,'RangeLocation_',str(self.currentRangeLocation)])

def getCurrentPageSurvey(self,retint=False):
    """Retrieving the current page survey

    Keyword arguments:
    retint -- Return the bare counter instead of the prefixed identifier
    """
    if retint:
        return self.currentPageSurvey
    return ''.join([self.rdfprefix,'PageSurvey_',str(self.currentPageSurvey)])

def getCurrentTestRun(self,retint=False):
    """Retrieving the current testrun

    Keyword arguments:
    retint -- Return the bare counter instead of the prefixed identifier
    """
    if retint:
        return self.currentTestRun
    return ''.join([self.rdfprefix,'TestRun_',str(self.currentTestRun)])

def getCurrentSiteSurvey(self,retint = False):
    """Retrieving the current site survey

    Keyword arguments:
    retint -- Return the bare counter instead of the prefixed identifier
    """
    if retint:
        return self.currentSiteSurvey
    return ''.join([self.rdfprefix,'SiteSurvey_',str(self.currentSiteSurvey)])

def getCurrentScenario(self,retint = False):
    """Retrieving current scenario

    Keyword arguments:
    retint -- Return the bare counter instead of the prefixed identifier
    """
    if retint:
        return self.currentScenario
    return ''.join([self.rdfprefix,'Scenario_',str(self.currentScenario)])

def getNextTestRun(self):
    """Advance the test run counter and return the new identifier"""
    self.currentTestRun += 1
    return ''.join([self.rdfprefix,'TestRun_',str(self.currentTestRun)])

def getNextSiteSurvey(self):
    """Advance the site survey counter and return the new identifier"""
    self.currentSiteSurvey += 1
    return ''.join([self.rdfprefix,'SiteSurvey_',str(self.currentSiteSurvey)])

def getNextScenario(self):
    """Advance the scenario counter and return the new identifier"""
    self.currentScenario += 1
    return ''.join([self.rdfprefix,'Scenario_',str(self.currentScenario)])

def getNextPageSurvey(self):
    """Advance the page survey counter and return the new identifier"""
    self.currentPageSurvey += 1
    return ''.join([self.rdfprefix,'PageSurvey_',str(self.currentPageSurvey)])

def getNextRangeLocation(self,retint=False):
    """Advance the range location counter and return the new value

    Keyword arguments:
    retint -- Return the bare counter instead of the prefixed identifier
    """
    self.currentRangeLocation += 1
    if retint:
        return self.currentRangeLocation
    return ''.join([self.rdfprefix,'RangeLocation_',str(self.currentRangeLocation)])
#end of current and next stuff from housekeeping

def change_domain(self, olddomain, newdomain):
    """Attempted fix for #524

    Registers the new domain through addSubDomain(olddomain, newdomain).

    Keyword arguments:
    olddomain -- The domain as currently registered
    newdomain -- The domain it has changed to
    """
    #Update/change in tables or in in-memory variables, or both?
    #Try with a simple variant adding an alias
    self.addSubDomain(olddomain,newdomain)

# Script entry point (module level, outside the class).
if __name__ == "__main__":
    import sc
    sc = sc.SystemConfiguration()
    url = URLRepUtils3(sc)
    print(url.getUnavailablecount('http%3A//www.fingalcoco.ie/Minutes/2004/ff/0225/ff20040183.htm'))
    print(url.getUnavailablecount('http%3A//www.fingalcoco.ie/Minutes/2004/ff/0225/ff20040183_tull.htm'))
    #import sc
    #sco = sc.SystemConfiguration()
    #if raw_input('This will the delete all content of the URL repository and instert the basic URLs. Are you sure you want to do that (y/n). No as default.').lower() in ('y','yes'):
    #    ur=URLRepUtils3(sco,deletemodel=True)
    #    import os
    #    files = ['../GapGemeni/'+l for l in os.listdir('../GapGemeni/') if l.endswith('2007.csv')]
    #    random.shuffle(files)
    #    for f in files:
    #        print 'Adding sites from',f
    #        ur.addInitialURLs(f)
    #else:
    #    print 'Script not run'
    #ur.addInitialURLs('../GapGemeni/Norway.csv')
    #ur.addInitialURLs('NK_csv/NACE/84.11/Government_relatert_17.01.07.csv')
    #ur.addInitialURLs('NK_csv/NACE/64.11/Nace_64_11_Eurozone.csv')
    #ur.addInitialURLs('NK_csv/NACE/unsorted/Government_myndigheter_denmark_Unknown.csv')
    #ur.addInitialURLs('NK_csv/NACE/47.91/47_91_relatert_11.01.07.csv')
    #ur.addInitialURLs('NK_csv/NACE/84.11/Government_myndigheter_denmark_2.csv')
    #ur.addInitialURLs('NK_csv/NACE/84.11/Government_myndigheter_sweden.csv')
    #ur.addInitialURLs('NK_csv/NACE/84.11/Government_myndigheter_norway.csv')
    #ur.addInitialURLs('DMOZ_extraction_v1.csv')
    #ur.addInitialURLs('NK_csv/NACE/84.11/Government_myndigheter_norway.csv')
    #ur.addInitialURLs('Government_myndigheter_norway.csv')