1 | /*
|
---|
2 | * Copyright 2001-2004 The Apache Software Foundation
|
---|
3 | *
|
---|
4 | * Licensed under the Apache License, Version 2.0 (the "License");
|
---|
5 | * you may not use this file except in compliance with the License.
|
---|
6 | * You may obtain a copy of the License at
|
---|
7 | *
|
---|
8 | * http://www.apache.org/licenses/LICENSE-2.0
|
---|
9 | *
|
---|
10 | * Unless required by applicable law or agreed to in writing, software
|
---|
11 | * distributed under the License is distributed on an "AS IS" BASIS,
|
---|
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
---|
13 | * See the License for the specific language governing permissions and
|
---|
14 | * limitations under the License.
|
---|
15 | *
|
---|
16 | */
|
---|
17 | package org.apache.tools.ant.taskdefs;
|
---|
18 |
|
---|
19 | import java.lang.reflect.Method;
|
---|
20 | import java.util.Enumeration;
|
---|
21 | import java.util.Vector;
|
---|
22 | import java.util.List;
|
---|
23 | import java.util.ArrayList;
|
---|
24 | import org.apache.tools.ant.BuildException;
|
---|
25 | import org.apache.tools.ant.Location;
|
---|
26 | import org.apache.tools.ant.Task;
|
---|
27 | import org.apache.tools.ant.TaskContainer;
|
---|
28 | import org.apache.tools.ant.util.StringUtils;
|
---|
29 |
|
---|
30 | /**
|
---|
31 | * Executes the contained tasks in separate threads, continuing
|
---|
32 | * once all are completed.
|
---|
33 | * <p>
|
---|
34 | * New behavior allows for the ant script to specify a maximum number of
|
---|
35 | * threads that will be executed in parallel. One should be very careful about
|
---|
36 | * using the <code>waitFor</code> task when specifying <code>threadCount</code>
|
---|
37 | * as it can cause deadlocks if the number of threads is too small or if one of
|
---|
38 | * the nested tasks fails to execute completely. The task selection algorithm
|
---|
39 | * will insure that the tasks listed before a task have started before that
|
---|
40 | * task is started, but it will not insure a successful completion of those
|
---|
41 | * tasks or that those tasks will finish first (i.e. it's a classic race
|
---|
42 | * condition).
|
---|
43 | * </p>
|
---|
44 | * @since Ant 1.4
|
---|
45 | *
|
---|
46 | * @ant.task category="control"
|
---|
47 | */
|
---|
48 | public class Parallel extends Task
|
---|
49 | implements TaskContainer {
|
---|
50 |
|
---|
51 | /** Class which holds a list of tasks to execute */
|
---|
52 | public static class TaskList implements TaskContainer {
|
---|
53 | /** Collection holding the nested tasks */
|
---|
54 | private List tasks = new ArrayList();
|
---|
55 |
|
---|
56 | /**
|
---|
57 | * Add a nested task to execute parallel (asynchron).
|
---|
58 | * <p>
|
---|
59 | * @param nestedTask Nested task to be executed in parallel.
|
---|
60 | * must not be null.
|
---|
61 | */
|
---|
62 | public void addTask(Task nestedTask) {
|
---|
63 | tasks.add(nestedTask);
|
---|
64 | }
|
---|
65 | }
|
---|
66 |
|
---|
67 | /** Collection holding the nested tasks */
|
---|
68 | private Vector nestedTasks = new Vector();
|
---|
69 |
|
---|
70 | /** Semaphore to notify of completed threads */
|
---|
71 | private final Object semaphore = new Object();
|
---|
72 |
|
---|
73 | /** Total number of threads to run */
|
---|
74 | private int numThreads = 0;
|
---|
75 |
|
---|
76 | /** Total number of threads per processor to run. */
|
---|
77 | private int numThreadsPerProcessor = 0;
|
---|
78 |
|
---|
79 | /** The timeout period in milliseconds */
|
---|
80 | private long timeout;
|
---|
81 |
|
---|
82 | /** Indicates threads are still running and new threads can be issued */
|
---|
83 | private volatile boolean stillRunning;
|
---|
84 |
|
---|
85 | /** Indicates that the execution timedout */
|
---|
86 | private boolean timedOut;
|
---|
87 |
|
---|
88 | /**
|
---|
89 | * Indicates whether failure of any of the nested tasks should end
|
---|
90 | * execution
|
---|
91 | */
|
---|
92 | private boolean failOnAny;
|
---|
93 |
|
---|
94 | /** The dameon task list if any */
|
---|
95 | private TaskList daemonTasks;
|
---|
96 |
|
---|
97 | /** Accumulation of exceptions messages from all nested tasks */
|
---|
98 | private StringBuffer exceptionMessage;
|
---|
99 |
|
---|
100 | /** Number of exceptions from nested tasks */
|
---|
101 | private int numExceptions = 0;
|
---|
102 |
|
---|
103 | /** The first exception encountered */
|
---|
104 | private Throwable firstException;
|
---|
105 |
|
---|
106 | /** The location of the first exception */
|
---|
107 | private Location firstLocation;
|
---|
108 |
|
---|
109 | /**
|
---|
110 | * Add a group of daemon threads
|
---|
111 | * @param daemonTasks The tasks to be executed as daemon.
|
---|
112 | */
|
---|
113 | public void addDaemons(TaskList daemonTasks) {
|
---|
114 | if (this.daemonTasks != null) {
|
---|
115 | throw new BuildException("Only one daemon group is supported");
|
---|
116 | }
|
---|
117 | this.daemonTasks = daemonTasks;
|
---|
118 | }
|
---|
119 |
|
---|
120 | /**
|
---|
121 | * Interval to poll for completed threads when threadCount or
|
---|
122 | * threadsPerProcessor is specified. Integer in milliseconds.; optional
|
---|
123 | *
|
---|
124 | * @param pollInterval New value of property pollInterval.
|
---|
125 | */
|
---|
126 | public void setPollInterval(int pollInterval) {
|
---|
127 | }
|
---|
128 |
|
---|
129 | /**
|
---|
130 | * Control whether a failure in a nested task halts execution. Note that
|
---|
131 | * the task will complete but existing threads will continue to run - they
|
---|
132 | * are not stopped
|
---|
133 | *
|
---|
134 | * @param failOnAny if true any nested task failure causes parallel to
|
---|
135 | * complete.
|
---|
136 | */
|
---|
137 | public void setFailOnAny(boolean failOnAny) {
|
---|
138 | this.failOnAny = failOnAny;
|
---|
139 | }
|
---|
140 |
|
---|
141 | /**
|
---|
142 | * Add a nested task to execute in parallel.
|
---|
143 | * @param nestedTask Nested task to be executed in parallel
|
---|
144 | */
|
---|
145 | public void addTask(Task nestedTask) {
|
---|
146 | nestedTasks.addElement(nestedTask);
|
---|
147 | }
|
---|
148 |
|
---|
149 | /**
|
---|
150 | * Dynamically generates the number of threads to execute based on the
|
---|
151 | * number of available processors (via
|
---|
152 | * <code>java.lang.Runtime.availableProcessors()</code>). Requires a J2SE
|
---|
153 | * 1.4 VM, and it will overwrite the value set in threadCount.
|
---|
154 | * If used in a 1.1, 1.2, or 1.3 VM then the task will defer to
|
---|
155 | * <code>threadCount</code>.; optional
|
---|
156 | * @param numThreadsPerProcessor Number of threads to create per available
|
---|
157 | * processor.
|
---|
158 | *
|
---|
159 | */
|
---|
160 | public void setThreadsPerProcessor(int numThreadsPerProcessor) {
|
---|
161 | this.numThreadsPerProcessor = numThreadsPerProcessor;
|
---|
162 | }
|
---|
163 |
|
---|
164 | /**
|
---|
165 | * Statically determine the maximum number of tasks to execute
|
---|
166 | * simultaneously. If there are less tasks than threads then all will be
|
---|
167 | * executed at once, if there are more then only <code>threadCount</code>
|
---|
168 | * tasks will be executed at one time. If <code>threadsPerProcessor</code>
|
---|
169 | * is set and the JVM is at least a 1.4 VM then this value is
|
---|
170 | * ignored.; optional
|
---|
171 | *
|
---|
172 | * @param numThreads total number of threads.
|
---|
173 | *
|
---|
174 | */
|
---|
175 | public void setThreadCount(int numThreads) {
|
---|
176 | this.numThreads = numThreads;
|
---|
177 | }
|
---|
178 |
|
---|
179 | /**
|
---|
180 | * Sets the timeout on this set of tasks. If the timeout is reached
|
---|
181 | * before the other threads complete, the execution of this
|
---|
182 | * task completes with an exception.
|
---|
183 | *
|
---|
184 | * Note that existing threads continue to run.
|
---|
185 | *
|
---|
186 | * @param timeout timeout in milliseconds.
|
---|
187 | */
|
---|
188 | public void setTimeout(long timeout) {
|
---|
189 | this.timeout = timeout;
|
---|
190 | }
|
---|
191 |
|
---|
192 |
|
---|
193 |
|
---|
194 | /**
|
---|
195 | * Execute the parallel tasks
|
---|
196 | *
|
---|
197 | * @exception BuildException if any of the threads failed.
|
---|
198 | */
|
---|
199 | public void execute() throws BuildException {
|
---|
200 | updateThreadCounts();
|
---|
201 | if (numThreads == 0) {
|
---|
202 | numThreads = nestedTasks.size();
|
---|
203 | }
|
---|
204 | spinThreads();
|
---|
205 | }
|
---|
206 |
|
---|
207 | /**
|
---|
208 | * Determine the number of threads based on the number of processors
|
---|
209 | */
|
---|
210 | private void updateThreadCounts() {
|
---|
211 | if (numThreadsPerProcessor != 0) {
|
---|
212 | int numProcessors = getNumProcessors();
|
---|
213 | if (numProcessors != 0) {
|
---|
214 | numThreads = numProcessors * numThreadsPerProcessor;
|
---|
215 | }
|
---|
216 | }
|
---|
217 | }
|
---|
218 |
|
---|
219 | private void processExceptions(TaskRunnable[] runnables) {
|
---|
220 | if (runnables == null) {
|
---|
221 | return;
|
---|
222 | }
|
---|
223 | for (int i = 0; i < runnables.length; ++i) {
|
---|
224 | Throwable t = runnables[i].getException();
|
---|
225 | if (t != null) {
|
---|
226 | numExceptions++;
|
---|
227 | if (firstException == null) {
|
---|
228 | firstException = t;
|
---|
229 | }
|
---|
230 | if (t instanceof BuildException
|
---|
231 | && firstLocation == Location.UNKNOWN_LOCATION) {
|
---|
232 | firstLocation = ((BuildException) t).getLocation();
|
---|
233 | }
|
---|
234 | exceptionMessage.append(StringUtils.LINE_SEP);
|
---|
235 | exceptionMessage.append(t.getMessage());
|
---|
236 | }
|
---|
237 | }
|
---|
238 | }
|
---|
239 |
|
---|
240 | /**
|
---|
241 | * Spin up required threads with a maximum number active at any given time.
|
---|
242 | *
|
---|
243 | * @exception BuildException if any of the threads failed.
|
---|
244 | */
|
---|
245 | private void spinThreads() throws BuildException {
|
---|
246 | final int numTasks = nestedTasks.size();
|
---|
247 | TaskRunnable[] runnables = new TaskRunnable[numTasks];
|
---|
248 | stillRunning = true;
|
---|
249 | timedOut = false;
|
---|
250 |
|
---|
251 | int threadNumber = 0;
|
---|
252 | for (Enumeration e = nestedTasks.elements(); e.hasMoreElements();
|
---|
253 | threadNumber++) {
|
---|
254 | Task nestedTask = (Task) e.nextElement();
|
---|
255 | runnables[threadNumber]
|
---|
256 | = new TaskRunnable(nestedTask);
|
---|
257 | }
|
---|
258 |
|
---|
259 | final int maxRunning = numTasks < numThreads ? numTasks : numThreads;
|
---|
260 | TaskRunnable[] running = new TaskRunnable[maxRunning];
|
---|
261 |
|
---|
262 | threadNumber = 0;
|
---|
263 | ThreadGroup group = new ThreadGroup("parallel");
|
---|
264 |
|
---|
265 | TaskRunnable[] daemons = null;
|
---|
266 | if (daemonTasks != null && daemonTasks.tasks.size() != 0) {
|
---|
267 | daemons = new TaskRunnable[daemonTasks.tasks.size()];
|
---|
268 | }
|
---|
269 |
|
---|
270 | synchronized (semaphore) {
|
---|
271 | // When we leave this block we can be sure all data is really
|
---|
272 | // stored in main memory before the new threads start, the new
|
---|
273 | // threads will for sure load the data from main memory.
|
---|
274 | //
|
---|
275 | // This probably is slightly paranoid.
|
---|
276 | }
|
---|
277 |
|
---|
278 | synchronized (semaphore) {
|
---|
279 | // start any daemon threads
|
---|
280 | if (daemons != null) {
|
---|
281 | for (int i = 0; i < daemons.length; ++i) {
|
---|
282 | daemons[i] = new TaskRunnable((Task) daemonTasks.tasks.get(i));
|
---|
283 | Thread daemonThread = new Thread(group, daemons[i]);
|
---|
284 | daemonThread.setDaemon(true);
|
---|
285 | daemonThread.start();
|
---|
286 | }
|
---|
287 | }
|
---|
288 |
|
---|
289 | // now run main threads in limited numbers...
|
---|
290 | // start initial batch of threads
|
---|
291 | for (int i = 0; i < maxRunning; ++i) {
|
---|
292 | running[i] = runnables[threadNumber++];
|
---|
293 | Thread thread = new Thread(group, running[i]);
|
---|
294 | thread.start();
|
---|
295 | }
|
---|
296 |
|
---|
297 | if (timeout != 0) {
|
---|
298 | // start the timeout thread
|
---|
299 | Thread timeoutThread = new Thread() {
|
---|
300 | public synchronized void run() {
|
---|
301 | try {
|
---|
302 | wait(timeout);
|
---|
303 | synchronized (semaphore) {
|
---|
304 | stillRunning = false;
|
---|
305 | timedOut = true;
|
---|
306 | semaphore.notifyAll();
|
---|
307 | }
|
---|
308 | } catch (InterruptedException e) {
|
---|
309 | // ignore
|
---|
310 | }
|
---|
311 | }
|
---|
312 | };
|
---|
313 | timeoutThread.start();
|
---|
314 | }
|
---|
315 |
|
---|
316 | // now find available running slots for the remaining threads
|
---|
317 | outer:
|
---|
318 | while (threadNumber < numTasks && stillRunning) {
|
---|
319 | for (int i = 0; i < maxRunning; i++) {
|
---|
320 | if (running[i] == null || running[i].isFinished()) {
|
---|
321 | running[i] = runnables[threadNumber++];
|
---|
322 | Thread thread = new Thread(group, running[i]);
|
---|
323 | thread.start();
|
---|
324 | // continue on outer while loop to get another
|
---|
325 | // available slot
|
---|
326 | continue outer;
|
---|
327 | }
|
---|
328 | }
|
---|
329 |
|
---|
330 | // if we got here all slots in use, so sleep until
|
---|
331 | // something happens
|
---|
332 | try {
|
---|
333 | semaphore.wait();
|
---|
334 | } catch (InterruptedException ie) {
|
---|
335 | // doesn't java know interruptions are rude?
|
---|
336 | // just pretend it didn't happen and go about out business.
|
---|
337 | // sheesh!
|
---|
338 | }
|
---|
339 | }
|
---|
340 |
|
---|
341 | // are all threads finished
|
---|
342 | outer2:
|
---|
343 | while (stillRunning) {
|
---|
344 | for (int i = 0; i < maxRunning; ++i) {
|
---|
345 | if (running[i] != null && !running[i].isFinished()) {
|
---|
346 | //System.out.println("Thread " + i + " is still alive ");
|
---|
347 | // still running - wait for it
|
---|
348 | try {
|
---|
349 | semaphore.wait();
|
---|
350 | } catch (InterruptedException ie) {
|
---|
351 | // who would interrupt me at a time like this?
|
---|
352 | }
|
---|
353 | continue outer2;
|
---|
354 | }
|
---|
355 | }
|
---|
356 | stillRunning = false;
|
---|
357 | }
|
---|
358 | }
|
---|
359 |
|
---|
360 | if (timedOut) {
|
---|
361 | throw new BuildException("Parallel execution timed out");
|
---|
362 | }
|
---|
363 |
|
---|
364 | // now did any of the threads throw an exception
|
---|
365 | exceptionMessage = new StringBuffer();
|
---|
366 | numExceptions = 0;
|
---|
367 | firstException = null;
|
---|
368 | firstLocation = Location.UNKNOWN_LOCATION;
|
---|
369 | processExceptions(daemons);
|
---|
370 | processExceptions(runnables);
|
---|
371 |
|
---|
372 | if (numExceptions == 1) {
|
---|
373 | if (firstException instanceof BuildException) {
|
---|
374 | throw (BuildException) firstException;
|
---|
375 | } else {
|
---|
376 | throw new BuildException(firstException);
|
---|
377 | }
|
---|
378 | } else if (numExceptions > 1) {
|
---|
379 | throw new BuildException(exceptionMessage.toString(),
|
---|
380 | firstLocation);
|
---|
381 | }
|
---|
382 | }
|
---|
383 |
|
---|
384 | /**
|
---|
385 | * Determine the number of processors. Only effective on later VMs
|
---|
386 | *
|
---|
387 | * @return the number of processors available or 0 if not determinable.
|
---|
388 | */
|
---|
389 | private int getNumProcessors() {
|
---|
390 | try {
|
---|
391 | Class[] paramTypes = {};
|
---|
392 | Method availableProcessors =
|
---|
393 | Runtime.class.getMethod("availableProcessors", paramTypes);
|
---|
394 |
|
---|
395 | Object[] args = {};
|
---|
396 | Integer ret = (Integer) availableProcessors.invoke(Runtime.getRuntime(), args);
|
---|
397 | return ret.intValue();
|
---|
398 | } catch (Exception e) {
|
---|
399 | // return a bogus number
|
---|
400 | return 0;
|
---|
401 | }
|
---|
402 | }
|
---|
403 |
|
---|
404 | /**
|
---|
405 | * thread that execs a task
|
---|
406 | */
|
---|
407 | private class TaskRunnable implements Runnable {
|
---|
408 | private Throwable exception;
|
---|
409 | private Task task;
|
---|
410 | private boolean finished;
|
---|
411 |
|
---|
412 | /**
|
---|
413 | * Construct a new TaskRunnable.<p>
|
---|
414 | *
|
---|
415 | * @param task the Task to be executed in a separate thread
|
---|
416 | */
|
---|
417 | TaskRunnable(Task task) {
|
---|
418 | this.task = task;
|
---|
419 | }
|
---|
420 |
|
---|
421 | /**
|
---|
422 | * Executes the task within a thread and takes care about
|
---|
423 | * Exceptions raised within the task.
|
---|
424 | */
|
---|
425 | public void run() {
|
---|
426 | try {
|
---|
427 | task.perform();
|
---|
428 | } catch (Throwable t) {
|
---|
429 | exception = t;
|
---|
430 | if (failOnAny) {
|
---|
431 | stillRunning = false;
|
---|
432 | }
|
---|
433 | } finally {
|
---|
434 | synchronized (semaphore) {
|
---|
435 | finished = true;
|
---|
436 | semaphore.notifyAll();
|
---|
437 | }
|
---|
438 | }
|
---|
439 | }
|
---|
440 |
|
---|
441 | /**
|
---|
442 | * get any exception that got thrown during execution;
|
---|
443 | * @return an exception or null for no exception/not yet finished
|
---|
444 | */
|
---|
445 | public Throwable getException() {
|
---|
446 | return exception;
|
---|
447 | }
|
---|
448 |
|
---|
449 | /**
|
---|
450 | * Provides the indicator that the task has been finished.
|
---|
451 | * @return Returns true when the task is finished.
|
---|
452 | */
|
---|
453 | boolean isFinished() {
|
---|
454 | return finished;
|
---|
455 | }
|
---|
456 | }
|
---|
457 |
|
---|
458 | }
|
---|