source: extensions/gsdl-video/trunk/installed/cmdline/lib/ruby/1.8/rinda/tuplespace.rb@ 18425

Last change on this file since 18425 was 18425, checked in by davidb, 15 years ago

Video extension to Greenstone

File size: 12.8 KB
Line 
1require 'monitor'
2require 'thread'
3require 'drb/drb'
4require 'rinda/rinda'
5
6module Rinda
7
8 ##
9 # A TupleEntry is a Tuple (i.e. a possible entry in some Tuplespace)
10 # together with expiry and cancellation data.
11
class TupleEntry

  include DRbUndumped

  attr_accessor :expires

  ##
  # Creates a TupleEntry based on +ary+ with an optional renewer or expiry
  # time +sec+.
  #
  # A renewer must implement the +renew+ method which returns a Numeric,
  # nil, or true to indicate when the tuple has expired.

  def initialize(ary, sec=nil)
    @cancel = false
    @expires = nil
    @tuple = make_tuple(ary)
    @renewer = nil
    renew(sec)
  end

  ##
  # Marks this TupleEntry as canceled.

  def cancel
    @cancel = true
  end

  ##
  # A TupleEntry is alive as long as it is neither canceled nor expired.

  def alive?
    !canceled? && !expired?
  end

  ##
  # Return the object which makes up the tuple itself: the Array
  # or Hash.

  def value; @tuple.value; end

  ##
  # Returns the canceled status.

  def canceled?; @cancel; end

  ##
  # Has this tuple expired? (true/false).
  #
  # A tuple has expired when its expiry timer based on the +sec+ argument to
  # #initialize runs out.  An entry without any expiry time counts as
  # expired.  When the timer has run out and a renewer is attached, the
  # renewer is asked once for a new expiry time before the answer is given.

  def expired?
    return true unless @expires
    return false if @expires > Time.now
    return true if @renewer.nil?
    renew(@renewer)
    return true unless @expires
    return @expires < Time.now
  end

  ##
  # Reset the expiry time according to +sec_or_renewer+.
  #
  # +nil+::    it is set to expire in the far future.
  # +true+::   it has expired.
  # Numeric::  it will expire in that many seconds.
  #
  # Otherwise the argument refers to some kind of renewer object
  # which will reset its expiry time.

  def renew(sec_or_renewer)
    sec, @renewer = get_renewer(sec_or_renewer)
    @expires = make_expires(sec)
  end

  ##
  # Returns an expiry Time based on +sec+ which can be one of:
  # Numeric:: +sec+ seconds into the future
  # +true+::  the expiry time is the start of 1970 (i.e. expired)
  # +nil+::   it is Tue Jan 19 03:14:07 GMT Standard Time 2038 (i.e. when
  #           UNIX clocks will die)
  #
  # Any other value yields +nil+, which #expired? treats as expired.

  def make_expires(sec=nil)
    case sec
    when Numeric
      Time.now + sec
    when true
      Time.at(1)
    when nil
      Time.at(2**31-1)
    end
  end

  ##
  # Retrieves +key+ from the tuple.

  def [](key)
    @tuple[key]
  end

  ##
  # Fetches +key+ from the tuple.

  def fetch(key)
    @tuple.fetch(key)
  end

  ##
  # The size of the tuple.

  def size
    @tuple.size
  end

  ##
  # Creates a Rinda::Tuple for +ary+.

  def make_tuple(ary)
    Rinda::Tuple.new(ary)
  end

  private

  ##
  # Returns a valid argument to make_expires and the renewer or nil.
  #
  # Given +true+, +nil+, or Numeric, returns that value and +nil+ (no actual
  # renewer).  Otherwise it returns an expiry value from calling
  # +sec_or_renewer.renew+ and the renewer.
  #
  # If the renewer raises a StandardError it is dropped and the object
  # itself is returned as the expiry value; make_expires has no expiry time
  # for such a value, so the entry ends up expired.  Exceptions outside
  # StandardError (interrupts, exit requests, ...) are deliberately not
  # swallowed and propagate to the caller.

  def get_renewer(sec_or_renewer)
    case sec_or_renewer
    when Numeric, true, nil
      return sec_or_renewer, nil
    else
      begin
        return sec_or_renewer.renew, sec_or_renewer
      rescue StandardError
        return sec_or_renewer, nil
      end
    end
  end

