001package org.openstreetmap.gui.jmapviewer;
002
003//License: GPL. Copyright 2008 by Jan Peter Stotz
004
005import java.util.concurrent.BlockingDeque;
006import java.util.concurrent.LinkedBlockingDeque;
007import java.util.concurrent.TimeUnit;
008
009import org.openstreetmap.gui.jmapviewer.interfaces.TileJob;
010
011/**
012 * A generic class that processes a list of {@link Runnable} one-by-one using
013 * one or more {@link Thread}-instances. The number of instances varies between
014 * 1 and {@link #workerThreadMaxCount} (default: 8). If an instance is idle
015 * more than {@link #workerThreadTimeout} seconds (default: 30), the instance
016 * ends itself.
017 *
018 * @author Jan Peter Stotz
019 */
020public class JobDispatcher {
021
022    private static final JobDispatcher instance = new JobDispatcher();
023
024    /**
025     * @return the singelton instance of the {@link JobDispatcher}
026     */
027    public static JobDispatcher getInstance() {
028        return instance;
029    }
030
031    private JobDispatcher() {
032        addWorkerThread().firstThread = true;
033    }
034
035    protected BlockingDeque<TileJob> jobQueue = new LinkedBlockingDeque<TileJob>();
036
037    protected static int workerThreadMaxCount = 8;
038
039    /**
040     * Specifies the time span in seconds that a worker thread waits for new
041     * jobs to perform. If the time span has elapsed the worker thread
042     * terminates itself. Only the first worker thread works differently, it
043     * ignores the timeout and will never terminate itself.
044     */
045    protected static int workerThreadTimeout = 30;
046
047    /**
048     * Type of queue, FIFO if <code>false</code>, LIFO if <code>true</code>
049     */
050    protected boolean modeLIFO = false;
051
052    /**
053     * Total number of worker threads currently idle or active
054     */
055    protected int workerThreadCount = 0;
056
057    /**
058     * Number of worker threads currently idle
059     */
060    protected int workerThreadIdleCount = 0;
061
062    /**
063     * Just an id for identifying an worker thread instance
064     */
065    protected int workerThreadId = 0;
066
067    /**
068     * Removes all jobs from the queue that are currently not being processed.
069     */
070    public void cancelOutstandingJobs() {
071        jobQueue.clear();
072    }
073
074    /**
075     * Function to set the maximum number of workers for tile loading.
076     */
077    static public void setMaxWorkers(int workers) {
078        workerThreadMaxCount = workers;
079    }
080
081    /**
082     * Function to set the LIFO/FIFO mode for tile loading job.
083     *
084     * @param lifo <code>true</code> for LIFO mode, <code>false</code> for FIFO mode
085     */
086    public void setLIFO(boolean lifo) {
087        modeLIFO = lifo;
088    }
089
090    /**
091     * Adds a job to the queue.
092     * Jobs for tiles already contained in the are ignored (using a <code>null</code> tile
093     * prevents skipping).
094     *
095     * @param job the the job to be added
096     */
097    public void addJob(TileJob job) {
098        try {
099            if(job.getTile() != null) {
100                for(TileJob oldJob : jobQueue) {
101                    if(oldJob.getTile() == job.getTile()) {
102                        return;
103                    }
104                }
105            }
106            jobQueue.put(job);
107            if (workerThreadIdleCount == 0 && workerThreadCount < workerThreadMaxCount)
108                addWorkerThread();
109        } catch (InterruptedException e) {
110        }
111    }
112
113    protected JobThread addWorkerThread() {
114        JobThread jobThread = new JobThread(++workerThreadId);
115        synchronized (this) {
116            workerThreadCount++;
117        }
118        jobThread.start();
119        return jobThread;
120    }
121
122    public class JobThread extends Thread {
123
124        Runnable job;
125        boolean firstThread = false;
126
127        public JobThread(int threadId) {
128            super("OSMJobThread " + threadId);
129            setDaemon(true);
130            job = null;
131        }
132
133        @Override
134        public void run() {
135            executeJobs();
136            synchronized (instance) {
137                workerThreadCount--;
138            }
139        }
140
141        protected void executeJobs() {
142            while (!isInterrupted()) {
143                try {
144                    synchronized (instance) {
145                        workerThreadIdleCount++;
146                    }
147                    if(modeLIFO) {
148                        if (firstThread)
149                            job = jobQueue.takeLast();
150                        else
151                            job = jobQueue.pollLast(workerThreadTimeout, TimeUnit.SECONDS);
152                    } else {
153                        if (firstThread)
154                            job = jobQueue.take();
155                        else
156                            job = jobQueue.poll(workerThreadTimeout, TimeUnit.SECONDS);
157                    }
158                } catch (InterruptedException e1) {
159                    return;
160                } finally {
161                    synchronized (instance) {
162                        workerThreadIdleCount--;
163                    }
164                }
165                if (job == null)
166                    return;
167                try {
168                    job.run();
169                    job = null;
170                } catch (Exception e) {
171                    e.printStackTrace();
172                }
173            }
174        }
175    }
176
177}