#!/usr/bin/python
""" Module for ETL server """

# Copyright 2005, 2006 EIAO Consortium
# This program is distributed under the terms of the GNU General
# Public License.
#
# This file is part of the European Internet Accessibility Observatory
# (EIAO)
#
# EIAO is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# EIAO is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with EIAO; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston,
# MA 02110-1301 USA

__author__ = "Morten Goodwin Olsen"
__maintainer__ = 'Nils Ulltveit-Moe'
__version__ = "$Id$"

from eiaotime import *
import Queue
import SOAPpy
import threading
import os
import sys
import random
import time
import urlparse
import socket
from subprocess import *
from etlservererror import *
from sc import SystemConfiguration

sco = SystemConfiguration()
PIDFILE = "/var/run/etlserver.pid"

# Seconds a single load may run before stopifhanged() kills its process.
HANG_TIMEOUT = 10000


class Log:
    """Ensure auto-flush after each write

    Thin wrapper around a file-like object. It is also installed as
    sys.stdout/sys.stderr by daemonize(), so it only needs write and flush.
    """

    def __init__(self, f):
        """Wrap the already opened file-like object f"""
        self.f = f

    def write(self, s):
        """Write s to the wrapped file and flush immediately"""
        self.f.write(s)
        self.f.flush()

    def flush(self):
        """Flush the wrapped file"""
        self.f.flush()


# Module-wide log. Opened at import time and kept open for the lifetime of
# the process.
log = Log(open(os.path.join(sco.loglocation, 'etlserver.log'), 'a'))


class StoppingServer(threading.Thread):
    """Special thread for finalising the ETL

    Waits for a trigger, stops the workers of the server, waits until their
    queues are drained, runs the finalisation step and restarts the server.
    """

    def __init__(self, server):
        """Keyword arguments:
        server -- The ETLServer instance to finalise
        """
        threading.Thread.__init__(self)
        self.dataqueue = Queue.Queue()
        self.server = server

    def loadETL(self):
        """ Start finalisation when ready

        Returns None
        """
        self.dataqueue.put('Stop')

    def run(self):
        """Block until loadETL() is called, then finalise and restart

        Returns None
        """
        # Only used as a trigger; the queued value itself is irrelevant.
        self.dataqueue.get()
        self.server.stop()
        # Must make sure all queues are empty before finalising.
        # NOTE(review): an empty queue does not prove that a worker is not
        # still in the middle of a load - confirm this is sufficient.
        while (not all(t.dataqueue.empty() for t in self.server.etlt)
               or self.server.getLoading()):
            time.sleep(random.randint(0, 10))
            # While some threads still are active
            log.write('Waiting for all threads to be finished before finalising\n')
        # The Datawarehouse files should be located somewhere more dynamic
        if 'nowrite' in sys.argv:
            # Dry run: do not touch the DW, only wait.
            os.system('sleep 20')
        else:
            # The real finalisation commands are disabled. 'ls *.py' is a
            # placeholder executed once for each of them:
            #   cd ../Datawarehouse/;psql -f FillEIAODW_R20.sql eiaodwr10_tg3 eiaodw
            #   cd ../Datawarehouse/;psql -f StoredProceduresR20.sql eiaodwr10_tg3 eiaodw
            #   cd ../Datawarehouse/;psql -f FillEIAODW_R20.sql eiaodwr10_tg3-rf eiaodw
            #   cd ../Datawarehouse/;psql -f StoredProceduresR20.sql eiaodwr10_tg3-rf eiaodw
            cmd = 'ls *.py'
            for _ in range(4):
                os.system(cmd)
        log.write("Testrun finalised. Ready to handle requests again\n")
        self.server.start()


