001 package org.openstreetmap.gui.jmapviewer;
002
003 //License: GPL. Copyright 2008 by Jan Peter Stotz
004
005 import java.util.concurrent.BlockingDeque;
006 import java.util.concurrent.LinkedBlockingDeque;
007 import java.util.concurrent.TimeUnit;
008
009 import org.openstreetmap.gui.jmapviewer.interfaces.TileJob;
010
011 /**
012 * A generic class that processes a list of {@link Runnable} one-by-one using
013 * one or more {@link Thread}-instances. The number of instances varies between
014 * 1 and {@link #WORKER_THREAD_MAX_COUNT} (default: 8). If an instance is idle
015 * more than {@link #WORKER_THREAD_TIMEOUT} seconds (default: 30), the instance
016 * ends itself.
017 *
018 * @author Jan Peter Stotz
019 */
020 public class JobDispatcher {
021
022 private static final JobDispatcher instance = new JobDispatcher();
023
024 /**
025 * @return the singelton instance of the {@link JobDispatcher}
026 */
027 public static JobDispatcher getInstance() {
028 return instance;
029 }
030
031 private JobDispatcher() {
032 addWorkerThread().firstThread = true;
033 }
034
035 protected BlockingDeque<TileJob> jobQueue = new LinkedBlockingDeque<TileJob>();
036
037 public static int WORKER_THREAD_MAX_COUNT = 8;
038
039 /**
040 * Specifies the time span in seconds that a worker thread waits for new
041 * jobs to perform. If the time span has elapsed the worker thread
042 * terminates itself. Only the first worker thread works differently, it
043 * ignores the timeout and will never terminate itself.
044 */
045 public static int WORKER_THREAD_TIMEOUT = 30;
046
047 /**
048 * Type of queue, FIFO if <code>false</code>, LIFO if <code>true</code>
049 */
050 protected boolean modeLIFO = false;
051
052 /**
053 * Total number of worker threads currently idle or active
054 */
055 protected int workerThreadCount = 0;
056
057 /**
058 * Number of worker threads currently idle
059 */
060 protected int workerThreadIdleCount = 0;
061
062 /**
063 * Just an id for identifying an worker thread instance
064 */
065 protected int workerThreadId = 0;
066
067 /**
068 * Removes all jobs from the queue that are currently not being processed.
069 */
070 public void cancelOutstandingJobs() {
071 jobQueue.clear();
072 }
073
074 /**
075 * Function to set the maximum number of workers for tile loading.
076 */
077 static public void setMaxWorkers(int workers) {
078 WORKER_THREAD_MAX_COUNT = workers;
079 }
080
081 /**
082 * Function to set the LIFO/FIFO mode for tile loading job.
083 *
084 * @param lifo <code>true</code> for LIFO mode, <code>false</code> for FIFO mode
085 */
086 public void setLIFO(boolean lifo) {
087 modeLIFO = lifo;
088 }
089
090 /**
091 * Adds a job to the queue.
092 * Jobs for tiles already contained in the are ignored (using a <code>null</code> tile
093 * prevents skipping).
094 *
095 * @param job the the job to be added
096 */
097 public void addJob(TileJob job) {
098 try {
099 if(job.getTile() != null) {
100 for(TileJob oldJob : jobQueue) {
101 if(oldJob.getTile() == job.getTile()) {
102 return;
103 }
104 }
105 }
106 jobQueue.put(job);
107 if (workerThreadIdleCount == 0 && workerThreadCount < WORKER_THREAD_MAX_COUNT)
108 addWorkerThread();
109 } catch (InterruptedException e) {
110 }
111 }
112
113 protected JobThread addWorkerThread() {
114 JobThread jobThread = new JobThread(++workerThreadId);
115 synchronized (this) {
116 workerThreadCount++;
117 }
118 jobThread.start();
119 return jobThread;
120 }
121
122 public class JobThread extends Thread {
123
124 Runnable job;
125 boolean firstThread = false;
126
127 public JobThread(int threadId) {
128 super("OSMJobThread " + threadId);
129 setDaemon(true);
130 job = null;
131 }
132
133 @Override
134 public void run() {
135 executeJobs();
136 synchronized (instance) {
137 workerThreadCount--;
138 }
139 }
140
141 protected void executeJobs() {
142 while (!isInterrupted()) {
143 try {
144 synchronized (instance) {
145 workerThreadIdleCount++;
146 }
147 if(modeLIFO) {
148 if (firstThread)
149 job = jobQueue.takeLast();
150 else
151 job = jobQueue.pollLast(WORKER_THREAD_TIMEOUT, TimeUnit.SECONDS);
152 } else {
153 if (firstThread)
154 job = jobQueue.take();
155 else
156 job = jobQueue.poll(WORKER_THREAD_TIMEOUT, TimeUnit.SECONDS);
157 }
158 } catch (InterruptedException e1) {
159 return;
160 } finally {
161 synchronized (instance) {
162 workerThreadIdleCount--;
163 }
164 }
165 if (job == null)
166 return;
167 try {
168 job.run();
169 job = null;
170 } catch (Exception e) {
171 e.printStackTrace();
172 }
173 }
174 }
175 }
176
177 }