end
157
158 ##
159 # A TemplateEntry is a Template together with expiry and cancellation data.
160
class TemplateEntry < TupleEntry

  ##
  # Wraps +ary+ in a Rinda::Template (instead of a plain Rinda::Tuple) so
  # that this entry can be matched against tuples.

  def make_tuple(ary) # :nodoc:
    Rinda::Template.new(ary)
  end

  ##
  # Matches this TemplateEntry against +tuple+ by delegating to the wrapped
  # template.  See Template#match for details on how a Template matches a
  # Tuple.

  def match(tuple)
    @tuple.match(tuple)
  end

  # Lets a TemplateEntry be used with the case-equality operator.
  alias_method :===, :match

end
177
178 ##
179 # A WaitTemplateEntry is a TemplateEntry that a thread can wait on until it is signaled.
180
class WaitTemplateEntry < TemplateEntry

  # The tuple handed over by #read, or +nil+ if none has arrived yet.
  attr_reader :found

  ##
  # Creates a template entry for +ary+ that waits on a condition variable
  # obtained from +place+.  +place+ must respond to +new_cond+ and
  # +synchronize+ (presumably a MonitorMixin object such as the TupleSpace).

  def initialize(place, ary, expires=nil)
    super(ary, expires)
    @found = nil
    @place = place
    @cond = @place.new_cond
  end

  ##
  # Wakes up one waiter on this entry's condition variable while holding
  # the lock of +place+.

  def signal
    @place.synchronize { @cond.signal }
  end

  ##
  # Blocks on the condition variable until it is signaled.

  def wait
    @cond.wait
  end

  ##
  # Records +tuple+ as the found tuple and wakes up the waiter.

  def read(tuple)
    @found = tuple
    signal
  end

  ##
  # Marks this entry as canceled and wakes up the waiter so it can notice.

  def cancel
    super
    signal
  end

end
213
214 ##
215 # A NotifyTemplateEntry is returned by TupleSpace#notify and is notified of
216 # TupleSpace changes. You may receive either your subscribed event or the
217 # 'close' event when iterating over notifications.
218 #
219 # See TupleSpace#notify_event for valid notification types.
220 #
221 # == Example
222 #
223 # ts = Rinda::TupleSpace.new
224 # observer = ts.notify 'write', [nil]
225 #
226 # Thread.start do
227 # observer.each { |t| p t }
228 # end
229 #
230 # 3.times { |i| ts.write [i] }
231 #
232 # Outputs:
233 #
234 # ['write', [0]]
235 # ['write', [1]]
236 # ['write', [2]]
237
class NotifyTemplateEntry < TemplateEntry

  ##
  # Creates a new NotifyTemplateEntry that watches for +event+s that match
  # +tuple+.  The template is the pair [+event+, Template(+tuple+)];
  # +place+ is accepted but not used here.

  def initialize(place, event, tuple, expires=nil)
    super([event, Rinda::Template.new(tuple)], expires)
    @queue = Queue.new
    @done = false
  end

  ##
  # Called by TupleSpace to notify this NotifyTemplateEntry of a new event.
  # The event +ev+ is queued until it is retrieved with #pop.

  def notify(ev)
    @queue.push(ev)
  end

  ##
  # Retrieves a notification, blocking until one is queued.  Once a 'close'
  # event has been retrieved, every later call raises RequestExpiredError.

  def pop
    raise RequestExpiredError if @done
    event = @queue.pop
    @done = true if event[0] == 'close'
    event
  end

  ##
  # Yields event/tuple pairs until the 'close' event has been delivered.
  # Errors raised while iterating are swallowed, and the entry is always
  # canceled when iteration ends.

  def each # :yields: event, tuple
    yield(pop) until @done
  rescue
  ensure
    cancel
  end

