Using Interruption (Portable Thread Cancellation)


Introduction
Why Do I Need Interruption?
What About Just Killing a Thread?
What Is Interruption?
How Do I Use Interruption?
Why Does Thread Implement Cancelable?
Interrupting Other Non-Blocking Operations?

Introduction

This is a brief explanation of what interruption is and how it can be used in ZThreads. It was written somewhat quickly, but it should give you enough information to use this mechanism effectively.

Why Do I Need Interruption?

Sooner or later, when writing a multi-threaded program, a programmer will encounter a situation in which it would be very convenient to let a thread know that it's time to change its course of action. There are many situations where this is desirable. For example, a user may want to exit a program; regardless of what that program is doing, you'd like to shut it down safely and quickly. Or a user may hit a cancel button, meaning the program needs to tell a thread that it no longer needs to continue what it is doing. Other times, a problem can be broken down and solved by several threads concurrently; if one of these threads encounters an error, it is usually useful to stop the other threads, since all of the results would be needed to assemble a meaningful solution.

Several methods currently exist that accomplish this task. However, none of them lends itself particularly well to an object-oriented design and, more importantly, none of them is very portable.

What About Just Killing a Thread?

Killing a thread is almost guaranteed to leave the program in an inconsistent and dangerous state. If a thread that has allocated memory, acquired resources or is holding locks is suddenly removed, there are immediate problems involving memory leaks, resource starvation and deadlock [Fleiner et al. 96]. Because of these problems, ZThreads provides no mechanism to simply stop a thread dead in its tracks. Instead, it encourages designs that avoid the need to do so.

What Is Interruption?

Interruption is a completely portable, exception-based C++ mechanism that can be implemented on top of any thread model. It provides a safe way to notify threads of exceptional conditions so that each thread can handle them in the best way possible.
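One reason an exception-based mechanism is safer than killing a thread is that C++ stack unwinding runs destructors, so locks held by the interrupted code are released on the way out. The following is a minimal sketch of that idea in standard C++ only. The names InterruptedError, resource_lock, guarded_work and run_task are hypothetical and are not part of ZThreads.

```cpp
#include <cassert>
#include <mutex>
#include <stdexcept>

// Hypothetical stand-in for an interruption exception (not the ZThreads type).
struct InterruptedError : std::runtime_error {
  InterruptedError() : std::runtime_error("interrupted") {}
};

std::mutex resource_lock;

// Holds the lock while working. If an interrupt was requested, it throws at
// a known safe point; the guard's destructor releases the lock during
// stack unwinding.
void guarded_work(bool interrupt_requested) {
  std::lock_guard<std::mutex> guard(resource_lock);
  if (interrupt_requested)
    throw InterruptedError();
  // ... normal work while holding the lock ...
}

// Runs the work and reports whether it completed without interruption.
bool run_task(bool interrupt_requested) {
  try {
    guarded_work(interrupt_requested);
    return true;
  } catch (const InterruptedError&) {
    // The lock has already been released by the time we get here.
    return false;
  }
}
```

After an interrupted run, the mutex can be acquired again immediately, which would not be the case if the thread had simply been killed while holding it.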

Interruption is a concept that is loosely based on Java's interruption mechanism [Lea 99]. Each thread has an internal interrupted status associated with it. A thread begins its life without its interrupted status being set. The Thread::interrupt() function is used to set the interrupted status of a thread. A thread in this status will receive notification in one of two ways:

  1. A thread may poll its own status using the Thread::interrupted() function. This returns false if the thread has not been interrupted and true if it has been. It always clears the thread's interrupted status.
  2. A thread may attempt a blocking operation on any synchronization object; functions extending Lockable::acquire() and Waitable::wait() are typical places where a thread might block. If the thread is in interrupted status, the operation fails: an Interrupted_Exception is thrown and the interrupted status of that thread is cleared.

This helps ensure that a thread receives notification as soon as possible, at a safe location. Interruption will not occur at unexpected places, and it will not interrupt or otherwise affect functions outside the ZThreads library. It provides a communication channel, allowing a message to be sent from one thread to the task being run by another. Below, you will see how this can be used to create extremely flexible behavior that is not as easy to recreate when relying only on a platform's native support.
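The two notification paths described above can be modeled in a few lines of standard C++. This is only a sketch of the semantics, not the ZThreads implementation: InterruptStatus, InterruptedError and blocking_wait are hypothetical names, and a real library would keep one such status per thread.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <stdexcept>

// Hypothetical stand-in for an interruption exception (not the ZThreads type).
struct InterruptedError : std::runtime_error {
  InterruptedError() : std::runtime_error("interrupted") {}
};

// Hypothetical model of a single thread's interrupted status.
class InterruptStatus {
  std::mutex m_;
  std::condition_variable cv_;
  bool flag_ = false;

public:
  // Set the interrupted status and wake a waiter blocked in blocking_wait().
  void interrupt() {
    {
      std::lock_guard<std::mutex> g(m_);
      flag_ = true;
    }
    cv_.notify_all();
  }

  // Path 1: poll. Reports the status and always clears it.
  bool interrupted() {
    std::lock_guard<std::mutex> g(m_);
    bool was_set = flag_;
    flag_ = false;
    return was_set;
  }

  // Path 2: a blocking operation. It waits up to `timeout`; if the status is
  // already set, or becomes set while waiting, the status is cleared and an
  // exception is thrown instead of completing the wait.
  void blocking_wait(std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lk(m_);
    if (cv_.wait_for(lk, timeout, [this] { return flag_; })) {
      flag_ = false;
      throw InterruptedError();
    }
  }
};
```

In both paths the status is consumed by the notification, so a single interrupt() is observed exactly once.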

How Do I Use Interruption?

Interruption is somewhat open-ended by design, but it always serves the same purpose: to interrupt the task being executed by a thread so that it can change its course of action. Usually, the new course of action is to clean up quickly and exit, but it doesn't have to be. Nothing is forced. This allows the task to respond to interruption in the best way possible. Sometimes this involves just stopping right away and returning. Other times, the task may try something different because whatever it is waiting for is taking too long. It's up to the programmer to decide on these details. This is the source of what makes interruption so flexible.
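As an illustration of a response other than exiting, here is a minimal sketch in which an interrupted task falls back to an alternative instead of stopping. It uses standard C++ only. The names interrupt_flag, check_interrupt, slow_lookup and lookup_with_fallback are hypothetical, and the single global flag stands in for a per-thread interrupted status.

```cpp
#include <atomic>
#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for an interruption exception (not the ZThreads type).
struct InterruptedError : std::runtime_error {
  InterruptedError() : std::runtime_error("interrupted") {}
};

// Stands in for a thread's interrupted status.
std::atomic<bool> interrupt_flag{false};

// A safe point: consumes the status and throws if it was set.
void check_interrupt() {
  if (interrupt_flag.exchange(false))
    throw InterruptedError();
}

// Pretend this is a slow operation that checks for interruption.
std::string slow_lookup() {
  check_interrupt();
  return "fresh";
}

// The task decides what interruption means: here it gives up on the slow
// path and returns a fallback value rather than terminating.
std::string lookup_with_fallback() {
  try {
    return slow_lookup();
  } catch (const InterruptedError&) {
    return "cached";
  }
}
```

The caller never sees the exception; the task has interpreted the interrupt as "stop waiting and use what you already have".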

Probably the most common way to use interruption is to create a Thread that responds to the request by stopping and exiting, as this example demonstrates:


class WorkerThread : public Thread {

  Mutex lock;
  Condition workReady;
  
public:

  WorkerThread() : workReady(lock) { }
  
  virtual ~WorkerThread() throw() { }
  
  virtual void run() throw() {
  
    try {
      
      Guard<Mutex> g(lock);           
      workReady.wait();
          
      doSomeWork();
       
    } catch(Interrupted_Exception&) {
      // Thread interrupted, exit                                                               
      cout << "Interrupted!" << endl;
    }
    
  }

  void doSomeWork() {
    // ...                                                                                      
  }
    
};


int main() {

  try {
  
    // Start the worker thread
    WorkerThread t;
    t.start();
    
    // Let it run for a while
    
    
    // User hits a button, want to exit
    t.interrupt();
    t.join();
    
  } catch(Synchronization_Exception&) {
    // Error starting thread
  }
    
  return 0;
  
}

Task-oriented frameworks can be very flexible. Interruption provides a mechanism for creating tasks that are responsive to being interrupted. For example, a task can be written so that it performs whatever cleanup it needs for itself and then propagates the interruption to whatever ran the task. This code shows a simple worker thread; if any of its tasks is interrupted, the task performs whatever cleanup is needed and then propagates the interruption to the thread running it, causing that thread to exit.


class Task : public Runnable {
public:

  virtual void run() throw() {
  
    try {
    
      if(Thread::interrupted())
        throw Interrupted_Exception();

      Thread::sleep(1000);
      cout << "Task Complete!" << endl;
      
    } catch(Interrupted_Exception&) {

      // Interrupted, clean up & propagate by re-setting this thread's interrupted status
      Thread::current().interrupt();
      cout << "Task Interrupted!" << endl;
 
    }

  }
  
};

class WorkerThread : public Thread {

  MonitoredQueue<Runnable*, FastMutex>& _q;
  
public:

  WorkerThread(MonitoredQueue<Runnable*, FastMutex>& q) : _q(q) { }
  
  virtual ~WorkerThread() throw() { }
  
  // ...
  
  virtual void run() throw() {
  
    Runnable* task = 0;
    
    try {
      
      while(!Thread::interrupted()) {
      
        task = _q.next();
        task->run();
          
        delete task;
        task = 0;  // already deleted; prevents a second delete after the loop
        
      }
       
    } catch(Interrupted_Exception&) {
      // Interrupted waiting for tasks, update a log entry
    } catch(Cancellation_Exception&) {
      // Queue canceled, normal exit
    }
    
    if(task)
      delete task;
      
  }
  
};

#define NUM_TASKS 10

int main() {

  try {
  
    MonitoredQueue<Runnable*, FastMutex> q;

    for(int i=0; i<NUM_TASKS; i++)
      q.add(new Task);  // the queue holds raw Runnable* that the worker deletes
    
    // Start the worker thread
    WorkerThread t(q);
    t.start();
    
    // Let it run for a while   
    Thread::sleep(2000);

    // User hits a button, want to exit. 
    t.interrupt();
    t.join();
    
  } catch(Synchronization_Exception&) {
    // Error starting thread
  }
    
  return 0;
  
}

Another way to use interruption is to design a task class that interprets interruption to mean that the task should terminate, but not necessarily the thread.

class Task : public Runnable {
public:

  virtual void run() throw() {
  
    try {
    
      if(Thread::interrupted())
        throw Interrupted_Exception();

      Thread::sleep(1000);
      cout << "Task Complete!" << endl;
      
    } catch(Interrupted_Exception&) {

      // Interrupted, clean up & don't propagate the exception
      cout << "Task Interrupted!" << endl;
      // No exception propagated this time, thread keeps on going 
    }

  }
  
};

Interruption can also be used to cancel individual tasks specifically, and not necessarily the entire thread. This can be done easily by extending CancelableTask instead of just Runnable.


class Task : public CancelableTask {
public:

  virtual void doRun() throw() {
  
    try {
    
      if(Thread::interrupted())
        throw Interrupted_Exception();

      Thread::sleep(1000);
      cout << "Task Complete!" << endl;
      
    } catch(Interrupted_Exception&) {

      // Interrupted, clean up & don't propagate the exception
      cout << "Task Interrupted!" << endl;
 
    }

  }
  
};


#define NUM_TASKS 10

int main() {

  try {
  
    ConcurrentExecutor<FastMutex> executor;
    vector<Handle<Task>*> pending;
    
    // Create the tasks and submit them 
    for(int i=0; i<NUM_TASKS; i++) {

      pending.push_back(new Handle<Task>(new Task));
      executor.execute(*pending.back());

    }

    // User waits a while...
    Thread::sleep(1000);

    // ...then decides to cancel a specific job
    Handle<Task>& job = *pending[5];
    job->cancel();

    // Request the executor shutdown
    executor.cancel();

    // Clean up the task handles
    for(vector<Handle<Task>*>::iterator i = pending.begin(); i != pending.end(); ++i) 
      delete *i;
    
  } catch(Synchronization_Exception&) {
    // Error starting thread
  }
    
  return 0;
  
}


Why Does Thread Implement Cancelable?

Using only interruption sometimes makes it difficult to determine whether a thread was interrupted by the task it was running or whether the thread itself was deliberately interrupted. In order to remove this ambiguity, the Cancelable interface is implemented.

The normal semantics for Thread::cancel() are to interrupt the thread and to place the thread in canceled status. A thread with canceled status is not forced to shut down; it's not forced to do anything. Think of it as a specialized form of interruption.

The Thread::canceled() function tests the current thread's canceled status, returning false if the thread was not canceled and true if it was. Additionally, this function clears a thread's interrupted status, but it never clears canceled status.

Executors are implemented so that Thread::interrupt() is a message to the task being run by an Executor's thread to abort. Thread::cancel(), being based on interruption, also sends the task the message to abort, but additionally allows the executor to recognize that the last interrupt was a request for the thread to exit.
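The difference between the two statuses can be sketched as follows. This is a standalone model in standard C++, written under the assumption that the semantics are exactly as described in this section; ThreadStatus, Task and run_worker are hypothetical names and do not reflect how ZThreads executors are actually implemented.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical model of the interrupted and canceled statuses of one thread.
class ThreadStatus {
  std::atomic<bool> interrupted_{false};
  std::atomic<bool> canceled_{false};

public:
  // interrupt(): ask the current task to abort.
  void interrupt() { interrupted_ = true; }

  // cancel(): interrupt, and also mark the thread as canceled.
  void cancel() {
    canceled_ = true;
    interrupted_ = true;
  }

  // Reports and clears the interrupted status.
  bool interrupted() { return interrupted_.exchange(false); }

  // Reports the canceled status. Clears the interrupted status,
  // but never clears the canceled status.
  bool canceled() {
    interrupted_ = false;
    return canceled_;
  }
};

// Hypothetical task type: each task can poll or change the worker's status.
using Task = std::function<void(ThreadStatus&)>;

// A worker loop that keeps running tasks after a plain interrupt, but stops
// once the thread has been canceled. Returns the number of tasks started.
int run_worker(ThreadStatus& status, const std::vector<Task>& tasks) {
  int started = 0;
  for (const Task& task : tasks) {
    if (status.canceled())
      break;  // the last interrupt was a request for the thread to exit
    ++started;
    task(status);
  }
  return started;
}
```

With this model, an interrupt that arrives during one task only aborts that task, while a cancel is still visible to the worker loop after the task has consumed the interrupt.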

Interrupting Other Non-Blocking Operations?

ZThreads provides a hooking mechanism that allows platform-specific behavior to be associated with an invocation of interrupt(). A special ThreadLocal object is provided that allows a program to interact with the ZThreads library to accomplish this. For POSIX systems, this exposes a buffer into which the address of a sigjmp_buf can be placed.

// A POSIX hook implementing an interruptible accept()
sigjmp_buf timeout_jmp;
if(sigsetjmp(timeout_jmp, 1) != 0)
  throw IOInterrupted_Exception();

// Enable the hook
ZThread::Thread::interruptKey().set(&timeout_jmp);

// call accept()

// Disable the hook
ZThread::Thread::interruptKey().set(NULL);
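To make the jump-buffer idea concrete, here is a self-contained, POSIX-only sketch of how such a hook could behave. It is an assumption about the general technique, not the ZThreads implementation: jump_slot, on_interrupt_signal, install_hook and interruptible_call are hypothetical names, and SIGUSR1 is chosen arbitrarily. Note that siglongjmp() skips C++ destructors, so the frames being jumped over should not own resources.

```cpp
#include <cassert>
#include <csignal>
#include <setjmp.h>

// Hypothetical per-thread slot holding the address of a jump buffer,
// standing in for the key described above.
static thread_local sigjmp_buf* jump_slot = nullptr;

// Signal handler: if a hook is enabled, jump back to the sigsetjmp() point.
extern "C" void on_interrupt_signal(int) {
  if (jump_slot)
    siglongjmp(*jump_slot, 1);
}

// Install the handler for the (arbitrarily chosen) interrupt signal.
void install_hook() { std::signal(SIGUSR1, on_interrupt_signal); }

// Runs blocking_call with the hook enabled.
// Returns true if the call was interrupted, false if it completed.
bool interruptible_call(void (*blocking_call)()) {
  sigjmp_buf env;
  if (sigsetjmp(env, 1) != 0) {
    // We arrive here via siglongjmp() from the signal handler.
    jump_slot = nullptr;  // disable the hook
    return true;
  }
  jump_slot = &env;       // enable the hook
  blocking_call();        // e.g. accept() in a real program
  jump_slot = nullptr;    // disable the hook
  return false;
}
```

In a real program the signal would be sent by the interrupting thread (for example with pthread_kill()); the sketch leaves that part out.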

For Win32 systems, this exposes a manual-reset event handle that is set each time an interrupt() occurs. This can be used in conjunction with functions like WaitForMultipleObjects() or WSAEventSelect().

 
// A Win32 hook implementing an interruptible accept()

HANDLE handles[2];

// Use the hook to get an event to add to the wait set.
handles[0] = ZThread::Thread::interruptKey().get();
handles[1] = hSocket;

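// Wait for ANY handle (bWaitAll = FALSE) so the return value identifies which one fired
switch(::WaitForMultipleObjects(2,handles,FALSE,INFINITE)) {
case WAIT_OBJECT_0:
  // ZThread::interrupt(), handle it
  break;
case WAIT_OBJECT_0 + 1:  
  // Socket ready
  break;
default:
  // ...
  break;
}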

Many other methods exist for terminating other blocking operations early. For example, a socket can be closed to stop a connect() that is taking too long. Additionally, there are design patterns that can help make this easier to accomplish, reducing the number of places special attention is required (limiting it only to places where poll() or select() is used), by mixing threads with asynchronous I/O. [Schmidt 00]

The implementation of these concepts is freely available as part of the ZThreads library.

References

[Fleiner et al. 96] Fleiner, Claudio, Jerry Feldman, and David Stoutamire, "Killing Threads Considered Dangerous",
POOMA '96 Conference, 1996.
[Beveridge 99] Jim E. Beveridge, Multithreading Applications in Win32: The Complete Guide to Threads,
Addison-Wesley 1996, ISBN 0-201-44234-5 http://cseng.aw.com/book/0,3828,0201442345,00.html
[Butenhof 97] David R. Butenhof, Programming with POSIX Threads,
Addison-Wesley 1997, ISBN 0-201-63392-2 http://cseng.aw.com/book/0,3828,0201633922,00.html
[Lewis et al. 97] Bil Lewis, Daniel J. Berg, Multithreaded Programming with Pthreads,
Prentice Hall, 1997, ISBN 0136807291 http://www.sun.com/books/catalog/Lewis2/
[Lea 99] Doug Lea, Concurrent Programming in Java: Design Principles and Patterns,
Addison-Wesley 1999, ISBN 0-201-31009-0 http://cseng.aw.com/book/0,3828,0201310090,00.html
[Schmidt 00] Doug C. Schmidt, Michael Stal, Hans Rohnert and Frank Buschmann, Pattern-Oriented Software Architecture Volume 2 - Patterns for Concurrent and Networked Objects, Wiley 2000, ISBN 0-471-60695-2 http://www.wiley.com/Corporate/Website/Objects/Products/0,9049,104671,00.html

 Copyright © 2000 - 2002,  Eric Crahen <crahen at cs dot buffalo dot edu> - All rights reserved.
Permission to use, copy, modify, distribute and sell this documentation for any purpose is hereby granted without fee, provided that the above copyright notice appear in all copies and that both that copyright notice and this permission notice appear in supporting documentation. Eric Crahen makes no representations about the suitability of this software for any purpose. It is provided "as is" without express or implied warranty.