class ETLServerT(threading.Thread):
    """Threaded ETL server class

    Worker thread that takes load requests from its own queue and runs one
    external loader process per request.
    """

    def __init__(self):
        """Constructor for the ETL Server Thread
        """
        threading.Thread.__init__(self)
        self.dataqueue = Queue.Queue()
        # starttime and pid are only set while a loader process is running.
        self.starttime = None
        self.pid = None
        self.database = None

    def duration(self):
        """Returns the duration of an ETL

        Seconds since the current load was started. Raises TypeError when
        no load is running, since starttime is then None.
        """
        return time.time() - self.starttime

    def stopifhanged(self):
        """Kill the loader process of this thread if it has run too long.

        This is to prevent bugs due to too long duration. Does nothing
        when no load is running.

        Returns None
        """
        # Work on a snapshot: the worker resets both attributes when the
        # loader exits, possibly while this method runs in another thread.
        started = self.starttime
        pid = self.pid
        if not (started and pid):
            return
        if time.time() - started > HANG_TIMEOUT:
            os.system('kill -9 ' + str(pid))
            sys.stderr.write('Killed pid:' + str(pid))
            if self.database:
                sys.stderr.write(' Database: ' + str(self.database))
            sys.stderr.write('Investigate why \n')

    def loadETL(self, database, testrun, freqmonproblems, host):
        """ Schedules data for loading in the DW

        Keyword arguments:
        database -- Database
        testrun -- Testrun
        freqmonproblems -- A list of strings describing any frequency
                           monitoring problems. Currently ignored; every
                           site goes to the configured DW database.
        host -- Host to connect to.

        Returns None"""
        sys.stdout.write('Pushing: %s\n' % (self,))
        self.dataqueue.put((database, testrun, sco.dwdatabase, host))

    def stop(self):
        """ Schedule this thread for stopping

        The thread stops when it reaches the stop marker in its queue,
        i.e. after the requests queued before it.

        Returns None
        """
        self.dataqueue.put(None)

    def _execute(self, cmd, database):
        """Run cmd and wait for it, logging its output per database.

        Keyword arguments:
        cmd -- Argument list of the process to start
        database -- Database name, used in the log file names

        Raises IOError/OSError if a log file or the process cannot be
        opened/started.

        Returns None
        """
        prefix = '/var/log/eiao/etl_' + str(database)
        with open(prefix + '_stdout.log', 'w') as fout:
            with open(prefix + '_stderr.log', 'w') as ferr:
                self.p = Popen(cmd, stdout=fout, stderr=ferr)
                self.pid = self.p.pid
                self.starttime = time.time()
                try:
                    self.p.wait()
                finally:
                    # The pid is dead from here on. Forget it so that
                    # stopifhanged() can never kill an unrelated process
                    # that later reuses the same pid.
                    self.starttime = None
                    self.pid = None

    def run(self):
        """ Running the actual thread.

        Loops until stop(...) is called

        Returns None
        """
        while True:
            element = self.dataqueue.get()
            if not element:
                # Stop marker queued by stop().
                return
            database, testrun, dwdatabase, host = element
            log.write(' '.join(('Loading ', str(database), str(dwdatabase), str(host), '\n')))
            if 'nowrite' in sys.argv:
                cmd = ['sleep', '20']
            else:
                log.write('Loading %s into %s\n' % (database, dwdatabase))
                # Argument list without a shell: database and host arrive
                # over SOAP and must not be interpreted by a shell.
                cmd = ['dw20load', '-u', 'eiaodw', '-d', str(dwdatabase),
                       '-p', 'eiaodw', '-m', 'model', '-r', str(database),
                       '-o', str(host)]
            self.database = database
            try:
                self._execute(cmd, database)
            except EnvironmentError as err:
                # Keep the worker alive; one failing load must not take
                # the thread down with it.
                sys.stderr.write('Could not run ETL for %s: %s\n' % (database, err))
                continue
            log.write('Finish loading ' + str(database) + '\n')


