Understanding Network I/O, Part 2

Multitasking

Multitasking is an operating system feature that allows several jobs to be done concurrently. All modern, mainstream operating systems, such as Linux, Solaris, Windows, and Mac OS X, support multitasking.

Multitasking comes in two basic varieties: processes and threads. Processes provide greater isolation between tasks. In particular, one process cannot overwrite the memory space of another, unless both processes explicitly share a portion of their memory spaces. Thus, damage from a faulty program is (usually) limited to the process in which that program executes.
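
For example, on a Unix-like system a new process can be created with os.fork; the child receives its own copy of the parent's data, so changes the child makes are invisible to the parent. The following sketch (not one of the article's examples, and Unix-only) illustrates this isolation:

import os         # Provides fork and waitpid on Unix-like systems.

counter = 0       # The child will modify only its own copy of this.

pid = os.fork()   # Create a child process (a near-copy of this one).
if pid == 0:
     # Child process: change the (copied) variable, then exit.
     counter = 100
     os._exit(0)
else:
     # Parent process: wait for the child, then inspect 'counter'.
     os.waitpid(pid, 0)
     print 'In the parent, counter is still', counter   # Prints 0.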

Threads sacrifice the safety of processes for increased performance. Multiple threads can be started inside a single process; all of these threads then share the process's memory space. The operating system usually does much less work when switching between threads (inside the same process) than when switching between processes.

Many networked applications rely heavily on threads to handle I/O requests in a timely manner. Threads are therefore the focus of the rest of this section. Although programming with threads is difficult, there are plenty of resources available to assist you. Several of them are listed in the Further Reading appendix. If you are not familiar with using threads (or multitasking in general) be sure to read some of the background material before doing any serious work with this technology.

If you are new to multithreaded designs, keep in mind that your goal is actually to use fewer threads, and especially fewer locks, in your program. This may seem like a contradiction, since corrupting unlocked shared data is quite similar in effect to a dangling reference in a language such as C++. Nevertheless, every extra thread you add increases system overhead, and every extra lock decreases concurrency. The way out is to design your program so that less data needs to be shared, and therefore locked, in the first place.
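
As a rough illustration of why unlocked shared data is dangerous, consider two threads that both increment a shared counter. The read-add-store sequence is not atomic, so without a lock some updates could be lost. The sketch below (not from the article) protects the counter with a single threading.Lock:

import threading

counter = 0                       # Shared, mutable state.
counter_lock = threading.Lock()   # Protects every access to 'counter'.

def increment(times):
     global counter
     for ii in range(times):
          # Acquire the lock so that the read-add-store sequence
          # below is performed by only one thread at a time.
          counter_lock.acquire()
          try:
               counter = counter + 1
          finally:
               counter_lock.release()

threads = []
for ii in range(2):
     thread = threading.Thread(target=increment, args=(100000,))
     thread.start()
     threads.append(thread)

for thread in threads:
     thread.join()

print 'Final count:', counter     # Always 200000 with the lock in place.

Removing the acquire and release calls may still happen to print 200000 on some runs, which is exactly what makes this kind of bug hard to find.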

Ultimately, you are trying to improve concurrency — doing things in parallel — when using threads. Learn to think in terms of adding concurrency rather than adding threads. This will help put you in the right frame of mind for creating effective threaded designs.

Thread-per-Request

The allure of multitasking in networked applications is its similarity to the plain synchronous I/O covered earlier. In fact, our threaded examples even use the same urllib library to carry out the I/O, except that now urllib is called from multiple threads.

One way to add concurrency to the previous example is to launch a separate thread for every city on our list. Each thread will retrieve, process, and display the data for only one city, and then exit.

Servers often employ this method of using threads (or threading model) to process requests. It is therefore commonly referred to as the thread-per-request model. The following example shows an implementation of thread-per-request.

Example 3. A thread-per-request client

import urllib     # Library for retrieving files using a URL.
import re         # Library for finding patterns in text.
import threading  # High-level thread library.

# Three NOAA web pages, showing current conditions in New York,
# London and Tokyo, respectively.
citydata = (('New York','http://weather.noaa.gov/weather/current/KNYC.html'),
            ('London',  'http://weather.noaa.gov/weather/current/EGLC.html'),
            ('Tokyo',   'http://weather.noaa.gov/weather/current/RJTT.html'))

# The maximum amount of data we are prepared to read, from any single page.
MAX_PAGE_LEN = 20000

# Function to be run by each thread.
def read_temperature(name,url,max):
     # Open and read the web page; catch any I/O errors.
     try:
          webpage = urllib.urlopen(url).read(max)
     except IOError, e:
          # An I/O error occurred; print the error message and end the thread.
          print 'I/O Error when reading URL',url,':\n',e.strerror
          return

     # Pattern which matches text like '66.9 F'.  The last
     # argument ('re.S') is a flag, which effectively causes
     # newlines to be treated as ordinary characters.
     match = re.search(r'(-?\d+(?:\.\d+)?) F',webpage,re.S)

     # Print out the matched text and a descriptive message;
     # if there is no match, print an error message.
     if match is None:
          print 'No temperature reading at URL:',url
     else:
          print 'In '+name+', it is now',match.group(1),'degrees.'

# END of function 'read_temperature'

# Launch a separate thread for each request.
for name,url in citydata:
     # Only keyword arguments (of the form 'name=value') may
     # be used with the 'Thread' constructor.  The 'args'
     # specifies arguments to be passed to the 'target'.
     thread = threading.Thread(target=read_temperature,
                               args=(name,url,MAX_PAGE_LEN))
     thread.start()

# The 'threading' package will wait until all the child threads
# (except any that are explicitly labelled as 'daemon threads')
# have shut down before exiting the program.

Here is the output produced by the client.

Example 4. Thread-per-request client output

In London, it is now 46 degrees.
In New York, it is now 37.9 degrees.
In Tokyo, it is now 48 degrees.

The output of this simple example already illustrates the subtleties of adding concurrency to your program. The replies are not in the same order as the requests. Each thread in the example runs independently of the others, so a thread started later might well finish earlier. After all, network I/O is unpredictable, so a slow request can finish long after a fast one, even if the slow request had a head start.
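
If you do need the replies in the same order as the requests, one option (not shown in the article) is to have each thread store its message in a per-request slot instead of printing it, then join every thread from the main thread and print the slots in order. Here is a rough sketch; it reuses the imports, citydata, and MAX_PAGE_LEN definitions from Example 3, and read_temperature_into is a hypothetical variant of the earlier function.

# Store each message in a per-request slot instead of printing it.
results = [None] * len(citydata)

# Hypothetical variant of 'read_temperature' that records its message.
def read_temperature_into(index, name, url, max):
     try:
          webpage = urllib.urlopen(url).read(max)
     except IOError, e:
          results[index] = ('I/O Error when reading URL ' + url
                            + ' :\n' + str(e.strerror))
          return
     match = re.search(r'(-?\d+(?:\.\d+)?) F', webpage, re.S)
     if match is None:
          results[index] = 'No temperature reading at URL: ' + url
     else:
          results[index] = ('In ' + name + ', it is now '
                            + match.group(1) + ' degrees.')

threads = []
for index in range(len(citydata)):
     name, url = citydata[index]
     thread = threading.Thread(target=read_temperature_into,
                               args=(index, name, url, MAX_PAGE_LEN))
     thread.start()
     threads.append(thread)

# Wait for every worker to finish, then print in request order.
for thread in threads:
     thread.join()
for message in results:
     print message

Because each thread writes to a different index of the results list, no lock is needed around it.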

Thread Pool

Another common threading model is the thread pool. A basic thread pool starts a fixed number of threads during initialization and does not shut them down until the program exits. This technique is often more efficient for servers than the thread-per-request model. Thread pools eliminate the overhead of creating and destroying threads in long-running applications which must continuously process requests. In addition, thread pools prevent the situation where a sudden burst of activity causes too many threads to be started, thus exhausting the operating system's resources.

Here is the client from the thread-per-request example, modified to use a thread pool.

Example 5. A thread pool client

import urllib     # Library for retrieving files using a URL.
import re         # Library for finding patterns in text.
import threading  # High-level thread library.
import Queue      # A thread-safe queue implementation.

# Three NOAA web pages, showing current conditions in New York,
# London and Tokyo, respectively.
citydata = (('New York','http://weather.noaa.gov/weather/current/KNYC.html'),
            ('London',  'http://weather.noaa.gov/weather/current/EGLC.html'),
            ('Tokyo',   'http://weather.noaa.gov/weather/current/RJTT.html'))

# The maximum amount of data we are prepared to read, from any single page.
MAX_PAGE_LEN = 20000

# The total number of threads that we will launch for our thread pool.
NTHREADS = 2

# Function to be run by each thread in the pool.  When
# the function returns, the corresponding thread terminates.
def read_temperature(max,inpque,outqueue):
     # Get a city name and URL from the input queue.
     # The thread will wait until input is available.
     name,url = inpque.get()

     # The thread continues to run, until an empty string for the city
     # name is received.  This allows the thread pool to be shut down
     # cleanly.  In addition, Python does not support the killing of
     # threads from outside, so the only way to terminate a thread
     # is to somehow signal it to stop.
     while name != '':
          # Open and read the web page; catch any I/O errors.
          try:
               webpage = urllib.urlopen(url).read(max)
          except IOError, e:
               # An I/O error occurred; place the error message in
               # the output queue.
               outqueue.put('I/O Error when reading URL '+url
                            +' :\n'+str(e.strerror))
          else:
               # Pattern which matches text like '66.9 F'.  The last
               # argument ('re.S') is a flag, which effectively causes
               # newlines to be treated as ordinary characters.
               match = re.search(r'(-?\d+(?:\.\d+)?) F',webpage,re.S)

               # Output the matched text and a descriptive message;
               # if there is no match, output an error message instead.
               if match is None:
                    outqueue.put('No temperature reading at URL: '+url)
               else:
                    outqueue.put('In '+name+', it is now '
                                 +match.group(1)+' degrees.')

          # Get the next name and URL pair.  Will wait if necessary.
          name,url = inpque.get()

     # If we get here, an empty city name has been received.  The last
     # action of the thread is to place the 'None' object in the output
     # queue, to indicate that it has stopped.
     outqueue.put(None)

# END of function 'read_temperature'

# Create the input and output queues.
# Their size is not limited in this example.
inputs  = Queue.Queue(0)
results = Queue.Queue(0)
thread_pool = []         # No threads are currently in the pool.

# Start the thread pool.
for ii in range(NTHREADS):
     # Only keyword arguments (of the form 'name=value') may
     # be used with the 'Thread' constructor.  The 'args'
     # specifies arguments to be passed to the 'target'.
     thread = threading.Thread(target=read_temperature,
                               args=(MAX_PAGE_LEN,inputs,results))
     thread.start()               # Start the thread.
     thread_pool.append(thread)   # Add it to our list of threads.

# Issue requests, by placing them in the input queue.
for item in citydata:
     inputs.put(item)

# Read results from the output queue.  Because requests are processed
# concurrently, the results may come back in a different order from the
# requests.
for ii in range(len(citydata)):
     print results.get()

# Request shut down of the thread pool, by issuing as many empty city
# name requests as there are threads.
for thread in thread_pool:
     inputs.put(('',''))

# The 'threading' package will wait until all the child threads
# (except any that are explicitly labelled as 'daemon threads')
# have shut down before exiting the program.

The output is the same as before. Of course, the order in which the results are returned can change in every run, as previously noted.

A pair of queues is used to exchange data with the thread pool. Just like a lineup at the bank, the most common type of queue operates on a first come, first served basis. The customers (or pending operations) wait in the queue until a teller (or thread) becomes free. Then, the teller will service the first customer in line.
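
You can see this first-in, first-out behaviour directly by placing a few items on a Queue.Queue and taking them off again; they come back in arrival order. (This small sketch is for illustration only and is not part of the article's examples.)

import Queue

line = Queue.Queue(0)             # A queue of unlimited size.
for customer in ('Alice', 'Bob', 'Carol'):
     line.put(customer)           # Customers join the back of the line.

while not line.empty():
     print line.get()             # Served in arrival order: Alice, Bob, Carol.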

In the example, all threads wait on the input queue right after being started. The get method of the Queue class is atomic, or indivisible. This ensures that any request will be assigned to only one thread. The assignment of requests to threads continues until there are no more requests, or all the threads in the pool are busy. If there are no more requests, then the remaining threads will continue to wait on the input queue. On the other hand, if there are more requests than threads, then the requests will accumulate in the input queue.

Unless a catastrophic error permanently blocks a thread from running, it will eventually complete its task, and will return to waiting on the input queue. In the example, the last step of handling a request is to place the result in the output queue. We also deliberately start only two threads for our thread pool, so that one of them will have to handle an additional request (since there are three cities for which we need to obtain data). This illustrates the "recycling" of threads in the thread pool model.

In addition to the threads that we start explicitly in our program, the application always has one additional thread, called the main thread. In the example, we use the main thread to read the output queue and print the results. Only one thread in the program ever uses the print statement.

Using a queue and a dedicated task to manage a resource is an alternative to sharing that resource under the protection of a lock. If the Python print statement were not thread safe, the current example would still work, but the previous one (which called print from multiple threads without locking) would not.
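
The lock-based alternative would look roughly like the sketch below (not from the article): every thread calls a small helper that acquires a shared lock before printing, so that only one message is written at a time.

import threading

print_lock = threading.Lock()     # Shared by every thread that prints.

def report(message):
     # Serialize access to standard output.
     print_lock.acquire()
     try:
          print message
     finally:
          print_lock.release()

Each worker thread would then call report instead of print, trading the output queue and the main thread's printing loop for a single lock.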

One important aspect of queues is worth mentioning. As you have probably experienced, lineups at the bank can get very long at busy times, when there are too few tellers working, or when several slow customers arrive at once. In fact, queue lengths grow explosively as the rate at which new items are added approaches the rate at which the system can remove and process them.

For example, if you use queues in a server to process requests, you may find that the number of items waiting for service suddenly explodes with little warning. In a production system, it may be worthwhile to limit the maximum queue length (a feature that the standard Python Queue module supports) in order to guard against an uncontrolled crash as resources are exhausted. It is also highly useful to keep statistics on the number of waiting requests. These statistics -- combined with the results from stress testing your application -- can provide an early warning of future trouble. The Further Reading appendix refers to additional material regarding queues.
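
For example, passing a maximum size to the Queue constructor bounds the queue: put then blocks when the queue is full, while put_nowait raises Queue.Full, which a server can use to reject work instead of letting the backlog grow without limit, and qsize gives an (approximate) count of waiting items for statistics. A minimal sketch, with a hypothetical submit function:

import Queue

pending = Queue.Queue(100)        # Refuse to hold more than 100 requests.

def submit(request):
     try:
          pending.put_nowait(request)     # Do not wait if the queue is full.
     except Queue.Full:
          print 'Server busy, request rejected:', request

# An approximate count of waiting requests, useful for monitoring.
print 'Requests currently queued:', pending.qsize()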