end
283
284 ##
285 # TupleBag is an unordered collection of tuples. It is the basis
286 # of Tuplespace.
287
class TupleBag

  def initialize # :nodoc:
    # Maps a tuple size to the Array ("bin") of entries of that size.
    @hash = {}
  end

  ##
  # +true+ if any entry in the TupleBag has an expiry time set.

  def has_expires?
    @hash.any? do |_size, bin|
      bin.any? { |tuple| tuple.expires }
    end
  end

  ##
  # Add +ary+ to the TupleBag.

  def push(ary)
    (@hash[ary.size] ||= []).push(ary)
  end

  ##
  # Removes +ary+ from the TupleBag.  Returns the removed entry, or +nil+
  # if it was not in the bag.

  def delete(ary)
    size = ary.size
    bin = @hash[size]
    return nil unless bin
    removed = bin.delete(ary)
    # Drop bins that became empty so unused sizes do not pile up.
    @hash.delete(size) if bin.empty?
    removed
  end

  ##
  # Finds all live tuples that match +template+.

  def find_all(template)
    @hash.fetch(template.size, []).find_all do |tuple|
      tuple.alive? && template.match(tuple)
    end
  end

  ##
  # Finds a live tuple that matches +template+.

  def find(template)
    @hash.fetch(template.size, []).find do |tuple|
      tuple.alive? && template.match(tuple)
    end
  end

  ##
  # Finds all tuples in the TupleBag which when treated as templates, match
  # +tuple+ and are alive.

  def find_all_template(tuple)
    @hash.fetch(tuple.size, []).find_all do |template|
      template.alive? && template.match(tuple)
    end
  end

  ##
  # Deletes dead tuples from the TupleBag, returning the deleted tuples.

  def delete_unless_alive
    deleted = []
    # Iterate over a snapshot of the keys because bins may be removed.
    @hash.keys.each do |size|
      alive, dead = @hash[size].partition { |tuple| tuple.alive? }
      deleted.concat(dead)
      if alive.empty?
        @hash.delete(size)
      else
        @hash[size] = alive
      end
    end
    deleted
  end

