#! /usr/bin/env python
"""Run script for the crawler.

Supervises a pool of crawler processes: a new crawler is started (each in its
own ``RunCrawlerT`` thread) whenever fewer than the requested number are
running.  While the pool is full, a watchdog periodically asks the site URL
server which crawlers stopped sending keep-alives and kills them, and reports
crawlers whose memory usage exceeds ``sc.maxmemory``.
"""

__author__ = "Anand B Pillai"
__maintainer__ = "Anand B Pillai,Morten Goodwin Olsen"
__version__ = '$Id$'

# NOTE: the import order is kept as in the original because the star imports
# may shadow (or be shadowed by) the names imported around them.
import sys
import os
import threading
from eiaotime import *
import time
import random
import getopt
from sc import *
from subprocess import *
import MySQLdb
import crawlererror
import SOAPpy
import signal
import socket
import urlparse
# urllib.unquote is used below; import it explicitly instead of relying on
# one of the star imports to leak the name into this module.
import urllib

sc = SystemConfiguration()
hostname = socket.gethostname()
maxmemory = int(sc.maxmemory)
siteurlserver = SOAPpy.SOAPProxy(urllib.unquote(sc.siteurlserver))

# Max duration of crawl (a large number for exhaustive scan.)
MAXDURATION = 1800000
PIDFILE = "/var/run/crawlers.pid"
# Log file used by the watchdog to record killed / misbehaving crawlers.
WATCHDOG_LOG = '/var/log/eiao/crawler.log'
DEFAULT_PARALLEL_CRAWLERS = 25  # was 50
daemonise = False
debug = False
numberParallelCrawlers = DEFAULT_PARALLEL_CRAWLERS
# Every RunCrawlerT ever started, finished or not (finished ones are kept so
# that the "finished threads" statistic stays cumulative).
activethreads = []


class Log:
    """File-like wrapper that flushes the underlying file after each write."""

    def __init__(self, f):
        self.f = f

    def write(self, s):
        self.f.write(s)
        self.f.flush()

    def flush(self):
        self.f.flush()


log = Log(open(sc.loglocation + '/crawlers.log', 'a'))


def _watchdog_log(message):
    """Append message to the watchdog log and close the file again."""
    with open(WATCHDOG_LOG, 'a') as logfile:
        logfile.write(message)


def _running_threads():
    """Return the crawler threads that have not finished yet."""
    return [t for t in activethreads if not t.finish]


def killCrawlers(signum, frame):
    """SIGTERM handler: terminate all running crawler processes, then exit.

    Threads that have already finished, or that have not spawned their
    process yet, are skipped: their pid is either unset or may have been
    reused by an unrelated process.
    """
    for thread in list(activethreads):
        if thread.finish or not thread.pid:
            continue
        print('Killing %s' % thread.pid)
        try:
            os.kill(thread.pid, signal.SIGTERM)
        except OSError:
            # Process is already gone; keep going so the rest are killed too.
            pass
    # Kill myself. (How is this done in a nicer way without hanging?)
    os.kill(os.getpid(), signal.SIGKILL)


signal.signal(signal.SIGTERM, killCrawlers)


