#! /usr/bin/env python """ Module for Sampling server """ # Copyright 2005, 2006 EIAO Consoritum # This program is distributed under the terms of the GNU General # Public License. # # This file is part of the European Internet Accessibility Observatory # (EIAO) # # EIAO is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # EIAO is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with EIAO; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, # MA 02110-1301 USA __author__ = "Morten Goodwin Olsen" __maintainer__ = 'Nils Ulltveit-Moe' __version__ = "$Id$" from eiaotime import * import Queue import SOAPpy import threading import os import sys import random import time import urlparse from socket import gethostname; import socket from subprocess import * from threading import Semaphore from sc import SystemConfiguration sco=SystemConfiguration() PIDFILE="/var/run/samplingserver.pid" class Log: """Ensure auto-flush after each write""" def __init__(self, f): self.f = f def write(self, s): self.f.write(s) self.f.flush() def flush(self): self.f.flush() log=Log(open(sco.loglocation+'/samplingserver.log','a')) class SamplingServerT(threading.Thread): """Threaded ETL server class """ def __init__(self,sc,parent): """Constructor for the ETL Server Thread """ threading.Thread.__init__(self) self.dataqueue=Queue.Queue() self.starttime = None self.pid = None self.database = None self.sc = sc self.parent = parent def duration(self): """Returns the duration of an ETL """ return time.time()-self.starttime def stopifhanged(self): """Claims this tread is stopped even if it has not. This is to prevent bugs due to too long duration """ if self.starttime and self.pid: if self.duration()>10000: cmd = 'kill -9 ' + str(self.pid) os.system(cmd) sys.stderr.write('Killed pid:'+str(self.pid)) if self.database: sys.stderr.write(' Database: '+str(self.database)) sys.stderr.write('Investigate why \n') def loadSample(self,location,site,database,testrun,pid,host,etlserver):#database,testrun, freqmonproblems,host): """ Schedules data for loading in the DW Keyword arguements: database -- Database testrun -- Testrun freqmonproblem -- A list of strings describing any frequency monitoring problems host -- Host to connect to. Localhost as default. Returns None""" self.dataqueue.put((location,site,database,testrun,pid,host,etlserver)) def stop(self): """ Scheduled this thread for stopping Returns None """ self.dataqueue.put(None) def run(self): """ Running the actual thread. Loops until stop(...) is called Returns None """ while 1: element = self.dataqueue.get() if not element: #print 'Server is stopping' return else: location,site,database,testrun,pid,host,etlserver = (element) print 'Loading', location,site,database,testrun,pid,host,etlserver wamserver = self.parent.getNextWamServer() f = open(self.sc.loglocation + 'crawler.log','a') f.write('Starting sampling of :' + str(site) + ' ' + str(database) + ' ' + str(testrun) + ' ' + str(pid) + ' Log available with sampler_'+str(database)+'_stdout.log sampler_'+str(database)+'_stderr.log\n') f.close() os.system('sampler ' + str(site) + ' ' + str(database) + ' ' + str(testrun) + ' ' + str(pid) + ' ' + str(etlserver)+' '+ str(wamserver)+' html1>'+self.sc.loglocation+'sampler_'+str(database)+'_stdout.log 2>'+self.sc.loglocation+'sampler_'+str(database)+'_stderr.log') class SamplingServer: """Site URL server class """ def __init__(self,port=8891,host=None,numthreads=1): """initialise the class KeyWord arguments: port -- [Optional] Port to serve at. 8891 as default. host -- [Optional] Host to server at. Localhost as default. numthreads -- [Optional] Number of threads. """ self.port = port self.host = host self.numthreads = numthreads self.samplingt = [] self.sc = SystemConfiguration() self.start() self.wamserversMutex=Semaphore(1) self.wamservers = self.sc.wamservers.split(',') self.wamserversnumber = 0 def getNextWamServer(self): self.wamserversMutex.acquire() self.wamserversnumber += 1 self.wamservers[self.wamserversnumber%len(self.wamservers)] self.wamserversMutex.release() def stop(self): """ Stopping all threads """ for t in self.samplingt: t.stop() def start(self): """ Staring all threads Needed to start or restart all threads in the server Returns None """ #Just to make sure everything is stopped. The next command is strictly not needed it the ETL server is used correctly. However, it is no harm in calling it too much. self.stop() self.samplingt = [] for i in range(self.numthreads): t = SamplingServerT(self.sc,self) t.start() t.port = self.port t.host = self.host self.samplingt.append(t) self.loading = False self.loadingqueue = {} print 'Sampling Server is avilable at port',self.port,'running with',self.numthreads,'threads' def loadSample(self,location,site,database,testrun,pid,host,etlserver): """ Pushes data to the ETL for loading into to DW Keyword arguments: database -- Database to load to testrunnr -- Current Testrunnr Returns None """ if not host: host = self.host #sampler '+str(location) + ' ' + str(self.site) + ' ' + str(self.database) + ' ' + str(self.testrun) + ' ' + str(self.pid) + ' 1>/var/log/eiao/sampler_'+str(self.database)+'_stdout.log 2>/var/log/eiao/sampler_'+str(self.database)+'_stderr.log' print 'Recieved data',location,site,database,testrun,pid,host,etlserver print 'Number of sites left to sample:',sum([i.dataqueue.qsize() for i in self.samplingt]) f = open(self.sc.loglocation + 'crawler.log','a') f.write('Recieved data in the Sampler:'+str(location) + ' ' + str(site) + ' ' + str(database) + ' ' + str(testrun) + ' ' + str(pid) + '\n') f.close() anythread = random.sample(self.samplingt,1)[0] anythread.loadSample(location,site,database,testrun,pid,host,etlserver) def printHelp(): """For printing help This is needed as input to any user starting the ETL server. Exits all active modules. Returns None """ print 'Usage:\npython samplingserver [OPTIONS]' print 'Options:' print '-p N - port to use. 8891 as default.' print '-t N - Number of sampling server server threads. 1 as default.' print '--help - This help file' sys.exit(1) def main(port,numthreads): sc = SystemConfiguration() host,port = urlparse.urlsplit(sc.samplingserver)[1].split(':') s = SamplingServer(host=host,port=int(port),numthreads=numthreads) tries = 1 error = True while tries<5 and error: #If address is taken, try several times. The reason is most often that restart has happened... try: server = SOAPpy.SOAPServer((host, int(port))) except socket.error: time.sleep(random.randint(1,30)*tries) tries += 1 else: error = False if tries==5: #If max tries, this will throw an exeption server = SOAPpy.SOAPServer((host, int(port))) server.registerFunction(s.loadSample) try: server.serve_forever() except KeyboardInterrupt: s.stop() def daemonize(): # Disconnect from controlling TTY as a service try: pid = os.fork() if pid > 0: sys.exit(0) except OSError, e: print >>sys.stderr, "fork #1 failed: %d (%s)" % (e.errno, e.strerror) sys.exit(1) # Do not prevent unmounting... os.chdir("/") os.setsid() os.umask(0) # do second fork try: pid = os.fork() if pid > 0: # exit from second parent, print eventual PID before #print "Daemon PID %d" % pid open(PIDFILE,'w').write("%d"%pid) sys.exit(0) except OSError, e: print >>sys.stderr, "fork #2 failed: %d (%s)" % (e.errno, e.strerror) sys.exit(1) # Redirect stdout/stderr to log file sys.stdout=sys.stderr=log # UID and GID Nobody os.setegid(99) os.seteuid(99) if __name__ == '__main__': if '--help' in sys.argv: printHelp() try: if '-p' in sys.argv: port = sys.argv[sys.argv.index('-p')+1] else: port = '8891' if '-t' in sys.argv: numthreads = sys.argv[sys.argv.index('-t')+1] else: numthreads = 20 if '-d' in sys.argv: daemonise=True else: daemonise=False except IndexError: printHelp() if daemonise: daemonize() main(port,numthreads)