class ETLServer:
    """Site URL server class

    Keeps a pool of ETLServerT workers and per testrun bookkeeping in
    loadingqueue, which maps testrunnr to the tuple
    (number of sites, sites loaded, sites failed).
    """

    def __init__(self, port=8890, host=None, numthreads=10):
        """initialise the class

        KeyWord arguments:
        port -- [Optional] Port to serve at. 8890 as default.
        host -- [Optional] Host to server at. Localhost as default.
        numthreads -- [Optional] Number of threads. Anything int() accepts.
        """
        self.port = port
        self.host = host
        # May arrive as a string from the command line.
        self.numthreads = int(numthreads)
        self.etlt = []
        self.sc = SystemConfiguration()
        self.start()

    def start(self):
        """ Starting all threads

        Needed to start or restart all threads in the server. Also resets
        the loading flag and forgets all registered testruns.

        Returns None
        """
        # Just to make sure everything is stopped. Strictly not needed if
        # the ETL server is used correctly, but there is no harm in it.
        self.stop()
        self.etlt = []
        for _ in range(self.numthreads):
            t = ETLServerT()
            t.port = self.port
            t.host = self.host
            t.start()
            self.etlt.append(t)
        self.loading = False
        self.loadingqueue = {}
        log.write(' '.join(('ETL Server is avilable at port', str(self.port), 'running with', str(self.numthreads), 'threads', '\n')))

    def testRun(self, numberofsites, testrunnr):
        """ Register a new testrun

        Register a new testrun with the number of sites connected to this
        testrun. If the testrun is already known, numberofsites is added
        to its site count.

        Keyword arguments:
        numberofsites -- Number of sites connected to the testrun
        testrunnr -- The testrunnr

        Returns False if the testrun is known with zero sites, else True
        """
        log.write('Adding testrun:' + str(testrunnr) + '\n')
        if testrunnr not in self.loadingqueue:
            self.loadingqueue[testrunnr] = (numberofsites, 0, 0)
            return True
        numsites, correct, failed = self.loadingqueue[testrunnr]
        if numsites == 0:
            return False
        self.loadingqueue[testrunnr] = (numberofsites + numsites, correct, failed)
        return True

    def incrementSiteNr(self, testrunnr):
        """Increment the site number

        For incrementing the number of sites in a testrun. Should be used
        if there already exists a testrun and there is a need to increase
        the number of registered sites. This function is needed for the
        URL server to work correctly.

        Keyword Arguments:
        testrunnr -- TestRun nr

        Raises KeyError if the testrun is not registered.
        """
        numsites, correct, failed = self.loadingqueue[testrunnr]
        self.loadingqueue[testrunnr] = (numsites + 1, correct, failed)

    def siteFailed(self, testrunnr):
        """Report a failing site

        For incrementing the number of failing sites in a testrun. Should
        be called when crawling a site has failed and it should not be
        loaded into the DW

        Keyword Arguments:
        testrunnr -- TestRun nr

        Raises KeyError if the testrun is not registered.
        """
        numsites, correct, failed = self.loadingqueue[testrunnr]
        self.loadingqueue[testrunnr] = (numsites, correct, failed + 1)
        self.finaliseIfNeeded(testrunnr)

    def removeTestRun(self, testrunnr):
        """Removes a testrun from the queue

        Forces a testrun to be removed. Unknown testruns are ignored.

        Keyword arguments:
        testrunnr -- TestRun nr
        """
        # TODO: Find out whether this function is actually needed.
        # Ignoring unknown testruns works around ambiguity in the URL
        # repository.
        self.loadingqueue.pop(testrunnr, None)

    def finaliseIfNeeded(self, testrunnr):
        """Function to investigate if a testrun should be finalised

        Also gives every worker the chance to kill a hanging load.

        Keyword arguments:
        testrunnr -- Current testrun

        Raises KeyError if the testrun is not registered.
        """
        numsites, correct, failed = self.loadingqueue[testrunnr]
        for worker in self.etlt:
            worker.stopifhanged()
        done = correct + failed
        if numsites < done:
            # Not raised, only written to stderr: the ETL server should
            # not fail because of malfunctioning reports of URLs, but the
            # error must still be visible.
            err = TooManySitesError(testrunnr, numsites, correct, failed)
            sys.stderr.write(str(err))
        if numsites <= done:
            stopper = StoppingServer(self)
            stopper.start()
            stopper.loadETL()
        else:
            log.write(' '.join(('Not finalising yet. Only ', str(done), 'of', str(numsites), str((done / float(numsites)) * 100), '% finished.\n')))

    def stop(self):
        """Stops the server and all its connected threads
        """
        for worker in self.etlt:
            if worker.is_alive():
                worker.stop()

    def setLoading(self, isloading):
        """Function for setting that the ETL is loading.

        This function is needed for the ETL to know if another ETL is
        currently loading

        Keyword arguments:
        isloading -- If the ETL is loading or not as Boolean

        Returns None
        """
        self.loading = isloading

    def getLoading(self):
        """ Function for returning if one ETL is loading

        This function is needed for the ETL to know if another ETL is
        currently loading

        Returns Boolean
        """
        return self.loading

    def loadETL(self, database, testrunnr, freqmonproblems=False, host=None):
        """ Pushes data to the ETL for loading into to DW

        An unknown testrun is registered on the fly with 10000 sites.

        Keyword arguments:
        database -- Database to load to
        testrunnr -- Current Testrunnr
        freqmonproblems -- [Optional] Frequency monitoring problems
        host -- [Optional] Host to connect to. The server host as default.

        Returns None
        """
        if not host:
            host = self.host
        try:
            numsites, correct, failed = self.loadingqueue[testrunnr]
        except KeyError:
            self.testRun(10000, testrunnr)
            numsites, correct, failed = self.loadingqueue[testrunnr]
        correct += 1
        self.loadingqueue[testrunnr] = (numsites, correct, failed)
        # NOTE(review): reported as the sites of this testrun that have not
        # yet been loaded or failed - confirm that this is the intended
        # meaning of "left in the etl queue".
        remaining = max(0, numsites - correct - failed)
        log.write('Number of sites left in the etl queue: ' + str(remaining) + '\n')
        log.write(' '.join(['Recieved data', str(database), str(testrunnr), str(self.loadingqueue[testrunnr]), '\n']))
        with open(os.path.join(self.sc.loglocation, 'crawler.log'), 'a') as crawlerlog:
            crawlerlog.write('Recieved data in the ETL: ' + str(database) + ' ' + str(testrunnr) + ' ' + str(freqmonproblems) + ' ' + str(host) + '\n')
        # Spread the load by picking a random worker.
        anythread = random.choice(self.etlt)
        anythread.loadETL(database, testrunnr, freqmonproblems, host)
        self.finaliseIfNeeded(testrunnr)
        sys.stdout.write('Number of sites left to load: %s\n' % sum(t.dataqueue.qsize() for t in self.etlt))


