Ticket #315: httpserver.py

File httpserver.py, 56.6 KB (added by jkp, 3 years ago)

Revised SSL Implementation

Line 
1# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
2# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
3# (c) 2005 Clark C. Evans
4# This module is part of the Python Paste Project and is released under
5# the MIT License: http://www.opensource.org/licenses/mit-license.php
6# This code was written with funding by http://prometheusresearch.com
7"""
8WSGI HTTP Server
9
10This is a minimalistic WSGI server using Python's built-in BaseHTTPServer;
11if pyOpenSSL is installed, it also provides SSL capabilities.
12"""
13
14# @@: add in protection against HTTP/1.0 clients who claim to
15#     be 1.1 but do not send a Content-Length
16
17# @@: add support for chunked encoding, this is not a 1.1 server
18#     till this is completed.
19
20import atexit
21import traceback
22import socket, sys, threading, urlparse, Queue, urllib
23import posixpath
24import time
25import thread
26import os
27from itertools import count
28from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
29from SocketServer import ThreadingMixIn
30from paste.util import converters
31import logging
32try:
33    from paste.util import killthread
34except ImportError:
35    # Not available, probably no ctypes
36    killthread = None
37
38__all__ = ['WSGIHandlerMixin', 'WSGIServer', 'WSGIHandler', 'serve']
39__version__ = "0.5"
40
41class ContinueHook(object):
42    """
43    When a client request includes a 'Expect: 100-continue' header, then
44    it is the responsibility of the server to send 100 Continue when it
45    is ready for the content body.  This allows authentication, access
46    levels, and other exceptions to be detected *before* bandwith is
47    spent on the request body.
48
49    This is a rfile wrapper that implements this functionality by
50    sending 100 Continue to the client immediately after the user
51    requests the content via a read() operation on the rfile stream.
52    After this response is sent, it becomes a pass-through object.
53    """
54
55    def __init__(self, rfile, write):
56        self._ContinueFile_rfile = rfile
57        self._ContinueFile_write = write
58        for attr in ('close', 'closed', 'fileno', 'flush',
59                     'mode', 'bufsize', 'softspace'):
60            if hasattr(rfile, attr):
61                setattr(self, attr, getattr(rfile, attr))
62        for attr in ('read', 'readline', 'readlines'):
63            if hasattr(rfile, attr):
64                setattr(self, attr, getattr(self, '_ContinueFile_' + attr))
65
66    def _ContinueFile_send(self):
67        self._ContinueFile_write("HTTP/1.1 100 Continue\r\n\r\n")
68        rfile = self._ContinueFile_rfile
69        for attr in ('read', 'readline', 'readlines'):
70            if hasattr(rfile, attr):
71                setattr(self, attr, getattr(rfile, attr))
72
73    def _ContinueFile_read(self, size=-1):
74        self._ContinueFile_send()
75        return self._ContinueFile_rfile.readline(size)
76
77    def _ContinueFile_readline(self, size=-1):
78        self._ContinueFile_send()
79        return self._ContinueFile_rfile.readline(size)
80
81    def _ContinueFile_readlines(self, sizehint=0):
82        self._ContinueFile_send()
83        return self._ContinueFile_rfile.readlines(sizehint)
84
85class WSGIHandlerMixin:
86    """
87    WSGI mix-in for HTTPRequestHandler
88
89    This class is a mix-in to provide WSGI functionality to any
90    HTTPRequestHandler derivative (as provided in Python's BaseHTTPServer).
91    This assumes a ``wsgi_application`` handler on ``self.server``.
92    """
93    lookup_addresses = True
94
95    def log_request(self, *args, **kwargs):
96        """ disable success request logging
97
98        Logging transactions should not be part of a WSGI server,
99        if you want logging; look at paste.translogger
100        """
101        pass
102
103    def log_message(self, *args, **kwargs):
104        """ disable error message logging
105
106        Logging transactions should not be part of a WSGI server,
107        if you want logging; look at paste.translogger
108        """
109        pass
110
111    def version_string(self):
112        """ behavior that BaseHTTPServer should have had """
113        if not self.sys_version:
114            return self.server_version
115        else:
116            return self.server_version + ' ' + self.sys_version
117
118    def wsgi_write_chunk(self, chunk):
119        """
120        Write a chunk of the output stream; send headers if they
121        have not already been sent.
122        """
123        if not self.wsgi_headers_sent and not self.wsgi_curr_headers:
124            raise RuntimeError(
125                "Content returned before start_response called")
126        if not self.wsgi_headers_sent:
127            self.wsgi_headers_sent = True
128            (status, headers) = self.wsgi_curr_headers
129            code, message = status.split(" ", 1)
130            self.send_response(int(code), message)
131            #
132            # HTTP/1.1 compliance; either send Content-Length or
133            # signal that the connection is being closed.
134            #
135            send_close = True
136            for (k, v) in  headers:
137                lk = k.lower()
138                if 'content-length' == lk:
139                    send_close = False
140                if 'connection' == lk:
141                    if 'close' == v.lower():
142                        self.close_connection = 1
143                        send_close = False
144                self.send_header(k, v)
145            if send_close:
146                self.close_connection = 1
147                self.send_header('Connection', 'close')
148
149            self.end_headers()
150        self.wfile.write(chunk)
151
152    def wsgi_start_response(self, status, response_headers, exc_info=None):
153        if exc_info:
154            try:
155                if self.wsgi_headers_sent:
156                    raise exc_info[0], exc_info[1], exc_info[2]
157                else:
158                    # In this case, we're going to assume that the
159                    # higher-level code is currently handling the
160                    # issue and returning a resonable response.
161                    # self.log_error(repr(exc_info))
162                    pass
163            finally:
164                exc_info = None
165        elif self.wsgi_curr_headers:
166            assert 0, "Attempt to set headers a second time w/o an exc_info"
167        self.wsgi_curr_headers = (status, response_headers)
168        return self.wsgi_write_chunk
169
170    def wsgi_setup(self, environ=None):
171        """
172        Setup the member variables used by this WSGI mixin, including
173        the ``environ`` and status member variables.
174
175        After the basic environment is created; the optional ``environ``
176        argument can be used to override any settings.
177        """
178
179        (scheme, netloc, path, query, fragment) = urlparse.urlsplit(self.path)
180        path = urllib.unquote(path)
181        endslash = path.endswith('/')
182        path = posixpath.normpath(path)
183        if endslash and path != '/':
184            # Put the slash back...
185            path += '/'
186        (server_name, server_port) = self.server.server_address
187
188        rfile = self.rfile
189        if 'HTTP/1.1' == self.protocol_version and \
190                '100-continue' == self.headers.get('Expect','').lower():
191            rfile = ContinueHook(rfile, self.wfile.write)
192        else:
193            # We can put in the protection to keep from over-reading the
194            # file
195            try:
196                content_length = int(self.headers.get('Content-Length', '0'))
197            except ValueError:
198                content_length = 0
199            if not hasattr(self, 'SSLImp') or getattr(self, 'SSLImp') != "OpenSSL":
200                # @@: LimitedLengthFile is currently broken in connection
201                # with SSL (sporatic errors that are diffcult to trace, but
202                # ones that go away when you don't use LimitedLengthFile)
203                rfile = LimitedLengthFile(rfile, content_length)
204
205        remote_address = self.client_address[0]
206        self.wsgi_environ = {
207                'wsgi.version': (1,0)
208               ,'wsgi.url_scheme': 'http'
209               ,'wsgi.input': rfile
210               ,'wsgi.errors': sys.stderr
211               ,'wsgi.multithread': True
212               ,'wsgi.multiprocess': False
213               ,'wsgi.run_once': False
214               # CGI variables required by PEP-333
215               ,'REQUEST_METHOD': self.command
216               ,'SCRIPT_NAME': '' # application is root of server
217               ,'PATH_INFO': path
218               ,'QUERY_STRING': query
219               ,'CONTENT_TYPE': self.headers.get('Content-Type', '')
220               ,'CONTENT_LENGTH': self.headers.get('Content-Length', '0')
221               ,'SERVER_NAME': server_name
222               ,'SERVER_PORT': str(server_port)
223               ,'SERVER_PROTOCOL': self.request_version
224               # CGI not required by PEP-333
225               ,'REMOTE_ADDR': remote_address
226               }
227        if scheme:
228            self.wsgi_environ['paste.httpserver.proxy.scheme'] = scheme
229        if netloc:
230            self.wsgi_environ['paste.httpserver.proxy.host'] = netloc
231
232        if self.lookup_addresses:
233            # @@: make lookup_addreses actually work, at this point
234            #     it has been address_string() is overriden down in
235            #     file and hence is a noop
236            if remote_address.startswith("192.168.") \
237            or remote_address.startswith("10.") \
238            or remote_address.startswith("172.16."):
239                pass
240            else:
241                address_string = None # self.address_string()
242                if address_string:
243                    self.wsgi_environ['REMOTE_HOST'] = address_string
244
245        if hasattr(self.server, 'thread_pool'):
246            # Now that we know what the request was for, we should
247            # tell the thread pool what its worker is working on
248            self.server.thread_pool.worker_tracker[thread.get_ident()][1] = self.wsgi_environ
249            self.wsgi_environ['paste.httpserver.thread_pool'] = self.server.thread_pool
250
251        for k, v in self.headers.items():
252            key = 'HTTP_' + k.replace("-","_").upper()
253            if key in ('HTTP_CONTENT_TYPE','HTTP_CONTENT_LENGTH'):
254                continue
255            self.wsgi_environ[key] = ','.join(self.headers.getheaders(k))
256
257        if hasattr(self, 'SSLImp') and getattr(self, 'SSLImp') != "None":
258            self.wsgi_environ['wsgi.url_scheme'] = 'https'
259            # @@: extract other SSL parameters from pyOpenSSL at...
260            # http://www.modssl.org/docs/2.8/ssl_reference.html#ToC25
261
262        if environ:
263            assert isinstance(environ, dict)
264            self.wsgi_environ.update(environ)
265            if 'on' == environ.get('HTTPS'):
266                self.wsgi_environ['wsgi.url_scheme'] = 'https'
267
268        self.wsgi_curr_headers = None
269        self.wsgi_headers_sent = False
270
271    def wsgi_connection_drop(self, exce, environ=None):
272        """
273        Override this if you're interested in socket exceptions, such
274        as when the user clicks 'Cancel' during a file download.
275        """
276        pass
277
278    def wsgi_execute(self, environ=None):
279        """
280        Invoke the server's ``wsgi_application``.
281        """
282
283        self.wsgi_setup(environ)
284
285        try:
286            result = self.server.wsgi_application(self.wsgi_environ,
287                                                  self.wsgi_start_response)
288            try:
289                for chunk in result:
290                    self.wsgi_write_chunk(chunk)
291                if not self.wsgi_headers_sent:
292                    self.wsgi_write_chunk('')
293            finally:
294                if hasattr(result,'close'):
295                    result.close()
296                result = None
297        except socket.error, exce:
298            self.wsgi_connection_drop(exce, environ)
299            return
300        except:
301            if not self.wsgi_headers_sent:
302                error_msg = "Internal Server Error\n"
303                self.wsgi_curr_headers = (
304                    '500 Internal Server Error',
305                    [('Content-type', 'text/plain'),
306                     ('Content-length', str(len(error_msg)))])
307                self.wsgi_write_chunk("Internal Server Error\n")
308            raise
309
310#
311# SSL Functionality
312#
313# This implementation was motivated by Sebastien Martini's SSL example
314# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/442473
315#
316class SSLStrategy(object):
317    """
318    Class to abstract away SSL implementation details
319    """
320   
321    class _ConnFixer(object):
322        """ wraps a socket connection so it implements makefile """
323        def __init__(self, conn):
324            self.__conn = conn
325        def makefile(self, mode, bufsize):
326            return socket._fileobject(self.__conn, mode, bufsize)
327        def __getattr__(self, attrib):
328            return getattr(self.__conn, attrib)
329
330    def __init__(self, pem, context):
331        self.enabled = False
332        self.use_builtin = False
333        self.use_openssl = False
334        if not pem and not context:
335            return
336        if pem and not context and self._use_builtin(pem):
337            self.use_builtin = True
338            self.enabled = True
339            self.pem = pem
340            return
341        if self._use_openssl():
342            self.use_openssl = True
343            self.enabled = True
344            self._configure_openssl_context(pem, context)
345            return
346        assert False, "pyOpenSSL is not installed"
347       
348    def _auto_ssl_context(self):
349        import OpenSSL, time, random
350        from OpenSSL import SSL
351        pkey = OpenSSL.crypto.PKey()
352        pkey.generate_key(OpenSSL.crypto.TYPE_RSA, 768)
353
354        cert = OpenSSL.crypto.X509()
355
356        cert.set_serial_number(random.randint(0, sys.maxint))
357        cert.gmtime_adj_notBefore(0)
358        cert.gmtime_adj_notAfter(60 * 60 * 24 * 365)
359        cert.get_subject().CN = '*'
360        cert.get_subject().O = 'Dummy Certificate'
361        cert.get_issuer().CN = 'Untrusted Authority'
362        cert.get_issuer().O = 'Self-Signed'
363        cert.set_pubkey(pkey)
364        cert.sign(pkey, 'md5')
365
366        ctx = SSL.Context(SSL.SSLv23_METHOD)
367        ctx.use_privatekey(pkey)
368        ctx.use_certificate(cert)
369
370        return ctx
371   
372    def _configure_openssl_context(self, pem, context):
373        """
374        Configures the openssl context if openssl is being
375        used
376        """
377        if not context:
378            if pem == '*':
379                context = self._auto_ssl_context()
380            else:
381                context = SSL.Context(SSL.SSLv23_METHOD)
382                context.use_privatekey_file(pem)
383                context.use_certificate_chain_file(pem)
384        self.openssl_context = context
385   
386    def _use_openssl(self):
387        try:
388            import OpenSSL
389            return True
390        except:
391            return False
392       
393    def _use_builtin(self, pem):
394        """
395        Attempts to see if the builtin module present in
396        Pyhon 2.6 and above
397        """
398        try:
399            import ssl
400            return True if pem is not "*" else False
401        except:
402            return False 
403           
404    def prepare_connection(self, conn):
405        """
406        Prepares a connection object for use by the
407        HTTPServer
408        """ 
409        if self.use_openssl:
410            # The default SSL request object does not seem to have a
411            # ``makefile(mode, bufsize)`` method as expected by
412            # Socketserver.StreamRequestHandler.
413            return SSLStrategy._ConnFixer(conn)
414        else:
415            return conn
416           
417    def create_socket(self, address_family, socket_type):
418        """
419        Creates a socket object for use by the HTTPServer
420        """
421        sock = socket.socket(address_family, socket_type)
422        if not self.is_enabled():
423            return sock
424        if self.use_builtin:
425            from ssl import wrap_socket, PROTOCOL_SSLv23
426            return wrap_socket(sock, server_side=True, certfile=self.pem,
427                        keyfile=self.pem, ssl_version=PROTOCOL_SSLv23)
428        if self.use_openssl:
429            from OpenSSL import tsafe
430            class TSafeConnection(tsafe.Connection):
431                def settimeout(self, *args):
432                    self._lock.acquire()
433                    try:
434                        return self._ssl_conn.settimeout(*args)
435                    finally:
436                        self._lock.release()
437                def gettimeout(self):
438                    self._lock.acquire()
439                    try:
440                        return self._ssl_conn.gettimeout()
441                    finally:
442                        self._lock.release()
443            return TSafeConnection(self.openssl_context, sock) 
444           
445    def get_implementation_name(self):
446        """
447        Return a string identifier for the implementation in use
448        """     
449        if not self.is_enabled():
450            return "None"
451        if self.use_builtin:
452            return "Builtin"
453        if self.use_openssl:
454            return "OpenSSL"
455           
456    def get_socket_errors(self):
457        """
458        Return a tuple of acceptable socket errors
459        """
460        if not self.is_enabled():
461            return (socket.error,)
462        elif self.use_builtin:
463            import ssl
464            return (socket.error, ssl.SSLError)
465        else:
466            from OpenSSL import SSL
467            return (socket.error, SSL.ZeroReturnError, SSL.SysCallError, SSL.Error) 
468
469    def is_enabled(self):
470        return self.enabled
471
472    def get_active_protocol(self):
473        return "https" if self.is_enabled() else "http"
474
475
476class SecureHTTPServer(HTTPServer):
477    """
478    Provides SSL server functionality on top of the BaseHTTPServer
479    by overriding _private_ members of Python's standard
480    distribution.
481    """
482
483    def __init__(self, server_address, ssl_strategy, 
484                    RequestHandlerClass, request_queue_size=None):
485        # This overrides the implementation of __init__ in python's
486        # SocketServer.TCPServer (which BaseHTTPServer.HTTPServer
487        # does not override, thankfully).
488        HTTPServer.__init__(self, server_address, RequestHandlerClass)
489
490        self.ssl_strategy = ssl_strategy
491        socket_errors = ssl_strategy.get_socket_errors()
492        imp_name = ssl_strategy.get_implementation_name()
493        setattr(RequestHandlerClass, "SocketErrors", socket_errors)
494        setattr(RequestHandlerClass, "SSLImp", imp_name)
495       
496        self.socket = ssl_strategy.create_socket(self.address_family,
497                                                self.socket_type)
498        self.server_bind()
499        if request_queue_size:
500            self.socket.listen(request_queue_size)
501        self.server_activate()
502
503    def get_request(self):
504        (conn, info) = self.socket.accept()
505        conn = self.ssl_strategy.prepare_connection(conn)
506        return (conn, info)
507
508
509class WSGIHandler(WSGIHandlerMixin, BaseHTTPRequestHandler):
510    """
511    A WSGI handler that overrides POST, GET and HEAD to delegate
512    requests to the server's ``wsgi_application``.
513    """
514    server_version = 'PasteWSGIServer/' + __version__
515
516    def handle_one_request(self):
517        """Handle a single HTTP request.
518
519        You normally don't need to override this method; see the class
520        __doc__ string for information on how to handle specific HTTP
521        commands such as GET and POST.
522
523        """
524        self.raw_requestline = self.rfile.readline()
525        if not self.raw_requestline:
526            self.close_connection = 1
527            return
528        if not self.parse_request(): # An error code has been sent, just exit
529            return
530        self.wsgi_execute()
531
532    def handle(self):
533        # don't bother logging disconnects while handling a request
534        try:
535            BaseHTTPRequestHandler.handle(self)
536        except self.SocketErrors, exce:
537            self.wsgi_connection_drop(exce)
538
539    def address_string(self):
540        """Return the client address formatted for logging.
541
542        This is overridden so that no hostname lookup is done.
543        """
544        return ''
545
546class LimitedLengthFile(object):
547    def __init__(self, file, length):
548        self.file = file
549        self.length = length
550        self._consumed = 0
551        if hasattr(self.file, 'seek'):
552            self.seek = self._seek
553
554    def __repr__(self):
555        base_repr = repr(self.file)
556        return base_repr[:-1] + ' length=%s>' % self.length
557
558    def read(self, length=None):
559        left = self.length - self._consumed
560        if length is None:
561            length = left
562        else:
563            length = min(length, left)
564        # next two lines are hnecessary only if read(0) blocks
565        if not left:
566            return ''
567        data = self.file.read(length)
568        self._consumed += len(data)
569        return data
570
571    def readline(self, *args):
572        max_read = self.length - self._consumed
573        if len(args):
574            max_read = min(args[0], max_read)
575        data = self.file.readline(max_read)
576        self._consumed += len(data)
577        return data
578
579    def readlines(self, hint=None):
580        data = self.file.readlines(hint)
581        for chunk in data:
582            self._consumed += len(chunk)
583        return data
584
585    def __iter__(self):
586        return self
587
588    def next(self):
589        if self.length - self._consumed <= 0:
590            raise StopIteration
591        return self.readline()
592
593    ## Optional methods ##
594
595    def _seek(self, place):
596        self.file.seek(place)
597        self._consumed = place
598
599    def tell(self):
600        if hasattr(self.file, 'tell'):
601            return self.file.tell()
602        else:
603            return self._consumed
604
605class ThreadPool(object):
606    """
607    Generic thread pool with a queue of callables to consume.
608
609    Keeps a notion of the status of its worker threads:
610
611    idle: worker thread with nothing to do
612
613    busy: worker thread doing its job
614
615    hung: worker thread that's been doing a job for too long
616
617    dying: a hung thread that has been killed, but hasn't died quite
618    yet.
619
620    zombie: what was a worker thread that we've tried to kill but
621    isn't dead yet.
622
623    At any time you can call track_threads, to get a dictionary with
624    these keys and lists of thread_ids that fall in that status.  All
625    keys will be present, even if they point to emty lists.
626
627    hung threads are threads that have been busy more than
628    hung_thread_limit seconds.  Hung threads are killed when they live
629    longer than kill_thread_limit seconds.  A thread is then
630    considered dying for dying_limit seconds, if it is still alive
631    after that it is considered a zombie.
632
633    When there are no idle workers and a request comes in, another
634    worker *may* be spawned.  If there are less than spawn_if_under
635    threads in the busy state, another thread will be spawned.  So if
636    the limit is 5, and there are 4 hung threads and 6 busy threads,
637    no thread will be spawned.
638
639    When there are more than max_zombie_threads_before_die zombie
640    threads, a SystemExit exception will be raised, stopping the
641    server.  Use 0 or None to never raise this exception.  Zombie
642    threads *should* get cleaned up, but killing threads is no
643    necessarily reliable.  This is turned off by default, since it is
644    only a good idea if you've deployed the server with some process
645    watching from above (something similar to daemontools or zdaemon).
646
647    Each worker thread only processes ``max_requests`` tasks before it
648    dies and replaces itself with a new worker thread.
649    """
650
651
652    SHUTDOWN = object()
653
654    def __init__(
655        self, nworkers, name="ThreadPool", daemon=False,
656        max_requests=100, # threads are killed after this many requests
657        hung_thread_limit=30, # when a thread is marked "hung"
658        kill_thread_limit=1800, # when you kill that hung thread
659        dying_limit=300, # seconds that a kill should take to go into effect (longer than this and the thread is a "zombie")
660        spawn_if_under=5, # spawn if there's too many hung threads
661        max_zombie_threads_before_die=0, # when to give up on the process
662        hung_check_period=100, # every 100 requests check for hung workers
663        logger=None, # Place to log messages to
664        error_email=None, # Person(s) to notify if serious problem occurs
665        ):
666        """
667        Create thread pool with `nworkers` worker threads.
668        """
669        self.nworkers = nworkers
670        self.max_requests = max_requests
671        self.name = name
672        self.queue = Queue.Queue()
673        self.workers = []
674        self.daemon = daemon
675        if logger is None:
676            logger = logging.getLogger('paste.httpserver.ThreadPool')
677        if isinstance(logger, basestring):
678            logger = logging.getLogger(logger)
679        self.logger = logger
680        self.error_email = error_email
681        self._worker_count = count()
682
683        assert (not kill_thread_limit
684                or kill_thread_limit >= hung_thread_limit), (
685            "kill_thread_limit (%s) should be higher than hung_thread_limit (%s)"
686            % (kill_thread_limit, hung_thread_limit))
687        if not killthread:
688            kill_thread_limit = 0
689            self.logger.info(
690                "Cannot use kill_thread_limit as ctypes/killthread is not available")
691        self.kill_thread_limit = kill_thread_limit
692        self.dying_limit = dying_limit
693        self.hung_thread_limit = hung_thread_limit
694        assert spawn_if_under <= nworkers, (
695            "spawn_if_under (%s) should be less than nworkers (%s)"
696            % (spawn_if_under, nworkers))
697        self.spawn_if_under = spawn_if_under
698        self.max_zombie_threads_before_die = max_zombie_threads_before_die
699        self.hung_check_period = hung_check_period
700        self.requests_since_last_hung_check = 0
701        # Used to keep track of what worker is doing what:
702        self.worker_tracker = {}
703        # Used to keep track of the workers not doing anything:
704        self.idle_workers = []
705        # Used to keep track of threads that have been killed, but maybe aren't dead yet:
706        self.dying_threads = {}
707        # This is used to track when we last had to add idle workers;
708        # we shouldn't cull extra workers until some time has passed
709        # (hung_thread_limit) since workers were added:
710        self._last_added_new_idle_workers = 0
711        if not daemon:
712            atexit.register(self.shutdown)
713        for i in range(self.nworkers):
714            self.add_worker_thread(message='Initial worker pool')
715
716    def add_task(self, task):
717        """
718        Add a task to the queue
719        """
720        self.logger.debug('Added task (%i tasks queued)', self.queue.qsize())
721        if self.hung_check_period:
722            self.requests_since_last_hung_check += 1
723            if self.requests_since_last_hung_check > self.hung_check_period:
724                self.requests_since_last_hung_check = 0
725                self.kill_hung_threads()
726        if not self.idle_workers and self.spawn_if_under:
727            # spawn_if_under can come into effect...
728            busy = 0
729            now = time.time()
730            self.logger.debug('No idle workers for task; checking if we need to make more workers')
731            for worker in self.workers:
732                if not hasattr(worker, 'thread_id'):
733                    # Not initialized
734                    continue
735                time_started, info = self.worker_tracker.get(worker.thread_id,
736                                                             (None, None))
737                if time_started is not None:
738                    if now - time_started < self.hung_thread_limit:
739                        busy += 1
740            if busy < self.spawn_if_under:
741                self.logger.info(
742                    'No idle tasks, and only %s busy tasks; adding %s more '
743                    'workers', busy, self.spawn_if_under-busy)
744                self._last_added_new_idle_workers = time.time()
745                for i in range(self.spawn_if_under - busy):
746                    self.add_worker_thread(message='Response to lack of idle workers')
747            else:
748                self.logger.debug(
749                    'No extra workers needed (%s busy workers)',
750                    busy)
751        if (len(self.workers) > self.nworkers
752            and len(self.idle_workers) > 3
753            and time.time()-self._last_added_new_idle_workers > self.hung_thread_limit):
754            # We've spawned worers in the past, but they aren't needed
755            # anymore; kill off some
756            self.logger.info(
757                'Culling %s extra workers (%s idle workers present)',
758                len(self.workers)-self.nworkers, len(self.idle_workers))
759            self.logger.debug(
760                'Idle workers: %s', self.idle_workers)
761            for i in range(len(self.workers) - self.nworkers):
762                self.queue.put(self.SHUTDOWN)
763        self.queue.put(task)
764
765    def track_threads(self):
766        """
767        Return a dict summarizing the threads in the pool (as
768        described in the ThreadPool docstring).
769        """
770        result = dict(idle=[], busy=[], hung=[], dying=[], zombie=[])
771        now = time.time()
772        for worker in self.workers:
773            if not hasattr(worker, 'thread_id'):
774                # The worker hasn't fully started up, we should just
775                # ignore it
776                continue
777            time_started, info = self.worker_tracker.get(worker.thread_id,
778                                                         (None, None))
779            if time_started is not None:
780                if now - time_started > self.hung_thread_limit:
781                    result['hung'].append(worker)
782                else:
783                    result['busy'].append(worker)
784            else:
785                result['idle'].append(worker)
786        for thread_id, (time_killed, worker) in self.dying_threads.items():
787            if not self.thread_exists(thread_id):
788                # Cull dying threads that are actually dead and gone
789                self.logger.info('Killed thread %s no longer around',
790                                 thread_id)
791                try:
792                    del self.dying_threads[thread_id]
793                except KeyError:
794                    pass
795                continue
796            if now - time_killed > self.dying_limit:
797                result['zombie'].append(worker)
798            else:
799                result['dying'].append(worker)
800        return result
801
802    def kill_worker(self, thread_id):
803        """
804        Removes the worker with the given thread_id from the pool, and
805        replaces it with a new worker thread.
806
807        This should only be done for mis-behaving workers.
808        """
809        if killthread is None:
810            raise RuntimeError(
811                "Cannot kill worker; killthread/ctypes not available")
812        thread_obj = threading._active.get(thread_id)
813        killthread.async_raise(thread_id, SystemExit)
814        try:
815            del self.worker_tracker[thread_id]
816        except KeyError:
817            pass
818        self.logger.info('Killing thread %s', thread_id)
819        if thread_obj in self.workers:
820            self.workers.remove(thread_obj)
821        self.dying_threads[thread_id] = (time.time(), thread_obj)
822        self.add_worker_thread(message='Replacement for killed thread %s' % thread_id)
823
824    def thread_exists(self, thread_id):
825        """
826        Returns true if a thread with this id is still running
827        """
828        return thread_id in threading._active
829
830    def add_worker_thread(self, *args, **kwargs):
831        index = self._worker_count.next()
832        worker = threading.Thread(target=self.worker_thread_callback,
833                                  args=args, kwargs=kwargs,
834                                  name=("worker %d" % index))
835        worker.setDaemon(self.daemon)
836        worker.start()
837
838    def kill_hung_threads(self):
839        """
840        Tries to kill any hung threads
841        """
842        if not self.kill_thread_limit:
843            # No killing should occur
844            return
845        now = time.time()
846        max_time = 0
847        total_time = 0
848        idle_workers = 0
849        starting_workers = 0
850        working_workers = 0
851        killed_workers = 0
852        for worker in self.workers:
853            if not hasattr(worker, 'thread_id'):
854                # Not setup yet
855                starting_workers += 1
856                continue
857            time_started, info = self.worker_tracker.get(worker.thread_id,
858                                                         (None, None))
859            if time_started is None:
860                # Must be idle
861                idle_workers += 1
862                continue
863            working_workers += 1
864            max_time = max(max_time, now-time_started)
865            total_time += now-time_started
866            if now - time_started > self.kill_thread_limit:
867                self.logger.warning(
868                    'Thread %s hung (working on task for %i seconds)',
869                    worker.thread_id, now - time_started)
870                try:
871                    import pprint
872                    info_desc = pprint.pformat(info)
873                except:
874                    out = StringIO()
875                    traceback.print_exc(file=out)
876                    info_desc = 'Error:\n%s' % out.getvalue()
877                self.notify_problem(
878                    "Killing worker thread (id=%(thread_id)s) because it has been \n"
879                    "working on task for %(time)s seconds (limit is %(limit)s)\n"
880                    "Info on task:\n"
881                    "%(info)s"
882                    % dict(thread_id=worker.thread_id,
883                           time=now - time_started,
884                           limit=self.kill_thread_limit,
885                           info=info_desc))
886                self.kill_worker(worker.thread_id)
887                killed_workers += 1
888        if working_workers:
889            ave_time = float(total_time) / working_workers
890            ave_time = '%.2fsec' % ave_time
891        else:
892            ave_time = 'N/A'
893        self.logger.info(
894            "kill_hung_threads status: %s threads (%s working, %s idle, %s starting) "
895            "ave time %s, max time %.2fsec, killed %s workers"
896            % (idle_workers + starting_workers + working_workers,
897               working_workers, idle_workers, starting_workers,
898               ave_time, max_time, killed_workers))
899        self.check_max_zombies()
900
901    def check_max_zombies(self):
902        """
903        Check if we've reached max_zombie_threads_before_die; if so
904        then kill the entire process.
905        """
906        if not self.max_zombie_threads_before_die:
907            return
908        found = []
909        now = time.time()
910        for thread_id, (time_killed, worker) in self.dying_threads.items():
911            if not self.thread_exists(thread_id):
912                # Cull dying threads that are actually dead and gone
913                try:
914                    del self.dying_threads[thread_id]
915                except KeyError:
916                    pass
917                continue
918            if now - time_killed > self.dying_limit:
919                found.append(thread_id)
920        if found:
921            self.logger.info('Found %s zombie threads', found)
922        if len(found) > self.max_zombie_threads_before_die:
923            self.logger.fatal(
924                'Exiting process because %s zombie threads is more than %s limit',
925                len(found), self.max_zombie_threads_before_die)
926            self.notify_problem(
927                "Exiting process because %(found)s zombie threads "
928                "(more than limit of %(limit)s)\n"
929                "Bad threads (ids):\n"
930                %(ids)s\n"
931                % dict(found=len(found),
932                       limit=self.max_zombie_threads_before_die,
933                       ids="\n  ".join(map(str, found))),
934                subject="Process restart (too many zombie threads)")
935            self.shutdown(10)
936            print 'Shutting down', threading.currentThread()
937            raise ServerExit(3)
938
939    def worker_thread_callback(self, message=None):
940        """
941        Worker thread should call this method to get and process queued
942        callables.
943        """
944        thread_obj = threading.currentThread()
945        thread_id = thread_obj.thread_id = thread.get_ident()
946        self.workers.append(thread_obj)
947        self.idle_workers.append(thread_id)
948        requests_processed = 0
949        add_replacement_worker = False
950        self.logger.debug('Started new worker %s: %s', thread_id, message)
951        try:
952            while True:
953                if self.max_requests and self.max_requests < requests_processed:
954                    # Replace this thread then die
955                    self.logger.debug('Thread %s processed %i requests (limit %s); stopping thread'
956                                      % (thread_id, requests_processed, self.max_requests))
957                    add_replacement_worker = True
958                    break
959                runnable = self.queue.get()
960                if runnable is ThreadPool.SHUTDOWN:
961                    self.logger.debug('Worker %s asked to SHUTDOWN', thread_id)
962                    break
963                try:
964                    self.idle_workers.remove(thread_id)
965                except ValueError:
966                    pass
967                self.worker_tracker[thread_id] = [time.time(), None]
968                requests_processed += 1
969                try:
970                    try:
971                        runnable()
972                    except:
973                        # We are later going to call sys.exc_clear(),
974                        # removing all remnants of any exception, so
975                        # we should log it now.  But ideally no
976                        # exception should reach this level
977                        print >> sys.stderr, (
978                            'Unexpected exception in worker %r' % runnable)
979                        traceback.print_exc()
980                    if thread_id in self.dying_threads:
981                        # That last exception was intended to kill me
982                        break
983                finally:
984                    try:
985                        del self.worker_tracker[thread_id]
986                    except KeyError:
987                        pass
988                    sys.exc_clear()
989                self.idle_workers.append(thread_id)
990        finally:
991            try:
992                del self.worker_tracker[thread_id]
993            except KeyError:
994                pass
995            try:
996                self.idle_workers.remove(thread_id)
997            except ValueError:
998                pass
999            try:
1000                self.workers.remove(thread_obj)
1001            except ValueError:
1002                pass
1003            try:
1004                del self.dying_threads[thread_id]
1005            except KeyError:
1006                pass
1007            if add_replacement_worker:
1008                self.add_worker_thread(message='Voluntary replacement for thread %s' % thread_id)
1009
1010    def shutdown(self, force_quit_timeout=0):
1011        """
1012        Shutdown the queue (after finishing any pending requests).
1013        """
1014        self.logger.info('Shutting down threadpool')
1015        # Add a shutdown request for every worker
1016        for i in range(len(self.workers)):
1017            self.queue.put(ThreadPool.SHUTDOWN)
1018        # Wait for each thread to terminate
1019        hung_workers = []
1020        for worker in self.workers:
1021            worker.join(0.5)
1022            if worker.isAlive():
1023                hung_workers.append(worker)
1024        zombies = []
1025        for thread_id in self.dying_threads:
1026            if self.thread_exists(thread_id):
1027                zombies.append(thread_id)
1028        if hung_workers or zombies:
1029            self.logger.info("%s workers didn't stop properly, and %s zombies",
1030                             len(hung_workers), len(zombies))
1031            if hung_workers:
1032                for worker in hung_workers:
1033                    self.kill_worker(worker.thread_id)
1034                self.logger.info('Workers killed forcefully')
1035            if force_quit_timeout:
1036                hung = []
1037                timed_out = False
1038                need_force_quit = bool(zombies)
1039                for workers in self.workers:
1040                    if not timed_out and worker.isAlive():
1041                        timed_out = True
1042                        worker.join(force_quit_timeout)
1043                    if worker.isAlive():
1044                        print "Worker %s won't die" % worker
1045                        need_force_quit = True
1046                if need_force_quit:
1047                    import atexit
1048                    # Remove the threading atexit callback
1049                    for callback in list(atexit._exithandlers):
1050                        func = getattr(callback[0], 'im_func', None)
1051                        if not func:
1052                            continue
1053                        globs = getattr(func, 'func_globals', {})
1054                        mod = globs.get('__name__')
1055                        if mod == 'threading':
1056                            atexit._exithandlers.remove(callback)
1057                    atexit._run_exitfuncs()
1058                    print 'Forcefully exiting process'
1059                    os._exit(3)
1060                else:
1061                    self.logger.info('All workers eventually killed')
1062        else:
1063            self.logger.info('All workers stopped')
1064
1065    def notify_problem(self, msg, subject=None, spawn_thread=True):
1066        """
1067        Called when there's a substantial problem.  msg contains the
1068        body of the notification, subject the summary.
1069
1070        If spawn_thread is true, then the email will be send in
1071        another thread (so this doesn't block).
1072        """
1073        if not self.error_email:
1074            return
1075        if spawn_thread:
1076            t = threading.Thread(
1077                target=self.notify_problem,
1078                args=(msg, subject, False))
1079            t.start()
1080            return
1081        from_address = 'errors@localhost'
1082        if not subject:
1083            subject = msg.strip().splitlines()[0]
1084            subject = subject[:50]
1085            subject = '[http threadpool] %s' % subject
1086        headers = [
1087            "To: %s" % self.error_email,
1088            "From: %s" % from_address,
1089            "Subject: %s" % subject,
1090            ]
1091        try:
1092            system = ' '.join(os.uname())
1093        except:
1094            system = '(unknown)'
1095        body = (
1096            "An error has occurred in the paste.httpserver.ThreadPool\n"
1097            "Error:\n"
1098            %(msg)s\n"
1099            "Occurred at: %(time)s\n"
1100            "PID: %(pid)s\n"
1101            "System: %(system)s\n"
1102            "Server .py file: %(file)s\n"
1103            % dict(msg=msg,
1104                   time=time.strftime("%c"),
1105                   pid=os.getpid(),
1106                   system=system,
1107                   file=os.path.abspath(__file__),
1108                   ))
1109        message = '\n'.join(headers) + "\n\n" + body
1110        import smtplib
1111        server = smtplib.SMTP('localhost')
1112        error_emails = [
1113            e.strip() for e in self.error_email.split(",")
1114            if e.strip()]
1115        server.sendmail(from_address, error_emails, message)
1116        server.quit()
1117        print 'email sent to', error_emails, message
1118
1119class ThreadPoolMixIn(object):
1120    """
1121    Mix-in class to process requests from a thread pool
1122    """
1123    def __init__(self, nworkers, daemon=False, **threadpool_options):
1124        # Create and start the workers
1125        self.running = True
1126        assert nworkers > 0, "ThreadPoolMixIn servers must have at least one worker"
1127        self.thread_pool = ThreadPool(
1128            nworkers,
1129            "ThreadPoolMixIn HTTP server on %s:%d"
1130            % (self.server_name, self.server_port),
1131            daemon,
1132            **threadpool_options)
1133
1134    def process_request(self, request, client_address):
1135        """
1136        Queue the request to be processed by on of the thread pool threads
1137        """
1138        # This sets the socket to blocking mode (and no timeout) since it
1139        # may take the thread pool a little while to get back to it. (This
1140        # is the default but since we set a timeout on the parent socket so
1141        # that we can trap interrupts we need to restore this,.)
1142        request.setblocking(1)
1143        # Queue processing of the request
1144        self.thread_pool.add_task(
1145             lambda: self.process_request_in_thread(request, client_address))
1146
1147    def handle_error(self, request, client_address):
1148        exc_class, exc, tb = sys.exc_info()
1149        if exc_class is ServerExit:
1150            # This is actually a request to stop the server
1151            raise
1152        return super(ThreadPoolMixIn, self).handle_error(request, client_address)
1153
1154    def process_request_in_thread(self, request, client_address):
1155        """
1156        The worker thread should call back here to do the rest of the
1157        request processing. Error handling normaller done in 'handle_request'
1158        must be done here.
1159        """
1160        try:
1161            self.finish_request(request, client_address)
1162            self.close_request(request)
1163        except:
1164            self.handle_error(request, client_address)
1165            self.close_request(request)
1166            exc = sys.exc_info()[1]
1167            if isinstance(exc, (MemoryError, KeyboardInterrupt)):
1168                raise
1169
1170    def serve_forever(self):
1171        """
1172        Overrides `serve_forever` to shut the threadpool down cleanly.
1173        """
1174        try:
1175            while self.running:
1176                try:
1177                    self.handle_request()
1178                except socket.timeout:
1179                    # Timeout is expected, gives interrupts a chance to
1180                    # propogate, just keep handling
1181                    pass
1182        finally:
1183            self.thread_pool.shutdown()
1184
1185    def server_activate(self):
1186        """
1187        Overrides server_activate to set timeout on our listener socket.
1188        """
1189        # We set the timeout here so that we can trap interrupts on windows
1190        self.socket.settimeout(1)
1191
1192    def server_close(self):
1193        """
1194        Finish pending requests and shutdown the server.
1195        """
1196        self.running = False
1197        self.socket.close()
1198        self.thread_pool.shutdown(60)
1199
1200class WSGIServerBase(SecureHTTPServer):
1201    def __init__(self, wsgi_application, server_address,
1202                 ssl_strategy, RequestHandlerClass=None,
1203                 request_queue_size=None):
1204        SecureHTTPServer.__init__(self, server_address,
1205                                  ssl_strategy, RequestHandlerClass,
1206                                  request_queue_size=request_queue_size)
1207        self.wsgi_application = wsgi_application
1208        self.wsgi_socket_timeout = None
1209
1210    def get_request(self):
1211        # If there is a socket_timeout, set it on the accepted
1212        (conn,info) = SecureHTTPServer.get_request(self)
1213        if self.wsgi_socket_timeout:
1214            conn.settimeout(self.wsgi_socket_timeout)
1215        return (conn, info)
1216
1217class WSGIServer(ThreadingMixIn, WSGIServerBase):
1218    daemon_threads = False
1219
1220class WSGIThreadPoolServer(ThreadPoolMixIn, WSGIServerBase):
1221    def __init__(self, wsgi_application, server_address,
1222                 ssl_strategy, RequestHandlerClass=None,
1223                 nworkers=10, daemon_threads=False,
1224                 threadpool_options=None, request_queue_size=None):
1225        WSGIServerBase.__init__(self, wsgi_application, server_address,
1226                                ssl_strategy, RequestHandlerClass,
1227                                request_queue_size=request_queue_size)
1228        if threadpool_options is None:
1229            threadpool_options = {}
1230        ThreadPoolMixIn.__init__(self, nworkers, daemon_threads,
1231                                 **threadpool_options)
1232
1233class ServerExit(SystemExit):
1234    """
1235    Raised to tell the server to really exit (SystemExit is normally
1236    caught)
1237    """
1238
1239def serve(application, host=None, port=None, handler=None, ssl_pem=None,
1240          ssl_context=None, server_version=None, protocol_version=None,
1241          start_loop=True, daemon_threads=None, socket_timeout=None,
1242          use_threadpool=None, threadpool_workers=10,
1243          threadpool_options=None, request_queue_size=5):
1244    """
1245    Serves your ``application`` over HTTP(S) via WSGI interface
1246
1247    ``host``
1248
1249        This is the ipaddress to bind to (or a hostname if your
1250        nameserver is properly configured).  This defaults to
1251        127.0.0.1, which is not a public interface.
1252
1253    ``port``
1254
1255        The port to run on, defaults to 8080 for HTTP, or 4443 for
1256        HTTPS. This can be a string or an integer value.
1257
1258    ``handler``
1259
1260        This is the HTTP request handler to use, it defaults to
1261        ``WSGIHandler`` in this module.
1262
1263    ``ssl_pem``
1264
1265        This an optional SSL certificate file (via OpenSSL). You can
1266        supply ``*`` and a development-only certificate will be
1267        created for you, or you can generate a self-signed test PEM
1268        certificate file as follows::
1269
1270            $ openssl genrsa 1024 > host.key
1271            $ chmod 400 host.key
1272            $ openssl req -new -x509 -nodes -sha1 -days 365  \\
1273                          -key host.key > host.cert
1274            $ cat host.cert host.key > host.pem
1275            $ chmod 400 host.pem
1276
1277    ``ssl_context``
1278
1279        This an optional SSL context object for the server.  A SSL
1280        context will be automatically constructed for you if you supply
1281        ``ssl_pem``.  Supply this to use a context of your own
1282        construction.
1283
1284    ``server_version``
1285
1286        The version of the server as reported in HTTP response line. This
1287        defaults to something like "PasteWSGIServer/0.5".  Many servers
1288        hide their code-base identity with a name like 'Amnesiac/1.0'
1289
1290    ``protocol_version``
1291
1292        This sets the protocol used by the server, by default
1293        ``HTTP/1.0``. There is some support for ``HTTP/1.1``, which
1294        defaults to nicer keep-alive connections.  This server supports
1295        ``100 Continue``, but does not yet support HTTP/1.1 Chunked
1296        Encoding. Hence, if you use HTTP/1.1, you're somewhat in error
1297        since chunked coding is a mandatory requirement of a HTTP/1.1
1298        server.  If you specify HTTP/1.1, every response *must* have a
1299        ``Content-Length`` and you must be careful not to read past the
1300        end of the socket.
1301
1302    ``start_loop``
1303
1304        This specifies if the server loop (aka ``server.serve_forever()``)
1305        should be called; it defaults to ``True``.
1306
1307    ``daemon_threads``
1308
1309        This flag specifies if when your webserver terminates all
1310        in-progress client connections should be droppped.  It defaults
1311        to ``False``.   You might want to set this to ``True`` if you
1312        are using ``HTTP/1.1`` and don't set a ``socket_timeout``.
1313
1314    ``socket_timeout``
1315
1316        This specifies the maximum amount of time that a connection to a
1317        given client will be kept open.  At this time, it is a rude
1318        disconnect, but at a later time it might follow the RFC a bit
1319        more closely.
1320
1321    ``use_threadpool``
1322
1323        Server requests from a pool of worker threads (``threadpool_workers``)
1324        rather than creating a new thread for each request. This can
1325        substantially reduce latency since there is a high cost associated
1326        with thread creation.
1327
1328    ``threadpool_workers``
1329
1330        Number of worker threads to create when ``use_threadpool`` is true. This
1331        can be a string or an integer value.
1332
1333    ``threadpool_options``
1334
1335        A dictionary of options to be used when instantiating the
1336        threadpool.  See paste.httpserver.ThreadPool for specific
1337        options (``threadpool_workers`` is a specific option that can
1338        also go here).
1339   
1340    ``request_queue_size``
1341
1342        The 'backlog' argument to socket.listen(); specifies the
1343        maximum number of queued connections.
1344
1345    """
1346
1347    host = host or '127.0.0.1'
1348    if port is None:
1349        if ':' in host:
1350            host, port = host.split(':', 1)
1351        else:
1352            port = 8080
1353    server_address = (host, int(port))
1354   
1355    ssl_strategy = SSLStrategy(ssl_pem, ssl_context)
1356    port = int(port if not ssl_strategy.is_enabled() else 4443)
1357
1358    if not handler:
1359        handler = WSGIHandler
1360    if server_version:
1361        handler.server_version = server_version
1362        handler.sys_version = None
1363    if protocol_version:
1364        assert protocol_version in ('HTTP/0.9', 'HTTP/1.0', 'HTTP/1.1')
1365        handler.protocol_version = protocol_version
1366
1367    if use_threadpool is None:
1368        use_threadpool = True
1369
1370    if converters.asbool(use_threadpool):
1371        server = WSGIThreadPoolServer(application, server_address, ssl_strategy,
1372                                      handler, int(threadpool_workers),
1373                                      daemon_threads,
1374                                      threadpool_options=threadpool_options,
1375                                      request_queue_size=request_queue_size)
1376    else:
1377        server = WSGIServer(application, server_address, ssl_strategy, handler,
1378                            request_queue_size=request_queue_size)
1379        if daemon_threads:
1380            server.daemon_threads = daemon_threads
1381
1382    if socket_timeout:
1383        server.wsgi_socket_timeout = int(socket_timeout)
1384
1385    if converters.asbool(start_loop):
1386        protocol = ssl_strategy.get_active_protocol()
1387        host, port = server.server_address
1388        if host == '0.0.0.0':
1389            print 'serving on 0.0.0.0:%s view at %s://127.0.0.1:%s' % \
1390                (port, protocol, port)
1391        else:
1392            print "serving on %s://%s:%s" % (protocol, host, port)
1393        try:
1394            server.serve_forever()
1395        except KeyboardInterrupt:
1396            # allow CTRL+C to shutdown
1397            pass
1398    return server
1399
1400# For paste.deploy server instantiation (egg:Paste#http)
1401# Note: this gets a separate function because it has to expect string
1402# arguments (though that's not much of an issue yet, ever?)
1403def server_runner(wsgi_app, global_conf, **kwargs):
1404    from paste.deploy.converters import asbool
1405    for name in ['port', 'socket_timeout', 'threadpool_workers',
1406                 'threadpool_hung_thread_limit',
1407                 'threadpool_kill_thread_limit',
1408                 'threadpool_dying_limit', 'threadpool_spawn_if_under',
1409                 'threadpool_max_zombie_threads_before_die',
1410                 'threadpool_hung_check_period',
1411                 'threadpool_max_requests', 'request_queue_size']:
1412        if name in kwargs:
1413            kwargs[name] = int(kwargs[name])
1414    for name in ['use_threadpool', 'daemon_threads']:
1415        if name in kwargs:
1416            kwargs[name] = asbool(kwargs[name])
1417    threadpool_options = {}
1418    for name, value in kwargs.items():
1419        if name.startswith('threadpool_') and name != 'threadpool_workers':
1420            threadpool_options[name[len('threadpool_'):]] = value
1421            del kwargs[name]
1422    if ('error_email' not in threadpool_options
1423        and 'error_email' in global_conf):
1424        threadpool_options['error_email'] = global_conf['error_email']
1425    kwargs['threadpool_options'] = threadpool_options
1426    serve(wsgi_app, **kwargs)
1427
1428server_runner.__doc__ = (serve.__doc__ or '') + """
1429
1430    You can also set these threadpool options:
1431
1432    ``threadpool_max_requests``:
1433
1434        The maximum number of requests a worker thread will process
1435        before dying (and replacing itself with a new worker thread).
1436        Default 100.
1437
1438    ``threadpool_hung_thread_limit``:
1439
1440        The number of seconds a thread can work on a task before it is
1441        considered hung (stuck).  Default 30 seconds.
1442
1443    ``threadpool_kill_thread_limit``:
1444
1445        The number of seconds a thread can work before you should kill it
1446        (assuming it will never finish).  Default 600 seconds (10 minutes).
1447
1448    ``threadpool_dying_limit``:
1449
1450        The length of time after killing a thread that it should actually
1451        disappear.  If it lives longer than this, it is considered a
1452        "zombie".  Note that even in easy situations killing a thread can
1453        be very slow.  Default 300 seconds (5 minutes).
1454
1455    ``threadpool_spawn_if_under``:
1456
1457        If there are no idle threads and a request comes in, and there are
1458        less than this number of *busy* threads, then add workers to the
1459        pool.  Busy threads are threads that have taken less than
1460        ``threadpool_hung_thread_limit`` seconds so far.  So if you get
1461        *lots* of requests but they complete in a reasonable amount of time,
1462        the requests will simply queue up (adding more threads probably
1463        wouldn't speed them up).  But if you have lots of hung threads and
1464        one more request comes in, this will add workers to handle it.
1465        Default 5.
1466
1467    ``threadpool_max_zombie_threads_before_die``:
1468
1469        If there are more zombies than this, just kill the process.  This is
1470        only good if you have a monitor that will automatically restart
1471        the server.  This can clean up the mess.  Default 0 (disabled).
1472
1473    `threadpool_hung_check_period``:
1474
1475        Every X requests, check for hung threads that need to be killed,
1476        or for zombie threads that should cause a restart.  Default 100
1477        requests.
1478
1479    ``threadpool_logger``:
1480
1481        Logging messages will go the logger named here.
1482
1483    ``threadpool_error_email`` (or global ``error_email`` setting):
1484
1485        When threads are killed or the process restarted, this email
1486        address will be contacted (using an SMTP server on localhost).
1487   
1488"""
1489
1490
1491if __name__ == '__main__':
1492    from paste.wsgilib import dump_environ
1493    #serve(dump_environ, ssl_pem="test.pem")
1494    serve(dump_environ, server_version="Wombles/1.0",
1495          protocol_version="HTTP/1.1", port="8888")
1496