source: extensions/gsdl-video/trunk/installed/cmdline/lib/ruby/1.8/rinda/ring.rb@ 18425

Last change on this file since 18425 was 18425, checked in by davidb, 15 years ago

Video extension to Greenstone

File size: 5.8 KB
Line 
1#
2# Note: Rinda::Ring API is unstable.
3#
4require 'drb/drb'
5require 'rinda/rinda'
6require 'thread'
7
8module Rinda
9
10 ##
11 # The default port Ring discovery will use.
12
13 Ring_PORT = 7647
14
15 ##
16 # A RingServer allows a Rinda::TupleSpace to be located via UDP broadcasts.
17 # Service location uses the following steps:
18 #
19 # 1. A RingServer begins listening on the broadcast UDP address.
20 # 2. A RingFinger sends a UDP packet containing the DRb URI where it will
21 # listen for a reply.
22 # 3. The RingServer recieves the UDP packet and connects back to the
23 # provided DRb URI with the DRb service.
24
25 class RingServer
26
27 include DRbUndumped
28
29 ##
30 # Advertises +ts+ on the UDP broadcast address at +port+.
31
32 def initialize(ts, port=Ring_PORT)
33 @ts = ts
34 @soc = UDPSocket.open
35 @soc.bind('', port)
36 @w_service = write_service
37 @r_service = reply_service
38 end
39
40 ##
41 # Creates a thread that picks up UDP packets and passes them to do_write
42 # for decoding.
43
44 def write_service
45 Thread.new do
46 loop do
47 msg = @soc.recv(1024)
48 do_write(msg)
49 end
50 end
51 end
52
53 ##
54 # Extracts the response URI from +msg+ and adds it to TupleSpace where it
55 # will be picked up by +reply_service+ for notification.
56
57 def do_write(msg)
58 Thread.new do
59 begin
60 tuple, sec = Marshal.load(msg)
61 @ts.write(tuple, sec)
62 rescue
63 end
64 end
65 end
66
67 ##
68 # Creates a thread that notifies waiting clients from the TupleSpace.
69
70 def reply_service
71 Thread.new do
72 loop do
73 do_reply
74 end
75 end
76 end
77
78 ##
79 # Pulls lookup tuples out of the TupleSpace and sends their DRb object the
80 # address of the local TupleSpace.
81
82 def do_reply
83 tuple = @ts.take([:lookup_ring, nil])
84 Thread.new { tuple[1].call(@ts) rescue nil}
85 rescue
86 end
87
88 end
89
90 ##
91 # RingFinger is used by RingServer clients to discover the RingServer's
92 # TupleSpace. Typically, all a client needs to do is call
93 # RingFinger.primary to retrieve the remote TupleSpace, which it can then
94 # begin using.
95
96 class RingFinger
97
98 @@broadcast_list = ['<broadcast>', 'localhost']
99
100 @@finger = nil
101
102 ##
103 # Creates a singleton RingFinger and looks for a RingServer. Returns the
104 # created RingFinger.
105
106 def self.finger
107 unless @@finger
108 @@finger = self.new
109 @@finger.lookup_ring_any
110 end
111 @@finger
112 end
113
114 ##
115 # Returns the first advertised TupleSpace.
116
117 def self.primary
118 finger.primary
119 end
120
121 ##
122 # Contains all discoverd TupleSpaces except for the primary.
123
124 def self.to_a
125 finger.to_a
126 end
127
128 ##
129 # The list of addresses where RingFinger will send query packets.
130
131 attr_accessor :broadcast_list
132
133 ##
134 # The port that RingFinger will send query packets to.
135
136 attr_accessor :port
137
138 ##
139 # Contain the first advertised TupleSpace after lookup_ring_any is called.
140
141 attr_accessor :primary
142
143 ##
144 # Creates a new RingFinger that will look for RingServers at +port+ on
145 # the addresses in +broadcast_list+.
146
147 def initialize(broadcast_list=@@broadcast_list, port=Ring_PORT)
148 @broadcast_list = broadcast_list || ['localhost']
149 @port = port
150 @primary = nil
151 @rings = []
152 end
153
154 ##
155 # Contains all discovered TupleSpaces except for the primary.
156
157 def to_a
158 @rings
159 end
160
161 ##
162 # Iterates over all discovered TupleSpaces starting with the primary.
163
164 def each
165 lookup_ring_any unless @primary
166 return unless @primary
167 yield(@primary)
168 @rings.each { |x| yield(x) }
169 end
170
171 ##
172 # Looks up RingServers waiting +timeout+ seconds. RingServers will be
173 # given +block+ as a callback, which will be called with the remote
174 # TupleSpace.
175
176 def lookup_ring(timeout=5, &block)
177 return lookup_ring_any(timeout) unless block_given?
178
179 msg = Marshal.dump([[:lookup_ring, DRbObject.new(block)], timeout])
180 @broadcast_list.each do |it|
181 soc = UDPSocket.open
182 begin
183 soc.setsockopt(Socket::SOL_SOCKET, Socket::SO_BROADCAST, true)
184 soc.send(msg, 0, it, @port)
185 rescue
186 nil
187 ensure
188 soc.close
189 end
190 end
191 sleep(timeout)
192 end
193
194 ##
195 # Returns the first found remote TupleSpace. Any further recovered
196 # TupleSpaces can be found by calling +to_a+.
197
198 def lookup_ring_any(timeout=5)
199 queue = Queue.new
200
201 th = Thread.new do
202 self.lookup_ring(timeout) do |ts|
203 queue.push(ts)
204 end
205 queue.push(nil)
206 while it = queue.pop
207 @rings.push(it)
208 end
209 end
210
211 @primary = queue.pop
212 raise('RingNotFound') if @primary.nil?
213 @primary
214 end
215
216 end
217
218 ##
219 # RingProvider uses a RingServer advertised TupleSpace as a name service.
220 # TupleSpace clients can register themselves with the remote TupleSpace and
221 # look up other provided services via the remote TupleSpace.
222 #
223 # Services are registered with a tuple of the format [:name, klass,
224 # DRbObject, description].
225
226 class RingProvider
227
228 ##
229 # Creates a RingProvider that will provide a +klass+ service running on
230 # +front+, with a +description+. +renewer+ is optional.
231
232 def initialize(klass, front, desc, renewer = nil)
233 @tuple = [:name, klass, front, desc]
234 @renewer = renewer || Rinda::SimpleRenewer.new
235 end
236
237 ##
238 # Advertises this service on the primary remote TupleSpace.
239
240 def provide
241 ts = Rinda::RingFinger.primary
242 ts.write(@tuple, @renewer)
243 end
244
245 end
246
247end
248
249if __FILE__ == $0
250 DRb.start_service
251 case ARGV.shift
252 when 's'
253 require 'rinda/tuplespace'
254 ts = Rinda::TupleSpace.new
255 place = Rinda::RingServer.new(ts)
256 $stdin.gets
257 when 'w'
258 finger = Rinda::RingFinger.new(nil)
259 finger.lookup_ring do |ts|
260 p ts
261 ts.write([:hello, :world])
262 end
263 when 'r'
264 finger = Rinda::RingFinger.new(nil)
265 finger.lookup_ring do |ts|
266 p ts
267 p ts.take([nil, nil])
268 end
269 end
270end
271
Note: See TracBrowser for help on using the repository browser.