def printHelp():
    """For printing help

    This is needed as input to any user starting the ETL server.
    Exits the process with status 1.

    Returns None
    """
    lines = (
        'Usage:\npython etlserver.py [OPTIONS]',
        'Options:',
        '-p N - port to use. 8890 as default.',
        '-t N - Number of ETL server threads. 1 as default.',
        '-d - Detach and run as a daemon.',
        'nowrite - Do not load or write anything to the DW. Usefule for testing only.',
        '--help - This help file',
    )
    sys.stdout.write('\n'.join(lines) + '\n')
    sys.exit(1)


def _bind_soap_server(host, port):
    """Create the SOAP server, retrying while the address is taken.

    The address is most often taken because a restart has just happened.
    Four attempts are followed by a growing random sleep; a fifth and last
    attempt lets socket.error propagate to the caller.
    """
    for attempt in range(1, 5):
        try:
            return SOAPpy.SOAPServer((host, port))
        except socket.error:
            time.sleep(random.randint(1, 30) * attempt)
    return SOAPpy.SOAPServer((host, port))


def main(port, numthreads):
    """Start the ETL server and serve SOAP requests until interrupted

    Keyword arguments:
    port -- Ignored: host and port are always read from the etlserver URL
            of the system configuration.
    numthreads -- Number of ETL server threads

    Returns None
    """
    sc = SystemConfiguration()
    host, port = urlparse.urlsplit(sc.etlserver)[1].split(':')
    port = int(port)
    # Bind before starting the workers, so that a failing bind does not
    # leave worker threads behind that keep the process alive.
    server = _bind_soap_server(host, port)
    s = ETLServer(host=host, port=port, numthreads=int(numthreads))
    for function in (s.loadETL, s.getLoading, s.setLoading, s.stop,
                     s.testRun, s.siteFailed, s.removeTestRun,
                     s.incrementSiteNr):
        server.registerFunction(function)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        s.stop()


def daemonize():
    """Detach from the controlling TTY and run as a service

    Forks twice, writes the daemon pid to PIDFILE, redirects stdout and
    stderr to the log and changes effective gid/uid to 99.

    Returns None (in the daemon process only)
    """
    try:
        pid = os.fork()
    except OSError as e:
        sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
        sys.exit(1)
    if pid > 0:
        sys.exit(0)

    # Do not prevent unmounting...
    os.chdir("/")
    os.setsid()
    os.umask(0)

    # do second fork
    try:
        pid = os.fork()
    except OSError as e:
        sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
        sys.exit(1)
    if pid > 0:
        # exit from second parent, record the daemon PID before
        with open(PIDFILE, 'w') as pidfile:
            pidfile.write("%d" % pid)
        sys.exit(0)

    # Redirect stdout/stderr to log file
    sys.stdout = sys.stderr = log

    # UID and GID Nobody
    os.setegid(99)
    os.seteuid(99)


if __name__ == '__main__':
    if '--help' in sys.argv:
        printHelp()
    try:
        if '-p' in sys.argv:
            port = sys.argv[sys.argv.index('-p') + 1]
        else:
            port = '8890'
        if '-t' in sys.argv:
            numthreads = int(sys.argv[sys.argv.index('-t') + 1])
        else:
            numthreads = 1
    except (IndexError, ValueError):
        # Missing or non-numeric option value. printHelp() exits.
        printHelp()
    if '-d' in sys.argv:
        daemonize()
    main(port, numthreads)