001 /*
002 *
003 * All content copyright Terracotta, Inc., unless otherwise indicated. All rights reserved.
004 *
005 */
006 package demo.sharedqueue;
007
008 import java.util.Collections;
009 import java.util.LinkedList;
010 import java.util.List;
011
012 import com.tcclient.cluster.DsoNode;
013
014 class Worker implements Runnable {
015
016 private final static int HEALTH_ALIVE = 0;
017 private final static int HEALTH_DYING = 1;
018 private final static int HEALTH_DEAD = 2;
019 private final static int MAX_LOAD = 10;
020
021 private final String name;
022 private final int port;
023 private final Queue queue;
024 private final List<Job> jobs;
025 private final DsoNode node;
026
027 private int health = HEALTH_ALIVE;
028
029 public Worker(final Queue queue, final int port, final DsoNode node) {
030 this.name = Queue.getHostName();
031 this.port = port;
032 this.queue = queue;
033 this.node = node;
034 jobs = Collections.synchronizedList(new LinkedList<Job>());
035 }
036
037 public final DsoNode getNode() {
038 return node;
039 }
040
041 public final String getName() {
042 return "node: " + node + " (" + name + ":" + port + ")";
043 }
044
045 public final String toXml() {
046 synchronized (jobs) {
047 String data = "<worker><name>" + getName() + "</name><jobs>";
048 for (Job job : jobs) {
049 data += job.toXml();
050 }
051 data += "</jobs></worker>";
052 return data;
053 }
054 }
055
056 /**
057 * Attempt to mark the Worker as dead (if it's already dying); Note that we
058 * synchronize this method since it's mutating a shared object (this class)
059 *
060 * @return True if the Worker is dead.
061 */
062 public final synchronized boolean expire() {
063 if (HEALTH_DYING == health) {
064 // a dying Worker wont die until it has
065 // consumed all of it's jobs
066 if (jobs.size() > 0) {
067 queue.addJob(jobs.remove(0));
068 } else {
069 setHealth(HEALTH_DEAD);
070 }
071 }
072 return (HEALTH_DEAD == health);
073 }
074
075 /**
076 * Set the state of the Worker's health; Note that we synchronize this
077 * method since it's mutating a shared object (this class)
078 *
079 * @param health
080 */
081 private final synchronized void setHealth(final int health) {
082 this.health = health;
083 }
084
085 /**
086 * Set the state of the Worker's health to dying; Note that we synchronize
087 * this method since it's mutating a shared object (this class)
088 *
089 * @param health
090 */
091 public final synchronized void markForExpiration() {
092 setHealth(HEALTH_DYING);
093 }
094
095 public final void run() {
096 while (HEALTH_DEAD != health) {
097 if ((HEALTH_ALIVE == health) && (jobs.size() < MAX_LOAD)) {
098 final Job job = queue.getJob();
099
100 try {
101 Thread.sleep(500);
102 } catch (InterruptedException ie) {
103 System.err.println(ie.getMessage());
104 }
105
106 synchronized (jobs) {
107 jobs.add(job);
108 }
109
110 Thread processor = new Thread(new Runnable() {
111 public void run() {
112 job.run(Worker.this);
113 synchronized (jobs) {
114 jobs.remove(job);
115 }
116 queue.log(job);
117 }
118 });
119 processor.start();
120 }
121 }
122 }
123 }
|