Browse Source

Introduce thread job framework.

This is a cleaner generalization of the functionality added
to network_proxy.py a few months ago, whereby jobs can be
added and removed from a thread's main loop.  This allows
us to cut down on unnecessary threads.

A follow-up patch will update the proxy to this framework.
283
Neil Booth 9 years ago
parent
commit
56654ec4e9
  1. 37
      lib/util.py

37
lib/util.py

@ -3,6 +3,7 @@ import platform
import shutil import shutil
from datetime import datetime from datetime import datetime
from decimal import Decimal from decimal import Decimal
import traceback
import urlparse import urlparse
import urllib import urllib
import threading import threading
@ -24,6 +25,20 @@ class MyEncoder(json.JSONEncoder):
return obj.as_dict() return obj.as_dict()
return super(MyEncoder, self).default(obj) return super(MyEncoder, self).default(obj)
class ThreadJob:
"""A job that is run periodically from a thread's main loop. run() is
called from that thread's context.
"""
def print_error(self, *msg):
print_error("[%s]" % self.__class__.__name__, *msg)
def print_msg(self, *msg):
print_msg("[%s]" % self.__class__.__name__, *msg)
def run(self):
"""Called periodically from the thread"""
pass
class DaemonThread(threading.Thread): class DaemonThread(threading.Thread):
""" daemon thread that terminates cleanly """ """ daemon thread that terminates cleanly """
@ -33,6 +48,27 @@ class DaemonThread(threading.Thread):
self.parent_thread = threading.currentThread() self.parent_thread = threading.currentThread()
self.running = False self.running = False
self.running_lock = threading.Lock() self.running_lock = threading.Lock()
self.job_lock = threading.Lock()
self.jobs = []
def add_job(self, job):
with self.job_lock:
self.jobs.append(job)
def run_jobs(self):
# Don't let a throwing job disrupt the thread, future runs of
# itself, or other jobs. This is useful protection against
# malformed or malicious server responses
with self.job_lock:
for job in self.jobs:
try:
job.run()
except:
traceback.print_exc(file=sys.stderr)
def remove_job(self, job):
with self.job_lock:
self.jobs.remove(job)
def start(self): def start(self):
with self.running_lock: with self.running_lock:
@ -337,7 +373,6 @@ import socket
import errno import errno
import json import json
import ssl import ssl
import traceback
import time import time
class SocketPipe: class SocketPipe:

Loading…
Cancel
Save