| In: | lib/slave.rb |
| Parent: | Object |
the Slave class encapsulates the work of setting up a drb server in another process running on localhost via unix domain sockets. the slave process is attached to its parent via a LifeLine, which is designed such that the slave cannot outlive its parent and linger as an orphaned process, even if the parent dies an early death, such as by ‘kill -9’. the concept and purpose of the Slave class is to make setting up any server object in another process so easy that a multi-process, drb/ipc based design is as easy as, or easier than, a multi-threaded one. eg
class Server
  def add_two n
    n + 2
  end
end
slave = Slave.new 'object' => Server.new
server = slave.object
p server.add_two(40) #=> 42
two other methods of providing server objects exist:
a) server = Server.new "this is called in the parent"
Slave.new(:object=>server){|s| puts "#{ s.inspect } passed to block in child process"}
b) Slave.new{ Server.new "this is called only in the child" }
of the two ‘b’ is preferred.
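the lifeline idea described above can be illustrated with nothing but a pipe. this is a minimal sketch under stated assumptions, not the LifeLine class from lib/slave.rb: the child blocks reading a pipe whose only write end lives in the parent, so the read returns as soon as the parent closes it or dies (the kernel closes it for a ‘kill -9’ victim too). the exit code 7 is arbitrary and chosen only so the result is recognizable.

```ruby
# hypothetical sketch of a lifeline built on a pipe; an assumption for
# illustration, not the LifeLine class from lib/slave.rb
reader, writer = IO.pipe

child = fork do
  writer.close   # the child keeps only the read end
  reader.read    # blocks until every write end is closed (parent let go or died)
  exit! 7        # lifeline cut: leave immediately
end

reader.close     # the parent keeps only the write end
sleep 0.1        # the child is alive here, blocked on the read
writer.close     # simulate the parent going away
_, status = Process.wait2(child)
lifeline_exit_code = status.exitstatus
```

the child never polls; it simply cannot get past the read while the parent holds the other end.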
| VERSION | = | '1.1.0' |
| DEFAULT_SOCKET_CREATION_ATTEMPTS | = | Integer(ENV['SLAVE_SOCKET_CREATION_ATTEMPTS'] || 42) |
| env config | ||
| DEFAULT_DEBUG | = | (ENV['SLAVE_DEBUG'] ? true : false) |
| DEFAULT_THREADSAFE | = | (ENV['SLAVE_THREADSAFE'] ? true : false) |
| at_exit | [R] | |
| debug | [RW] | if this is true and you are running from a terminal, information is printed on STDERR |
| debug | [R] | |
| dumped | [R] | |
| obj | [R] | attrs |
| object | [R] | |
| pid | [R] | |
| ppid | [R] | |
| psname | [R] | |
| shutdown | [R] | |
| socket | [R] | |
| socket_creation_attempts | [RW] | |
| socket_creation_attempts | [R] | |
| status | [R] | |
| threadsafe | [RW] | if this is true all slave objects will be wrapped such that any call to the object is threadsafe. if you do not use this you must ensure that your objects are threadsafe yourself as this is required of any object acting as a drb server |
| uri | [R] | |
get a default value
# File lib/slave.rb, line 77
def default key
  #--{{{
  send key
  #--}}}
end
just fork without silly warnings
# File lib/slave.rb, line 98
def fork &block
  #--{{{
  v = $VERBOSE
  begin
    $VERBOSE = nil
    Process::fork &block
  ensure
    $VERBOSE = v
  end
  #--}}}
end
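the save/silence/restore pattern used by the fork wrapper above works for any block. a minimal sketch, where the helper name quietly is an assumption made for illustration and is not part of the Slave api:

```ruby
# hypothetical helper: run a block with warnings silenced, then restore $VERBOSE
def quietly
  saved = $VERBOSE
  $VERBOSE = nil
  yield
ensure
  $VERBOSE = saved   # restored even if the block raises
end

before = $VERBOSE
inside = quietly { $VERBOSE }   # warnings are off while the block runs
after  = $VERBOSE               # and back to the saved value afterwards
```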
# File lib/slave.rb, line 83
def getopts opts
  #--{{{
  raise ArgumentError, opts.class unless
    opts.respond_to?('has_key?') and opts.respond_to?('[]')

  lambda do |key, *defval|
    defval = defval.shift
    keys = [key, key.to_s, key.to_s.intern]
    key = keys.detect{|k| opts.has_key? k } and break opts[key]
    defval
  end
  #--}}}
end
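the lambda returned by getopts looks a key up as given, as a string, and as a symbol, and falls back to an optional default. the following standalone sketch re-implements that lookup outside the class so it can be run on its own; it is a simplified illustration, and the option values are made up.

```ruby
# standalone sketch of the string-or-symbol option lookup shown above
def getopts(opts)
  unless opts.respond_to?(:key?) && opts.respond_to?(:[])
    raise ArgumentError, opts.class.to_s
  end

  lambda do |key, *defval|
    found = [key, key.to_s, key.to_s.to_sym].find { |k| opts.key?(k) }
    found ? opts[found] : defval.first
  end
end

getopt = getopts('debug' => true, :psname => 'worker')

by_string  = getopt['debug']        # stored under a string, asked for as a string
by_symbol  = getopt[:debug]         # stored under a string, asked for as a symbol
psname     = getopt['psname']       # stored under a symbol, asked for as a string
fallback   = getopt['missing', 42]  # absent key: the supplied default is returned
no_default = getopt['missing']      # absent key, no default: nil
```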
sets up a child process serving any object as a DRb server running locally on unix domain sockets. the child process has a LifeLine established between it and the parent, making it impossible for the child to outlive the parent (and linger as an orphaned process). the object to serve is specified either directly using the ‘object’/:object keyword
Slave.new :object => MyServer.new
or, preferably, using the block form
Slave.new{ MyServer.new }
when the block form is used the object is constructed in the child process itself. this is quite advantageous if the child object consumes resources or opens file handles (db connections, etc). by constructing the object in the child any resources are consumed from the child’s address space and things like open file handles will not be carried into subsequent child processes (via standard unix fork semantics). in the event that a block is specified but the object cannot be constructed and, instead, raises an Exception, that exception will be propagated to the parent process.
opts may contain the following keys, as either strings or symbols
object : specify the slave object. otherwise block value is used.
socket_creation_attempts : specify how many attempts to create a unix domain socket will be made
debug : turn on some logging to STDERR
psname : specify the name that will appear in 'top' ($0)
at_exit : specify a lambda to be called in the *parent* when the child dies
dumped : specify that the slave object should *not* be DRbUndumped (default is DRbUndumped)
threadsafe : wrap the slave object with ThreadSafe to implement gross thread safety
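the propagation of a construction failure from child to parent can be sketched without drb. the example below is an illustration under assumptions, not the library's code path: it ships the exception's class, message and backtrace over a plain pipe with Marshal, and the parent rebuilds an equivalent exception from those three pieces. the error message is made up.

```ruby
# hypothetical sketch: carry an exception from a forked child back to the parent
reader, writer = IO.pipe

child = fork do
  reader.close
  begin
    # stands in for a constructor block that fails in the child
    raise ArgumentError, 'could not build server object'
  rescue Exception => e
    writer.write Marshal.dump([e.class, e.message, e.backtrace])
  end
  writer.close
  exit! 0
end

writer.close
payload = reader.read   # returns once the child has closed its end
Process.wait(child)

klass, message, backtrace = Marshal.load(payload)
rebuilt = klass.new(message)
rebuilt.set_backtrace(backtrace)   # keep the child's backtrace for debugging
```

at this point the parent can raise rebuilt, which is what makes the failure visible where Slave.new was called.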
# File lib/slave.rb, line 321
def initialize opts = {}, &block
  #--{{{
  getopt = getopts opts

  @obj = getopt['object']
  @socket_creation_attempts = getopt['socket_creation_attempts'] || default('socket_creation_attempts')
  @debug = getopt['debug'] || default('debug')
  @psname = getopt['psname']
  @at_exit = getopt['at_exit']
  @dumped = getopt['dumped']
  @threadsafe = getopt['threadsafe'] || default('threadsafe')

  raise ArgumentError, 'no slave object or slave object block provided!' if
    @obj.nil? and block.nil?

  @shutdown = false
  @waiter = @status = nil
  @lifeline = LifeLine.new

  # weird syntax because dot/rdoc chokes on this!?!?
  init_failure = lambda do |e|
    trace{ %[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
    o = Object.new
    class << o
      attr_accessor '__slave_object_failure__'
    end
    o.__slave_object_failure__ = Marshal.dump [e.class, e.message, e.backtrace]
    @object = o
  end

  #
  # child
  #
  unless((@pid = Slave::fork))
    e = nil
    begin
      Kernel.at_exit{ Kernel.exit! }
      @lifeline.catch

      if @obj
        @object = @obj
      else
        begin
          @object = block.call
        rescue Exception => e
          init_failure[e]
        end
      end

      if block and @obj
        begin
          block[@obj]
        rescue Exception => e
          init_failure[e]
        end
      end

      $0 = (@psname ||= gen_psname(@object))

      unless @dumped or @object.respond_to?('__slave_object_failure__')
        @object.extend DRbUndumped
      end

      if @threadsafe
        @object = ThreadSafe.new @object
      end

      @ppid, @pid = Process::ppid, Process::pid
      @socket = nil
      @uri = nil

      tmpdir, basename = Dir::tmpdir, File::basename(@psname)

      @socket_creation_attempts.times do |attempt|
        se = nil
        begin
          s = File::join(tmpdir, "#{ basename }_#{ attempt }_#{ rand }")
          u = "drbunix://#{ s }"
          DRb::start_service u, @object
          @socket = s
          @uri = u
          trace{ "child - socket <#{ @socket }>" }
          trace{ "child - uri <#{ @uri }>" }
          break
        rescue Errno::EADDRINUSE => se
          nil
        end
      end

      if @socket and @uri
        trap('SIGUSR2') do
          DRb::thread.kill rescue nil
          FileUtils::rm_f @socket rescue nil
          exit
        end

        @lifeline.puts @socket
        @lifeline.cling
      else
        @lifeline.release
        warn "slave(#{ $$ }) could not create socket!"
        exit
      end
    rescue Exception => e
      trace{ %[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
    ensure
      status = e.respond_to?('status') ? e.status : 1
      exit(status)
    end
  #
  # parent
  #
  else
    detach
    @lifeline.throw

    buf = @lifeline.gets
    raise "failed to find slave socket" if buf.nil? or buf.strip.empty?
    @socket = buf.strip
    trace{ "parent - socket <#{ @socket }>" }

    if @at_exit
      @at_exit_thread = @lifeline.on_cut{
        @at_exit.respond_to?('call') ? @at_exit.call(self) : send(@at_exit.to_s, self)
      }
    end

    if @socket and File::exist? @socket
      Kernel.at_exit{ FileUtils::rm_f @socket }
      @uri = "drbunix://#{ socket }"
      trace{ "parent - uri <#{ @uri }>" }
      #
      # starting drb on localhost avoids dns lookups!
      #
      DRb::start_service('druby://localhost:0', nil) unless DRb::thread
      @object = DRbObject::new nil, @uri
      if @object.respond_to? '__slave_object_failure__'
        c, m, bt = Marshal.load @object.__slave_object_failure__
        (e = c.new(m)).set_backtrace bt
        trace{ %[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
        raise e
      end
      @psname ||= gen_psname(@object)
    else
      raise "failed to find slave socket <#{ @socket }>"
    end
  end
  #--}}}
end
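the socket creation loop in the child can be tried in isolation. this sketch assumes a plain UNIXServer in place of DRb::start_service, and the helper name bind_unix_socket is hypothetical; the part it shares with the listing above is the retry shape: a randomized path per attempt, with Errno::EADDRINUSE treated as "try again".

```ruby
require 'socket'
require 'tmpdir'

# hypothetical helper: try up to `attempts` randomized socket paths and
# return [server, path], or nil if every attempt collided
def bind_unix_socket(basename, attempts)
  attempts.times do |attempt|
    path = File.join(Dir.tmpdir, "#{basename}_#{attempt}_#{rand}")
    begin
      return [UNIXServer.new(path), path]
    rescue Errno::EADDRINUSE
      next   # path already taken: try another random name
    end
  end
  nil
end

server, path = bind_unix_socket('slave_sketch', 3)
bound = File.socket?(path)   # the socket file exists while the server is open

server.close
File.delete(path)            # closing does not unlink the file, so remove it explicitly
```

the explicit File.delete corresponds to the rm_f calls in the listing: a unix domain socket leaves its file behind unless someone removes it.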
a simple convenience method which returns an object from another process. the object returned is the result of the supplied block. eg
object = Slave.object{ processor_intensive_object_built_in_child_process() }
the call can be made asynchronous via the ‘async’/:async keyword, in which case a thread is returned whose value is the object. eg
thread = Slave.object(:async=>true){ long_processor_intensive_object_built_in_child_process() }
# go on about your coding business then, later
object = thread.value
# File lib/slave.rb, line 589
def self.object opts = {}, &b
  #--{{{
  l = lambda{ begin; b.call; ensure; exit; end }

  async = opts.delete('async') || opts.delete(:async)

  opts['object'] = opts[:object] = l
  opts['dumped'] = opts[:dumped] = true

  slave = Slave.new opts

  async ? Thread.new{ slave.object.call } : slave.object.call
  #--}}}
end
# File lib/slave.rb, line 603
def self.object opts = {}, &b
  #--{{{
  async = opts.delete('async') || opts.delete(:async)

  opts['object'] = opts[:object] = lambda(&b)
  opts['dumped'] = opts[:dumped] = true

  slave = Slave.new opts

  value = lambda do |slave|
    begin
      slave.object.call
    ensure
      slave.shutdown
    end
  end

  async ? Thread.new{ value[slave] } : value[slave]
  #--}}}
end
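the "compute in a child, collect in the parent, optionally in a thread" shape of Slave.object can be sketched with fork and a pipe. this is an assumption-laden stand-in, not the method above: the value travels back via Marshal rather than drb, and the name object_from_child is hypothetical.

```ruby
# hypothetical sketch, not Slave.object itself: compute a value in a forked
# child and hand it back over a pipe with Marshal
def object_from_child(async: false, &block)
  reader, writer = IO.pipe

  pid = fork do
    reader.close
    writer.write Marshal.dump(block.call)   # the block runs only in the child
    writer.close
    exit! 0
  end

  writer.close
  fetch = lambda do
    value = Marshal.load(reader.read)
    reader.close
    Process.wait(pid)                        # reap the child once its value is in
    value
  end

  async ? Thread.new { fetch.call } : fetch.call
end

sum    = object_from_child { (1..10).reduce(:+) }
thread = object_from_child(async: true) { 'built in child' }
# go on about your coding business then, later
text = thread.value
```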
see docs for Slave.default
# File lib/slave.rb, line 548
def default key
  #--{{{
  self.class.default key
  #--}}}
end
starts a thread to collect the child status and sets up at_exit handler to prevent zombies. the at_exit handler is canceled if the thread is able to collect the status
# File lib/slave.rb, line 475
def detach
  #--{{{
  reap = lambda do |cid|
    begin
      @status = Process::waitpid2(cid).last
    rescue Exception => e
      m, c, b = e.message, e.class, e.backtrace.join("\n")
      warn "#{ m } (#{ c })\n#{ b }" unless e.is_a? Errno::ECHILD
    end
  end

  Kernel.at_exit do
    shutdown rescue nil
    reap[@pid] rescue nil
  end

  @waiter =
    Thread.new do
      begin
        @status = Process::waitpid2(@pid).last
      ensure
        reap = lambda{|cid| 'no-op' }
      end
    end
  #--}}}
end
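the reaping logic is easier to see in isolation. in this minimal sketch (the exit code 3 is arbitrary) a background thread collects the child's status, and a second attempt to reap the same pid raises Errno::ECHILD, which is why the listing above ignores that particular error.

```ruby
pid = fork { exit! 3 }

# a background thread collects the exit status so the child never lingers as a zombie
waiter = Thread.new { Process.waitpid2(pid).last }
status = waiter.value

# a second reap finds no child left; ECHILD is the expected, ignorable outcome
second_reap =
  begin
    Process.waitpid2(pid)
  rescue Errno::ECHILD
    :already_reaped
  end
```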
generate a default name to appear in ps/top
# File lib/slave.rb, line 540
def gen_psname obj
  #--{{{
  "slave_#{ obj.class }_#{ obj.object_id }_#{ Process::ppid }_#{ Process::pid }".downcase.gsub(%r/\s+/,'_')
  #--}}}
end
see docs for Slave.getopts
# File lib/slave.rb, line 556
def getopts opts
  #--{{{
  self.class.getopts opts
  #--}}}
end
cuts the lifeline and kills the child process - give the key ‘quiet’ to ignore errors shutting down, including having already shutdown
# File lib/slave.rb, line 519
def shutdown opts = {}
  #--{{{
  quiet = getopts(opts)['quiet']
  raise "already shutdown" if @shutdown unless quiet
  begin; Process::kill 'SIGUSR2', @pid; rescue Exception => e; end
  begin; @lifeline.cut; rescue Exception; end
  raise e if e unless quiet
  @shutdown = true
  #--}}}
end
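the signal half of shutdown pairs with the SIGUSR2 handler installed in the child. the sketch below is an illustration under assumptions: a temp file stands in for the unix socket, and a pipe is used only to let the parent know the handler is installed before it sends the signal.

```ruby
require 'tmpdir'

# a marker file stands in for the child's unix socket file (hypothetical path)
marker = File.join(Dir.tmpdir, "shutdown_sketch_#{Process.pid}_#{rand}")
reader, writer = IO.pipe

child = fork do
  reader.close
  File.write(marker, '')
  trap('USR2') do
    File.delete(marker) if File.exist?(marker)   # clean up, as the real handler removes its socket
    exit! 0
  end
  writer.puts 'ready'   # tell the parent the handler is installed
  sleep                 # "serve" until signalled
end

writer.close
reader.gets                    # wait for 'ready'
Process.kill 'USR2', child     # the parent-side half of shutdown
_, status = Process.wait2(child)

marker_removed = !File.exist?(marker)
clean_exit     = status.success?
```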
true if the slave has been shut down
# File lib/slave.rb, line 532
def shutdown?
  #--{{{
  @shutdown
  #--}}}
end
debugging output - ENV[‘SLAVE_DEBUG’]=1 to enable
# File lib/slave.rb, line 564
def trace
  #--{{{
  if @debug
    STDERR.puts yield
    STDERR.flush
  end
  #--}}}
end
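because trace takes a block, the message string is only built when debugging is on. a small standalone sketch of that behaviour follows; the Tracer class and the injectable io are assumptions added so the output can be captured, and are not part of the Slave api.

```ruby
require 'stringio'

# hypothetical logger mirroring the block-based trace shown above
class Tracer
  def initialize(debug, io = STDERR)
    @debug = debug
    @io = io
  end

  def trace
    return unless @debug
    @io.puts yield   # the message block only runs when debugging is on
    @io.flush
  end
end

out   = StringIO.new
calls = 0
Tracer.new(false, out).trace { calls += 1; 'never built' }
Tracer.new(true, out).trace  { calls += 1; 'child - socket </tmp/example>' }
```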
wait for slave to finish. if the keyword ‘non_block’=>true is given a thread is returned to do the waiting in an async fashion. eg
thread = slave.wait(:non_block=>true){|value| "background <#{ value }>"}
# File lib/slave.rb, line 507
def wait opts = {}, &b
  #--{{{
  b ||= lambda{|exit_status|}
  non_block = getopts(opts)['non_block']
  non_block ? Thread.new{ b[ @waiter.value ] } : b[ @waiter.value ]
  #--}}}
end
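the blocking and non-blocking forms differ only in whether the callback runs in the caller or in a new thread. a minimal sketch, assuming a waiter thread like the one detach creates; the method name wait_on is hypothetical, and unlike the listing above its default callback returns the status instead of nil so the blocking result is visible.

```ruby
pid    = fork { exit! 0 }
waiter = Thread.new { Process.waitpid2(pid).last }   # as a detach-style reaper would set up

# hypothetical stand-in for the wait method shown above
def wait_on(waiter, opts = {}, &callback)
  callback ||= lambda { |exit_status| exit_status }
  opts[:non_block] ? Thread.new { callback[waiter.value] } : callback[waiter.value]
end

blocking = wait_on(waiter)   # returns the Process::Status directly
thread   = wait_on(waiter, :non_block => true) { |status| "background <#{ status.exitstatus }>" }
message  = thread.value      # the callback's result, computed in the thread
```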