class RunCrawlerT(threading.Thread):
    """Thread that runs one crawler process and waits for it to exit.

    Attributes read by the supervisor:
      finish -- True once the thread is done (successfully or not)
      pid    -- pid of the crawler process, None until it is spawned
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.finish = False
        self.pid = None
        self.p = None
        self.fout = None
        self.ferr = None
        self.dbname = ''
        self.starttime = time.time()
        self.server = SOAPpy.SOAPProxy(urllib.unquote(sc.siteurlserver))
        # Long enough duration for the exhaustive scans
        self.maxduration = MAXDURATION

    def duration(self):
        """Return the number of seconds since this crawl was created."""
        return time.time() - self.starttime

    def memusage(self):
        """Return the memory usage of the crawler process in MB.

        Returns 0 if the process is not running (not yet started or
        already finished).
        """
        if not self.pid:
            return 0
        try:
            with open('/proc/' + str(self.pid) + '/statm', 'r') as statm:
                pages = int(statm.read().split()[0])
        except IOError:
            return 0
        # First statm field is the program size in pages; the conversion
        # assumes 4 KiB pages.
        return pages * 4 // 1024

    def run(self):
        """Fetch a database name, spawn the crawler and wait for it to exit.

        ``finish`` is always set and the log files are always closed when
        this method returns, even if setup fails; otherwise a failed thread
        would occupy a crawler slot forever.
        """
        print('Starting crawler process')
        try:
            dbname = self.server.getdbname()
            self.dbname = dbname
            outname = '/var/log/eiao/crawler_%s_stdout.log' % (dbname)
            self.fout = open(outname, 'w')
            errorname = '/var/log/eiao/crawler_%s_stderr.log' % (dbname)
            self.ferr = open(errorname, 'w')
            print('Running crawler to database %s. Stdout from this crawler is:crawler_%s_stdout.log. Stderr from this crawler is:crawler_%s_stderr.log' % (dbname, dbname, dbname))
            # The database is not needed before the sampler, so it is not
            # created here (see create_dynamicdb).

            # NOTE(review): this tests for a bare positional "debug"
            # argument, not the --debug option handled in __main__ -- confirm
            # which one is intended.
            if 'debug' in sys.argv:
                program = 'debugcrawler'
            else:
                program = 'crawlerwrapper'
            # An argument list (no shell) works for both branches below and
            # keeps dbname from being interpreted by a shell.
            cmd = ['nice', '-19', program, '-d', str(dbname)]

            try:
                smalllogs = int(sc.smalllogs)
            except Exception:
                # Option missing or not numeric: fall back to full logging.
                smalllogs = False

            if smalllogs:
                # Child inherits our stdout/stderr instead of the log files.
                self.p = Popen(cmd)
            else:
                self.p = Popen(cmd, stdout=self.fout, stderr=self.ferr)
            self.pid = self.p.pid

            try:
                exitstatus = self.p.wait()
            except OSError as E:
                # Occasionally, this throws an OSError, Unknown error 512
                # This is a workaround for bug #397
                # Setting starttime to 0 forces this process and thread to
                # be killed.
                self.starttime = 0
                exitstatus = 512
                time.sleep(random.randint(1, 60))
                sys.stderr.write(str(E) + '\n')

            print('Exit status for process %s with db %s is %s' % (self.pid, self.dbname, exitstatus))
            if exitstatus != 0:
                sys.stderr.write(' '.join(['Error: Process PID', str(self.pid), '(', str(self.dbname), ') exited with status', str(exitstatus), '\n']))
        finally:
            self.finish = True
            for stream in (self.fout, self.ferr):
                if stream is not None:
                    stream.close()
            self.p = None


def _run_watchdog():
    """Kill hung crawlers and report crawlers that use too much memory."""
    try:
        # Only ask the server roughly once every 60 polls.
        if not random.randint(0, 59):
            sitesnotalive = siteurlserver.notAlive(hostname)
        else:
            sitesnotalive = []
    except Exception:
        # Error with connecting to the siteurlserver. Assume that no
        # crawlers have hung; hung crawlers will still be reported the next
        # time this is asked.
        _watchdog_log('Error getting notalive\n')
        sitesnotalive = []

    for crawlerpid in sitesnotalive:
        try:
            os.kill(crawlerpid, signal.SIGKILL)
        except OSError:
            pass
        message = ' '.join(['Process', str(crawlerpid), ' hung and is killed since keepAlive not recieved for this pid.\n'])
        _watchdog_log(message)
        print(message)

    for crawlerthread in _running_threads():
        usage_mb = crawlerthread.memusage()
        if usage_mb <= maxmemory:
            continue
        # NOTE(review): the process is only reported to the site URL server
        # here (the os.kill call was disabled in the original), although the
        # log message says it is killed.
        try:
            siteurlserver.addExceedMemory(crawlerthread.pid)
        except Exception:
            # A failed report must not take the supervisor down.
            _watchdog_log('Error reporting exceeded memory\n')
        print('Memory usage for PID %s is %s' % (crawlerthread.pid, usage_mb))
        message = ' '.join(['Process', str(crawlerthread.pid), ' used too much memory and is killed.\n'])
        _watchdog_log(message)
        print(message)


def run_crawler(parallell, debug):
    """Run crawlers forever, keeping at most ``parallell`` running at once.

    Keyword arguments:
    parallell -- Number of processes to run in parallel (int or numeric
                 string)
    debug -- debug flag; the watchdog is not active in debug mode
    """
    # A string here (e.g. straight from getopt) would make the comparison
    # below meaningless, so the limit would never be enforced.
    parallell = int(parallell)

    # Crawlers are always available.
    while 1:
        print('Actually starting to crawl...')
        # Wait for a free slot.
        while len(_running_threads()) >= parallell:
            try:
                time.sleep(random.randint(1, 10))
            except IOError:
                # Sleep interrupted by a signal; just poll again.
                pass
            print('Waiting for threads to be finsihed...')
            running = len(_running_threads())
            print('Number of active threads:' + str(running))
            print('Number of finished threads:' + str(len(activethreads) - running))

            # The next lines are to avoid crawlers that hang.
            # This is the Watchdog kill functionality.
            # It is not active in debug mode.
            if not debug:
                _run_watchdog()

        rct = RunCrawlerT()
        rct.start()
        activethreads.append(rct)
        # Short random pause so crawlers do not all start at the same
        # instant.
        time.sleep(random.uniform(0, 0.1))


def create_dynamicdb(dbname):
    """Create the dynamic RDF database and grant access to it.

    Keyword arguments:
    dbname -- Name of database; it is interpolated into SQL as an
              identifier, so it must come from a trusted source

    returns None
    raises crawlererror.RDFDBAlreadyExistsError if the database cannot be
    created because it already exists
    """
    # Connect to MySQL as root and create new RDF database
    connection = MySQLdb.connect(host='localhost', user='root')
    try:
        c = connection.cursor()
        try:
            c.execute("CREATE DATABASE " + dbname)
        except MySQLdb.ProgrammingError as e:
            # Error: RDF Database already exists.
            print(e)
            raise crawlererror.RDFDBAlreadyExistsError(dbname)

        # Granting privileges to localhost
        c.execute("""GRANT ALL PRIVILEGES ON %s.* TO '%s'@localhost IDENTIFIED BY '%s'""" % (dbname, sc.dbusername, sc.dbpassword))

        # Granting privileges to etlserver
        etlserver = urlparse.urlsplit(sc.etlserver)[1].split(':')[0]
        c.execute("""GRANT ALL PRIVILEGES ON %s.* TO '%s'@%s IDENTIFIED BY '%s'""" % (dbname, sc.dbusername, etlserver, sc.dbpassword))

        # Embedded tables.sql here, to avoid another external file reference.
        # Only RAWRDF is created; the other tables were disabled in the
        # original.
        # NOTE(review): TYPE= is the legacy spelling of ENGINE=; confirm the
        # target MySQL version still accepts it.
        c.execute("USE " + dbname)
        c.execute(""" CREATE TABLE RAWRDF( rdf longblob ) TYPE=MyISAM;""")
    finally:
        connection.close()


def daemonize():
    """Detach from the controlling TTY and run as a service.

    Forks twice, writes the daemon pid to PIDFILE, redirects stdout/stderr
    to the crawler log and drops the effective uid/gid to 99.
    """
    # Disconnect from controlling TTY as a service
    try:
        pid = os.fork()
        if pid > 0:
            sys.exit(0)
    except OSError as e:
        sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
        sys.exit(1)

    # Do not prevent unmounting...
    os.chdir("/")
    os.setsid()
    os.umask(0)

    # do second fork
    try:
        pid = os.fork()
        if pid > 0:
            # exit from second parent, record the daemon PID before
            with open(PIDFILE, 'w') as pidfile:
                pidfile.write("%d" % pid)
            sys.exit(0)
    except OSError as e:
        sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
        sys.exit(1)

    # Redirect stdout/stderr to log file
    sys.stdout = sys.stderr = log

    # UID and GID Nobody
    os.setegid(99)
    os.seteuid(99)


def usage():
    """Return the command line usage text."""
    return """Usage: %s [-n NPC | --number=NPC][-d | --daemonise][-h|--help][--debug]
Where NPC is the number of parallel crawlers (default %d)""" % (sys.argv[0], DEFAULT_PARALLEL_CRAWLERS)


if __name__ == "__main__":
    try:
        (opts, args) = getopt.getopt(sys.argv[1:], "dhn:", ["daemonise", "number=", "debug", "help"])
    except getopt.GetoptError as e:
        sys.exit('%s\n%s' % (e, usage()))

    for opt, arg in opts:
        if opt in ("-d", "--daemonise"):
            daemonise = True
        elif opt in ("-h", "--help"):
            sys.exit(usage())
        elif opt in ("-n", "--number"):
            try:
                numberParallelCrawlers = int(arg)
            except ValueError:
                sys.exit(usage())
        elif opt == "--debug":
            debug = True

    if daemonise:
        daemonize()

    run_crawler(numberParallelCrawlers, debug)