source: other-projects/trunk/gs3-release-maker/apache-ant-1.6.5/src/main/org/apache/tools/ant/taskdefs/Parallel.java@ 14627

Last change on this file since 14627 was 14627, checked in by oranfry, 17 years ago

initial import of the gs3-release-maker

File size: 15.7 KB
Line 
1/*
2 * Copyright 2001-2004 The Apache Software Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17package org.apache.tools.ant.taskdefs;
18
19import java.lang.reflect.Method;
20import java.util.Enumeration;
21import java.util.Vector;
22import java.util.List;
23import java.util.ArrayList;
24import org.apache.tools.ant.BuildException;
25import org.apache.tools.ant.Location;
26import org.apache.tools.ant.Task;
27import org.apache.tools.ant.TaskContainer;
28import org.apache.tools.ant.util.StringUtils;
29
30/**
31 * Executes the contained tasks in separate threads, continuing
32 * once all are completed.
33 * <p>
34 * New behavior allows for the ant script to specify a maximum number of
35 * threads that will be executed in parallel. One should be very careful about
36 * using the <code>waitFor</code> task when specifying <code>threadCount</code>
37 * as it can cause deadlocks if the number of threads is too small or if one of
38 * the nested tasks fails to execute completely. The task selection algorithm
39 * will insure that the tasks listed before a task have started before that
40 * task is started, but it will not insure a successful completion of those
41 * tasks or that those tasks will finish first (i.e. it's a classic race
42 * condition).
43 * </p>
44 * @since Ant 1.4
45 *
46 * @ant.task category="control"
47 */
48public class Parallel extends Task
49 implements TaskContainer {
50
51 /** Class which holds a list of tasks to execute */
52 public static class TaskList implements TaskContainer {
53 /** Collection holding the nested tasks */
54 private List tasks = new ArrayList();
55
56 /**
57 * Add a nested task to execute parallel (asynchron).
58 * <p>
59 * @param nestedTask Nested task to be executed in parallel.
60 * must not be null.
61 */
62 public void addTask(Task nestedTask) {
63 tasks.add(nestedTask);
64 }
65 }
66
67 /** Collection holding the nested tasks */
68 private Vector nestedTasks = new Vector();
69
70 /** Semaphore to notify of completed threads */
71 private final Object semaphore = new Object();
72
73 /** Total number of threads to run */
74 private int numThreads = 0;
75
76 /** Total number of threads per processor to run. */
77 private int numThreadsPerProcessor = 0;
78
79 /** The timeout period in milliseconds */
80 private long timeout;
81
82 /** Indicates threads are still running and new threads can be issued */
83 private volatile boolean stillRunning;
84
85 /** Indicates that the execution timedout */
86 private boolean timedOut;
87
88 /**
89 * Indicates whether failure of any of the nested tasks should end
90 * execution
91 */
92 private boolean failOnAny;
93
94 /** The dameon task list if any */
95 private TaskList daemonTasks;
96
97 /** Accumulation of exceptions messages from all nested tasks */
98 private StringBuffer exceptionMessage;
99
100 /** Number of exceptions from nested tasks */
101 private int numExceptions = 0;
102
103 /** The first exception encountered */
104 private Throwable firstException;
105
106 /** The location of the first exception */
107 private Location firstLocation;
108
109 /**
110 * Add a group of daemon threads
111 * @param daemonTasks The tasks to be executed as daemon.
112 */
113 public void addDaemons(TaskList daemonTasks) {
114 if (this.daemonTasks != null) {
115 throw new BuildException("Only one daemon group is supported");
116 }
117 this.daemonTasks = daemonTasks;
118 }
119
120 /**
121 * Interval to poll for completed threads when threadCount or
122 * threadsPerProcessor is specified. Integer in milliseconds.; optional
123 *
124 * @param pollInterval New value of property pollInterval.
125 */
126 public void setPollInterval(int pollInterval) {
127 }
128
129 /**
130 * Control whether a failure in a nested task halts execution. Note that
131 * the task will complete but existing threads will continue to run - they
132 * are not stopped
133 *
134 * @param failOnAny if true any nested task failure causes parallel to
135 * complete.
136 */
137 public void setFailOnAny(boolean failOnAny) {
138 this.failOnAny = failOnAny;
139 }
140
141 /**
142 * Add a nested task to execute in parallel.
143 * @param nestedTask Nested task to be executed in parallel
144 */
145 public void addTask(Task nestedTask) {
146 nestedTasks.addElement(nestedTask);
147 }
148
149 /**
150 * Dynamically generates the number of threads to execute based on the
151 * number of available processors (via
152 * <code>java.lang.Runtime.availableProcessors()</code>). Requires a J2SE
153 * 1.4 VM, and it will overwrite the value set in threadCount.
154 * If used in a 1.1, 1.2, or 1.3 VM then the task will defer to
155 * <code>threadCount</code>.; optional
156 * @param numThreadsPerProcessor Number of threads to create per available
157 * processor.
158 *
159 */
160 public void setThreadsPerProcessor(int numThreadsPerProcessor) {
161 this.numThreadsPerProcessor = numThreadsPerProcessor;
162 }
163
164 /**
165 * Statically determine the maximum number of tasks to execute
166 * simultaneously. If there are less tasks than threads then all will be
167 * executed at once, if there are more then only <code>threadCount</code>
168 * tasks will be executed at one time. If <code>threadsPerProcessor</code>
169 * is set and the JVM is at least a 1.4 VM then this value is
170 * ignored.; optional
171 *
172 * @param numThreads total number of threads.
173 *
174 */
175 public void setThreadCount(int numThreads) {
176 this.numThreads = numThreads;
177 }
178
179 /**
180 * Sets the timeout on this set of tasks. If the timeout is reached
181 * before the other threads complete, the execution of this
182 * task completes with an exception.
183 *
184 * Note that existing threads continue to run.
185 *
186 * @param timeout timeout in milliseconds.
187 */
188 public void setTimeout(long timeout) {
189 this.timeout = timeout;
190 }
191
192
193
194 /**
195 * Execute the parallel tasks
196 *
197 * @exception BuildException if any of the threads failed.
198 */
199 public void execute() throws BuildException {
200 updateThreadCounts();
201 if (numThreads == 0) {
202 numThreads = nestedTasks.size();
203 }
204 spinThreads();
205 }
206
207 /**
208 * Determine the number of threads based on the number of processors
209 */
210 private void updateThreadCounts() {
211 if (numThreadsPerProcessor != 0) {
212 int numProcessors = getNumProcessors();
213 if (numProcessors != 0) {
214 numThreads = numProcessors * numThreadsPerProcessor;
215 }
216 }
217 }
218
219 private void processExceptions(TaskRunnable[] runnables) {
220 if (runnables == null) {
221 return;
222 }
223 for (int i = 0; i < runnables.length; ++i) {
224 Throwable t = runnables[i].getException();
225 if (t != null) {
226 numExceptions++;
227 if (firstException == null) {
228 firstException = t;
229 }
230 if (t instanceof BuildException
231 && firstLocation == Location.UNKNOWN_LOCATION) {
232 firstLocation = ((BuildException) t).getLocation();
233 }
234 exceptionMessage.append(StringUtils.LINE_SEP);
235 exceptionMessage.append(t.getMessage());
236 }
237 }
238 }
239
240 /**
241 * Spin up required threads with a maximum number active at any given time.
242 *
243 * @exception BuildException if any of the threads failed.
244 */
245 private void spinThreads() throws BuildException {
246 final int numTasks = nestedTasks.size();
247 TaskRunnable[] runnables = new TaskRunnable[numTasks];
248 stillRunning = true;
249 timedOut = false;
250
251 int threadNumber = 0;
252 for (Enumeration e = nestedTasks.elements(); e.hasMoreElements();
253 threadNumber++) {
254 Task nestedTask = (Task) e.nextElement();
255 runnables[threadNumber]
256 = new TaskRunnable(nestedTask);
257 }
258
259 final int maxRunning = numTasks < numThreads ? numTasks : numThreads;
260 TaskRunnable[] running = new TaskRunnable[maxRunning];
261
262 threadNumber = 0;
263 ThreadGroup group = new ThreadGroup("parallel");
264
265 TaskRunnable[] daemons = null;
266 if (daemonTasks != null && daemonTasks.tasks.size() != 0) {
267 daemons = new TaskRunnable[daemonTasks.tasks.size()];
268 }
269
270 synchronized (semaphore) {
271 // When we leave this block we can be sure all data is really
272 // stored in main memory before the new threads start, the new
273 // threads will for sure load the data from main memory.
274 //
275 // This probably is slightly paranoid.
276 }
277
278 synchronized (semaphore) {
279 // start any daemon threads
280 if (daemons != null) {
281 for (int i = 0; i < daemons.length; ++i) {
282 daemons[i] = new TaskRunnable((Task) daemonTasks.tasks.get(i));
283 Thread daemonThread = new Thread(group, daemons[i]);
284 daemonThread.setDaemon(true);
285 daemonThread.start();
286 }
287 }
288
289 // now run main threads in limited numbers...
290 // start initial batch of threads
291 for (int i = 0; i < maxRunning; ++i) {
292 running[i] = runnables[threadNumber++];
293 Thread thread = new Thread(group, running[i]);
294 thread.start();
295 }
296
297 if (timeout != 0) {
298 // start the timeout thread
299 Thread timeoutThread = new Thread() {
300 public synchronized void run() {
301 try {
302 wait(timeout);
303 synchronized (semaphore) {
304 stillRunning = false;
305 timedOut = true;
306 semaphore.notifyAll();
307 }
308 } catch (InterruptedException e) {
309 // ignore
310 }
311 }
312 };
313 timeoutThread.start();
314 }
315
316 // now find available running slots for the remaining threads
317 outer:
318 while (threadNumber < numTasks && stillRunning) {
319 for (int i = 0; i < maxRunning; i++) {
320 if (running[i] == null || running[i].isFinished()) {
321 running[i] = runnables[threadNumber++];
322 Thread thread = new Thread(group, running[i]);
323 thread.start();
324 // continue on outer while loop to get another
325 // available slot
326 continue outer;
327 }
328 }
329
330 // if we got here all slots in use, so sleep until
331 // something happens
332 try {
333 semaphore.wait();
334 } catch (InterruptedException ie) {
335 // doesn't java know interruptions are rude?
336 // just pretend it didn't happen and go about out business.
337 // sheesh!
338 }
339 }
340
341 // are all threads finished
342 outer2:
343 while (stillRunning) {
344 for (int i = 0; i < maxRunning; ++i) {
345 if (running[i] != null && !running[i].isFinished()) {
346 //System.out.println("Thread " + i + " is still alive ");
347 // still running - wait for it
348 try {
349 semaphore.wait();
350 } catch (InterruptedException ie) {
351 // who would interrupt me at a time like this?
352 }
353 continue outer2;
354 }
355 }
356 stillRunning = false;
357 }
358 }
359
360 if (timedOut) {
361 throw new BuildException("Parallel execution timed out");
362 }
363
364 // now did any of the threads throw an exception
365 exceptionMessage = new StringBuffer();
366 numExceptions = 0;
367 firstException = null;
368 firstLocation = Location.UNKNOWN_LOCATION;
369 processExceptions(daemons);
370 processExceptions(runnables);
371
372 if (numExceptions == 1) {
373 if (firstException instanceof BuildException) {
374 throw (BuildException) firstException;
375 } else {
376 throw new BuildException(firstException);
377 }
378 } else if (numExceptions > 1) {
379 throw new BuildException(exceptionMessage.toString(),
380 firstLocation);
381 }
382 }
383
384 /**
385 * Determine the number of processors. Only effective on later VMs
386 *
387 * @return the number of processors available or 0 if not determinable.
388 */
389 private int getNumProcessors() {
390 try {
391 Class[] paramTypes = {};
392 Method availableProcessors =
393 Runtime.class.getMethod("availableProcessors", paramTypes);
394
395 Object[] args = {};
396 Integer ret = (Integer) availableProcessors.invoke(Runtime.getRuntime(), args);
397 return ret.intValue();
398 } catch (Exception e) {
399 // return a bogus number
400 return 0;
401 }
402 }
403
404 /**
405 * thread that execs a task
406 */
407 private class TaskRunnable implements Runnable {
408 private Throwable exception;
409 private Task task;
410 private boolean finished;
411
412 /**
413 * Construct a new TaskRunnable.<p>
414 *
415 * @param task the Task to be executed in a separate thread
416 */
417 TaskRunnable(Task task) {
418 this.task = task;
419 }
420
421 /**
422 * Executes the task within a thread and takes care about
423 * Exceptions raised within the task.
424 */
425 public void run() {
426 try {
427 task.perform();
428 } catch (Throwable t) {
429 exception = t;
430 if (failOnAny) {
431 stillRunning = false;
432 }
433 } finally {
434 synchronized (semaphore) {
435 finished = true;
436 semaphore.notifyAll();
437 }
438 }
439 }
440
441 /**
442 * get any exception that got thrown during execution;
443 * @return an exception or null for no exception/not yet finished
444 */
445 public Throwable getException() {
446 return exception;
447 }
448
449 /**
450 * Provides the indicator that the task has been finished.
451 * @return Returns true when the task is finished.
452 */
453 boolean isFinished() {
454 return finished;
455 }
456 }
457
458}
Note: See TracBrowser for help on using the repository browser.