Wednesday, October 28, 2009

Twisted Reactor w/XML-RPC, POST, GET

Rewriting our Berkeley XMLDB services on Twisted, I decided to implement some couch db type interfaces like responding to post and get requests for DB actions. Oh, and don't forget the already in use xmlrpc interfaces to stay backwards compatible! It was surprisingly simple after staring at the docs for a day or two, so I'm posting a piece of the code in hopes that the example makes it easier for people new to twisted. Below you'll find an example which not only covers the post/get/xmlrpc thing, but also has threadpools, deferreds, xmlrpc kwargs, and a reactor example. There are some pieces missing since the server is actually much more complicated but hopefully you get the idea. Bon appetite!
#! /usr/bin/python
import os
import atexit
import signal
import sys
from xxx.services.utilities import write_pid, exit_function, handle_sigterm
from twisted.web import xmlrpc, server, resource
from twisted.internet import defer
from twisted.python import threadpool
from twisted.internet import reactor
'''
Configure logging
logging is a blocking operation, which violates twisted principles
so please use the logging module, for warnings/errors that should be
seen by everyone. DB queries et al, should go to the twisted logging
module, since theirs is non-blocking.
http://twistedmatrix.com/trac/wiki/TwistedLogging
'''
from twisted.python import log
'''
We are using two different versions of twisted between machines -
so lets see if we can accommodate both easily
'''
TWISTED_ABOVE_8_1 = False
from twisted import version as tversion
if tversion.minor >= 2:
TWISTED_ABOVE_8_1 = True
class TXmldb(xmlrpc.XMLRPC):
isLeaf = True
def __init__(self,):
xmlrpc.XMLRPC.__init__(self)
'''
Set up a thread pool
'''
self.threadpool = threadpool.ThreadPool(config.MIN_THREADS,
config.MAX_THREADS)
self.threadpool.start()
# support for handling rest requests as well
def render_GET(self, request):
func = request.path[1:] # omit leading slash
if func == 'get':
docName = request.args['howMuchHotness'][0]
defer.maybeDeferred(self.__get, howMuchHotness).addCallbacks(
self.finishup,
errback=self.error,
callbackArgs=(request,))
else:
return "No comprende senor!"
return server.NOT_DONE_YET
def render_POST(self, request):
request.setHeader("Connection", "Keep-Alive")
# if this is an xmlrpc call, there will be no args
# since it will be all marshalled up into a content
# body. So, checking for arg length should tell us
# accurately if this is xmlrpc or not
if not len(request.args):
return xmlrpc.XMLRPC.render_POST(self, request)
func = request.path[1:] # omit leading slash
# otherwise handle this like a post
data = request.args['data'][0]
if func == 'add':
defer.maybeDeferred(self.__add, data).addCallbacks(self.finishup,
errback=self.error,
callbackArgs=(request,))
else:
return "No comprende senor!"
return server.NOT_DONE_YET
def finishup(self, result, request):
# post and get only take string results
if result == True:
result = "1"
if result in [False, None]:
result = ""
request.write(result)
request.finish()
'''
---------------
XMLRPC INTERFACES
Note: In the past, we called this with positional arguments
but we also want support for passing a dictionary of keyword values
theoretically the argument orderd one gets phased out but, I
can see value in having both. the 'k' added to the suffix
stands for "Kwargs"
---------------
'''
def xmlrpc_get(self, howMuchHotness):
return self._deferToThread('__get', howMuchHotness)
def xmlrpc_getk(self, kwargs):
return self._deferToThread('__get', **kwargs)
def __get(self, howMuchHotness):
return "did something %s"%howMuch
def _deferToThread(self, f, *args, **kwargs):
if TWISTED_ABOVE_8_1:
return threads.deferToThreadPool(reactor, self.threadpool, f, *args, **kwargs)
else:
d = defer.Deferred()
self.threadpool.callInThread(threads._putResultInDeferred, d, f, args, kwargs)
return d
def __del__(self):
self.threadpool.stop()
def start(port, schema=None):
port = int(port)
log.msg("Initializing txmldb on port %s ..." % (port,) )
r = TXmldb()
reactor.listenTCP(port, server.Site(r))
reactor.run()
return reactor
def deamonize(port=7080, logFile="/var/log/txmldb.log", pidFile="/var/tmp/txmldb.pid"):
fp = open(logFile, 'a+b')
log.startLogging(fp)
try:
pid = os.fork()
if pid > 0:
# Exit first parent
sys.exit(0)
except OSError, e:
print >>sys.stderr, "fork #1 failed: %d (%s)" % (e.errno, e.strerror)
sys.exit(1)
# Decouple from parent environment
os.chdir("/")
os.setsid()
os.umask(0)
# Do second fork
try:
pid = os.fork()
if pid > 0:
# Exit from second parent
write_pid(pid, pidFile)
sys.exit(0)
except OSError, e:
print >>sys.stderr, "fork #2 failed: %d (%s)" % (e.errno, e.strerror)
sys.exit(1)
atexit.register(exit_function,pidFile)
# Start the daemon main loop
start(port)
if __name__ == "__main__":
import getopt
args = sys.argv[1:]
port=7080
logFile=None
pidFile=None
try:
opts, args = getopt.getopt(args, "p:l:P:d")
except getopt.GetoptError, e:
print "%s. options are -l (log file location) -P (pid file location) -d (detach) and -p (port)"%e
sys.exit(0)
detach = False
for opt, value in opts:
if opt == "-p":
port = value
elif opt == "-l":
logFile = value
elif opt == "-P":
pidFile = value
elif opt == "-d":
detach = True
if detach:
deamonize(port, logFile, pidFile)
else:
start(port)
view raw gistfile1.pyw hosted with ❤ by GitHub