end
372
373 ##
374 # The Tuplespace manages access to the tuples it contains,
375 # ensuring mutual exclusion requirements are met.
376 #
377 # The +sec+ option for the write, take, move, read and notify methods may
378 # either be a number of seconds or a Renewer object.
379
class TupleSpace

  include DRbUndumped
  include MonitorMixin

  ##
  # Creates a new TupleSpace.  +period+ is used to control how often to look
  # for dead tuples after modifications to the TupleSpace.
  #
  # If no dead tuples are found +period+ seconds after the last
  # modification, the TupleSpace will stop looking for dead tuples.

  def initialize(period=60)
    super()
    @bag = TupleBag.new
    @read_waiter = TupleBag.new
    @take_waiter = TupleBag.new
    @notify_waiter = TupleBag.new
    @period = period
    @keeper = nil
  end

  ##
  # Adds +tuple+ with the optional expiry or renewer +sec+ and returns the
  # TupleEntry created for it.
  #
  # An entry that is already expired is never stored: waiting readers are
  # served and both a 'write' and a 'delete' event are sent.  Otherwise the
  # entry is stored, waiting readers are served, waiting takers are woken
  # up and a 'write' event is sent.

  def write(tuple, sec=nil)
    entry = TupleEntry.new(tuple, sec)
    synchronize do
      if entry.expired?
        @read_waiter.find_all_template(entry).each do |template|
          template.read(tuple)
        end
        notify_event('write', entry.value)
        notify_event('delete', entry.value)
      else
        @bag.push(entry)
        # Start the keeper only after the entry is visible to it; started
        # earlier it could find nothing to watch and exit right away.
        start_keeper
        @read_waiter.find_all_template(entry).each do |template|
          template.read(tuple)
        end
        @take_waiter.find_all_template(entry).each do |template|
          template.signal
        end
        notify_event('write', entry.value)
      end
    end
    entry
  end

  ##
  # Removes +tuple+

  def take(tuple, sec=nil, &block)
    move(nil, tuple, sec, &block)
  end

  ##
  # Moves +tuple+ to +port+.
  #
  # Waits until a tuple matching +tuple+ is available, removes it from the
  # TupleSpace, pushes its value to +port+ (if given) and returns the value.
  # If a block is given it receives the waiting template before the search
  # starts.  Raises RequestExpiredError when +sec+ runs out and
  # RequestCanceledError when the template is canceled.

  def move(port, tuple, sec=nil)
    template = WaitTemplateEntry.new(self, tuple, sec)
    yield(template) if block_given?
    synchronize do
      entry = remove_match(port, template)
      return entry.value if entry
      raise RequestExpiredError if template.expired?

      begin
        @take_waiter.push(template)
        # The keeper is what wakes this request when it expires, so it has
        # to be started once the template is registered.
        start_keeper
        while true
          raise RequestCanceledError if template.canceled?
          raise RequestExpiredError if template.expired?
          entry = remove_match(port, template)
          return entry.value if entry
          template.wait
        end
      ensure
        @take_waiter.delete(template)
      end
    end
  end

  ##
  # Reads +tuple+, but does not remove it.
  #
  # Returns the value of a matching tuple, waiting for one to be written if
  # none is present.  If a block is given it receives the waiting template
  # before the search starts.  Raises RequestExpiredError when +sec+ runs
  # out and RequestCanceledError when the template is canceled.

  def read(tuple, sec=nil)
    template = WaitTemplateEntry.new(self, tuple, sec)
    yield(template) if block_given?
    synchronize do
      entry = @bag.find(template)
      return entry.value if entry
      raise RequestExpiredError if template.expired?

      begin
        @read_waiter.push(template)
        # See #move: the keeper must see the registered template.
        start_keeper
        template.wait
        raise RequestCanceledError if template.canceled?
        raise RequestExpiredError if template.expired?
        return template.found
      ensure
        @read_waiter.delete(template)
      end
    end
  end

  ##
  # Returns all tuples matching +tuple+.  Does not remove the found tuples.

  def read_all(tuple)
    template = WaitTemplateEntry.new(self, tuple, nil)
    synchronize do
      entry = @bag.find_all(template)
      entry.collect do |e|
        e.value
      end
    end
  end

  ##
  # Registers for notifications of +event+.  Returns a NotifyTemplateEntry.
  # See NotifyTemplateEntry for examples of how to listen for notifications.
  #
  # +event+ can be:
  # 'write'::  A tuple was added
  # 'take'::   A tuple was taken or moved
  # 'delete':: A tuple was lost after being overwritten or expiring
  #
  # The TupleSpace will also notify you of the 'close' event when the
  # NotifyTemplateEntry has expired.

  def notify(event, tuple, sec=nil)
    template = NotifyTemplateEntry.new(self, event, tuple, sec)
    synchronize do
      @notify_waiter.push(template)
      # Without the keeper an expiring notifier would never get 'close'
      # unless some other request happened to start it.
      start_keeper
    end
    template
  end

  private

  ##
  # Removes a live tuple matching +template+ from the bag, pushes its value
  # to +port+ (if given) and sends the 'take' event.  Returns the removed
  # entry or +nil+ if nothing matches.  Must be called with the lock held.

  def remove_match(port, template)
    entry = @bag.find(template)
    return nil unless entry
    port.push(entry.value) if port
    @bag.delete(entry)
    notify_event('take', entry.value)
    entry
  end

  ##
  # Removes dead tuples.
  #
  # Dead read and take requests are signaled so their waiters wake up, dead
  # notifiers receive the 'close' event, and a 'delete' event is sent for
  # every dead tuple.

  def keep_clean
    synchronize do
      @read_waiter.delete_unless_alive.each do |e|
        e.signal
      end
      @take_waiter.delete_unless_alive.each do |e|
        e.signal
      end
      @notify_waiter.delete_unless_alive.each do |e|
        e.notify(['close'])
      end
      @bag.delete_unless_alive.each do |e|
        notify_event('delete', e.value)
      end
    end
  end

  ##
  # Notifies all registered listeners for +event+ of a status change of
  # +tuple+.

  def notify_event(event, tuple)
    ev = [event, tuple]
    @notify_waiter.find_all_template(ev).each do |template|
      template.notify(ev)
    end
  end

  ##
  # Creates a thread that scans the tuplespace for expired tuples, unless
  # one is already running.  The thread cleans up every +period+ seconds
  # and stops once nothing is left to watch.

  def start_keeper
    synchronize do
      return if @keeper && @keeper.alive?
      @keeper = Thread.new do
        while keeper_pass
          sleep(@period)
        end
      end
    end
  end

  ##
  # One round of the keeper thread.  Cleans up and returns +true+ while the
  # tuplespace still needs a keeper.  Otherwise forgets the keeper thread
  # and returns +false+; this happens under the lock, so a request that
  # registers right afterwards starts a fresh keeper instead of relying on
  # the one that is about to end.

  def keeper_pass
    synchronize do
      if need_keeper?
        keep_clean
        true
      else
        @keeper = nil
        false
      end
    end
  end

  ##
  # Checks the tuplespace to see if it needs cleaning.  Returns +true+ or
  # +false+.  Must be called with the lock held.

  def need_keeper?
    [@bag, @read_waiter, @take_waiter, @notify_waiter].any? do |bag|
      bag.has_expires?
    end
  end

end
587
588end
589
Note: See TracBrowser for help on using the repository browser.