Diswork-commits
Threads by month
- ----- 2026 -----
- June
- May
- April
- March
- February
- January
- ----- 2025 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2024 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2023 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2022 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2021 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2020 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2019 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2018 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2017 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2016 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2015 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2014 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2013 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2012 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2011 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2010 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
June 2010
- 2 participants
- 39 discussions
r82 - trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon
by bleny@users.nuiton.org 17 Jun '10
by bleny@users.nuiton.org 17 Jun '10
17 Jun '10
Author: bleny
Date: 2010-06-17 15:33:48 +0200 (Thu, 17 Jun 2010)
New Revision: 82
Url: http://nuiton.org/repositories/revision/diswork/82
Log:
bugfix concurrence
Modified:
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-06-17 09:00:26 UTC (rev 81)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-06-17 13:33:48 UTC (rev 82)
@@ -72,9 +72,9 @@
RUNNING_MOVE.put(DisworkDaemon.FAILED_1_RUNNING, DisworkDaemon.FAILED_1_RUNNING);
RUNNING_MOVE.put(DisworkDaemon.FAILED_2_RUNNING, DisworkDaemon.FAILED_2_RUNNING);
- FAILED_MOVE.put(DisworkDaemon.TODO, DisworkDaemon.FAILED_1);
- FAILED_MOVE.put(DisworkDaemon.FAILED_1, DisworkDaemon.FAILED_2);
- FAILED_MOVE.put(DisworkDaemon.FAILED_2, DisworkDaemon.FAILED_3);
+ FAILED_MOVE.put(DisworkDaemon.TODO_RUNNING, DisworkDaemon.FAILED_1);
+ FAILED_MOVE.put(DisworkDaemon.FAILED_1_RUNNING, DisworkDaemon.FAILED_2);
+ FAILED_MOVE.put(DisworkDaemon.FAILED_2_RUNNING, DisworkDaemon.FAILED_3);
}
private static final Log log = LogFactory.getLog(WorkersManager.class);
@@ -89,23 +89,34 @@
protected class Worker extends Thread {
- public boolean shouldStop = false;
+ protected boolean shouldStop = false;
- public WorkersManager manager;
+ protected WorkersManager manager;
+ /**
+ * this method add a line to a job-specific log
+ * @param jobPath the path to the job concerned
+ * @param message the line to add the the log
+ * @throws IOException if an error occurred while writing the log
+ */
protected void log(String jobPath, String message) throws IOException {
String logPath = jobPath + "/" + DisworkDaemon.LOG_PATH;
InputStream oldLogAsStream = fileSystem.read(logPath);
String oldLog = IOUtils.toString(oldLogAsStream);
-
String logEntry = message + "\n";
-
String newLog = oldLog + logEntry;
-
- fileSystem.delete(logPath);
fileSystem.write(logPath, IOUtils.toInputStream(newLog));
}
+ /**
+ * Download all the files needed for a job in a temp directory, run
+ * the job, wait for it to end, write all the results. Mark the job
+ * as running at the beginning and move it to DONE or FAILED at
+ * the end, depending of the results
+ * @param jobPath
+ * @return
+ * @throws IOException
+ */
protected boolean runJob(String jobPath) throws IOException {
log.info("running job at " + jobPath);
@@ -168,13 +179,13 @@
IOUtils.copy(source, new FileOutputStream(localCopy));
}
- // executing the job
+ // prepare and run it the job
String commandLine = jobDescription.getCommandLine();
- log.info("calling " + commandLine);
String[] commandLineElements = commandLine.split(" ");
ProcessBuilder builder = new ProcessBuilder(commandLineElements);
builder.directory(jobDir);
builder.redirectErrorStream(true);
+ log.info("calling " + commandLine);
Process job = builder.start();
// plugging a file on the standard input
@@ -185,6 +196,7 @@
IOUtils.copy(input, job.getOutputStream());
}
+ // run the process and wait for it to return
int exitValue = -1;
try {
exitValue = job.waitFor();
@@ -237,7 +249,7 @@
return success;
}
- public String getFistJobName(String path) throws IOException {
+ protected String getFistJobName(String path) throws IOException {
List<String> jobsNames = fileSystem.readDirectory(path);
if (jobsNames.size() == 0) {
return null;
@@ -250,71 +262,93 @@
protected void findAJobAndRunIt() {
// try to find a new job
try {
+
+ // Once a job is found, those two var will be set
String jobLinkDir = null;
String jobLinkName = null;
- String[] runningJobsDirs = { DisworkDaemon.FAILED_2_RUNNING,
- DisworkDaemon.FAILED_1_RUNNING,
- DisworkDaemon.TODO_RUNNING
- };
-
- // FIXME 20100617 bleny stop loops when found
- for (String path : runningJobsDirs) {
- String oldName = getFistJobName(path);
- if (oldName != null) {
- Long linkAge = System.currentTimeMillis()
- - Long.parseLong(oldName);
- if (linkAge > MAX_JOB_RUNNING_TIME) {
- log.info("taking old job (age = " + linkAge + ")");
- jobLinkDir = path;
- jobLinkName = oldName;
+ // use a synchronized block because multiple workers
+ // may try to take a same job
+ synchronized (fileSystem) {
+
+ // fist, try to find a job declared has running since
+ // too long to re-run it
+ String[] runningJobsDirs = { DisworkDaemon.FAILED_2_RUNNING,
+ DisworkDaemon.FAILED_1_RUNNING,
+ DisworkDaemon.TODO_RUNNING
+ };
+ // browsing all "running" dirs
+ for (String path : runningJobsDirs) {
+ String oldName = getFistJobName(path);
+ if (oldName != null) {
+ Long linkAge = System.currentTimeMillis()
+ - Long.parseLong(oldName);
+ // check is oldest job is too old and should be
+ // considered has to-be-rerun
+ if (linkAge > MAX_JOB_RUNNING_TIME) {
+ log.info("taking old job (age = " + linkAge + ")");
+ jobLinkDir = path;
+ jobLinkName = oldName;
+ // FIXME 20100617 bleny break s*cks
+ break;
+ }
}
}
- }
-
- if (jobLinkDir == null) {
- String[] jobsDirs = { DisworkDaemon.FAILED_2,
- DisworkDaemon.FAILED_1,
- DisworkDaemon.TODO
- };
- for (String path : jobsDirs) {
- String oldName = getFistJobName(path);
- if (oldName != null) { // take it
- jobLinkDir = path;
- jobLinkName = oldName;
+
+ // if no job was found, search now in not running jobs
+ if (jobLinkDir == null) {
+ String[] jobsDirs = { DisworkDaemon.FAILED_2,
+ DisworkDaemon.FAILED_1,
+ DisworkDaemon.TODO
+ };
+ for (String path : jobsDirs) {
+ String oldName = getFistJobName(path);
+ if (oldName != null) { // take it
+ jobLinkDir = path;
+ jobLinkName = oldName;
+ // FIXME 20100617 bleny break s*cks
+ break;
+ }
}
}
+
+ if (jobLinkDir != null) {
+ // move the link before running the job
+ String oldPath = jobLinkDir + "/" + jobLinkName;
+ log.info("job found at " + oldPath);
+
+ jobLinkDir = RUNNING_MOVE.get(jobLinkDir);
+ jobLinkName = DisworkDaemon.newJobLinkName();
+ String newPath = jobLinkDir + "/" + jobLinkName;
+
+ log.info("moving " + oldPath + " to " + newPath);
+ fileSystem.move(oldPath, newPath);
+
+ }
}
+ // now, if no job was found, do nothing
if (jobLinkDir == null) {
log.info("nothing to do");
Thread.sleep(JOB_WAIT);
+ // if a job was found, take it
} else {
- // move the link before running the job
- String oldPath = jobLinkDir + "/" + jobLinkName;
- log.info("job found at " + oldPath);
+
+ String jobPath = jobLinkDir + "/" + jobLinkName;
- String newPath = RUNNING_MOVE.get(jobLinkDir) + "/" +
- DisworkDaemon.newJobLinkName();
+ boolean jobSuccess = runJob(jobPath);
- log.info("moving " + oldPath + " to " + newPath);
- fileSystem.move(oldPath, newPath);
-
- // run the job
- boolean jobSuccess = runJob(newPath);
-
// move the link after the job
- oldPath = newPath;
String newDir = null;
if (jobSuccess) {
newDir = DisworkDaemon.DONE;
} else {
newDir = FAILED_MOVE.get(jobLinkDir);
}
- newPath = newDir + "/" + DisworkDaemon.newJobLinkName();
+ String newPath = newDir + "/" + DisworkDaemon.newJobLinkName();
- log.info("moving " + oldPath + " to " + newPath);
- fileSystem.move(oldPath, newPath);
+ log.info("moving " + jobPath + " to " + newPath);
+ fileSystem.move(jobPath, newPath);
// mark the job has finished if done or failed too many
// times
@@ -332,8 +366,7 @@
} catch (InterruptedException e) {
log.info("exception catch", e);
// TODO 20100611 bleny manage exception
- }
-
+ }
}
@Override
@@ -367,7 +400,8 @@
} else if ("scheduled".equals(initialStrategy)) {
activeScheduledActivityStrategy();
} else {
- log.error("wrong config directive " + initialStrategy);
+ log.error("wrong config directive for initial strategy" +
+ initialStrategy);
activeNoActivityStrategy();
}
1
0
r81 - in trunk/diswork-daemon/src: main/java/org/nuiton/diswork/daemon test/java/org/nuiton/diswork/daemon
by bleny@users.nuiton.org 17 Jun '10
by bleny@users.nuiton.org 17 Jun '10
17 Jun '10
Author: bleny
Date: 2010-06-17 11:00:26 +0200 (Thu, 17 Jun 2010)
New Revision: 81
Url: http://nuiton.org/repositories/revision/diswork/81
Log:
bugfix dans la concurrence avec test
Added:
trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java
Modified:
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java
trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java 2010-06-17 08:08:19 UTC (rev 80)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java 2010-06-17 09:00:26 UTC (rev 81)
@@ -1,3 +1,27 @@
+/*
+ * #%L
+ * Diswork daemon
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
package org.nuiton.diswork.daemon;
import java.lang.management.ManagementFactory;
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java 2010-06-17 08:08:19 UTC (rev 80)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java 2010-06-17 09:00:26 UTC (rev 81)
@@ -142,6 +142,10 @@
return getOptionAsInt("diswork.workers_number");
}
+ public void setNumberOfWorkers(Integer number) {
+ setOption("diswork.workers_number", number.toString());
+ }
+
public String getActivityStrategy() {
return getOption("diswork.activity_strategy");
}
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-06-17 08:08:19 UTC (rev 80)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-06-17 09:00:26 UTC (rev 81)
@@ -68,6 +68,9 @@
RUNNING_MOVE.put(DisworkDaemon.TODO, DisworkDaemon.TODO_RUNNING);
RUNNING_MOVE.put(DisworkDaemon.FAILED_1, DisworkDaemon.FAILED_1_RUNNING);
RUNNING_MOVE.put(DisworkDaemon.FAILED_2, DisworkDaemon.FAILED_2_RUNNING);
+ RUNNING_MOVE.put(DisworkDaemon.TODO_RUNNING, DisworkDaemon.TODO_RUNNING);
+ RUNNING_MOVE.put(DisworkDaemon.FAILED_1_RUNNING, DisworkDaemon.FAILED_1_RUNNING);
+ RUNNING_MOVE.put(DisworkDaemon.FAILED_2_RUNNING, DisworkDaemon.FAILED_2_RUNNING);
FAILED_MOVE.put(DisworkDaemon.TODO, DisworkDaemon.FAILED_1);
FAILED_MOVE.put(DisworkDaemon.FAILED_1, DisworkDaemon.FAILED_2);
@@ -254,12 +257,14 @@
DisworkDaemon.FAILED_1_RUNNING,
DisworkDaemon.TODO_RUNNING
};
+
+ // FIXME 20100617 bleny stop loops when found
for (String path : runningJobsDirs) {
String oldName = getFistJobName(path);
if (oldName != null) {
Long linkAge = System.currentTimeMillis()
- Long.parseLong(oldName);
- if (linkAge <= MAX_JOB_RUNNING_TIME) {
+ if (linkAge > MAX_JOB_RUNNING_TIME) {
log.info("taking old job (age = " + linkAge + ")");
jobLinkDir = path;
jobLinkName = oldName;
@@ -351,14 +356,6 @@
public WorkersManager(DisworkFileSystem fileSystem, DisworkConfig config) {
this.fileSystem = fileSystem;
this.config = config;
-
- log.info("will start " + config.getNumberOfWorkers() + " workers");
- for (int i = 1 ; i <= config.getNumberOfWorkers() ; i++) {
- Worker worker = new Worker();
- worker.manager = this;
- worker.start();
- workers.add(worker);
- }
String initialStrategy = config.getActivityStrategy();
if ( "none".equals(initialStrategy)) {
@@ -373,6 +370,14 @@
log.error("wrong config directive " + initialStrategy);
activeNoActivityStrategy();
}
+
+ log.info("will start " + config.getNumberOfWorkers() + " workers");
+ for (int i = 1 ; i <= config.getNumberOfWorkers() ; i++) {
+ Worker worker = new Worker();
+ worker.manager = this;
+ worker.start();
+ workers.add(worker);
+ }
}
Added: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java
===================================================================
--- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java (rev 0)
+++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java 2010-06-17 09:00:26 UTC (rev 81)
@@ -0,0 +1,21 @@
+package org.nuiton.diswork.daemon;
+
+import java.io.InputStream;
+
+import org.junit.Before;
+
+public class DisworkDaemonConcurrencyTest extends DisworkDaemonTest {
+
+ @Before
+ public void setUp() throws Exception {
+ DisworkConfig config = DisworkConfig.newConfig();
+ port += 1;
+ config.setUsedPort(port);
+ config.setNumberOfWorkers(32);
+ daemon = new DisworkDaemon(config);
+ InputStream application = ClassLoader.getSystemResourceAsStream("fake-app-1.0.zip");
+ daemon.submitApplication("fake-app", "1.0", application);
+ }
+
+
+}
Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java
===================================================================
--- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java 2010-06-17 08:08:19 UTC (rev 80)
+++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java 2010-06-17 09:00:26 UTC (rev 81)
@@ -143,7 +143,7 @@
Map<String, Integer> stats = daemon.getGlobalStats();
// deamon should read 3 stats : 1 OS, 1 architecture and the number
// of processors
- assertEquals(3, stats);
+ assertEquals(3, stats.size());
}
}
1
0
r80 - in trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs: . storage
by bleny@users.nuiton.org 17 Jun '10
by bleny@users.nuiton.org 17 Jun '10
17 Jun '10
Author: bleny
Date: 2010-06-17 10:08:19 +0200 (Thu, 17 Jun 2010)
New Revision: 80
Url: http://nuiton.org/repositories/revision/diswork/80
Log:
- bug dans les tests kad
Modified:
trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemKademliaTest.java
trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/KademliaDisworkMapTest.java
Modified: trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemKademliaTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemKademliaTest.java 2010-06-17 07:53:30 UTC (rev 79)
+++ trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemKademliaTest.java 2010-06-17 08:08:19 UTC (rev 80)
@@ -27,7 +27,7 @@
// finally, initiate the fileSystem
DisworkFileSystemConfig disworkConfig1 =
DisworkFileSystemConfig.newKademliaDisworkConfig();
- bootstrapPort = disworkConfig1.getUsedPort() + 1;
+ bootstrapPort = disworkConfig1.getUsedPort();
fileSystem = new DisworkFileSystem(disworkConfig1);
}
Modified: trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/KademliaDisworkMapTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/KademliaDisworkMapTest.java 2010-06-17 07:53:30 UTC (rev 79)
+++ trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/KademliaDisworkMapTest.java 2010-06-17 08:08:19 UTC (rev 80)
@@ -22,7 +22,7 @@
map1 = new KademliaDisworkMap(config1);
DisworkFileSystemConfig config2 = DisworkFileSystemConfig
- .newKademliaDisworkConfig(config1.getUsedPort()+1);
+ .newKademliaDisworkConfig(config1.getUsedPort());
map2 = new KademliaDisworkMap(config2);
}
1
0
Author: bleny
Date: 2010-06-17 09:53:30 +0200 (Thu, 17 Jun 2010)
New Revision: 79
Url: http://nuiton.org/repositories/revision/diswork/79
Log:
config log4j pour les executables
Added:
trunk/diswork-daemon/src/main/resources/
trunk/diswork-daemon/src/main/resources/log4j.properties
Added: trunk/diswork-daemon/src/main/resources/log4j.properties
===================================================================
--- trunk/diswork-daemon/src/main/resources/log4j.properties (rev 0)
+++ trunk/diswork-daemon/src/main/resources/log4j.properties 2010-06-17 07:53:30 UTC (rev 79)
@@ -0,0 +1,9 @@
+# Global logging configuration
+log4j.rootLogger=WARN, stdout
+# Console output...
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d %5p [%t] (%F:%L) %M - %m%n
+# package level
+log4j.logger.org.nuiton.diswork.fs.storage.KademliaDisworkMap=INFO
+log4j.logger.org.nuiton.diswork.daemon=INFO
\ No newline at end of file
1
0
r78 - in trunk: diswork-daemon diswork-daemon/src/main/java/org/nuiton/diswork/daemon diswork-fs/src/main/java/org/nuiton/diswork/fs diswork-fs/src/main/java/org/nuiton/diswork/fs/storage
by bleny@users.nuiton.org 17 Jun '10
by bleny@users.nuiton.org 17 Jun '10
17 Jun '10
Author: bleny
Date: 2010-06-17 09:51:19 +0200 (Thu, 17 Jun 2010)
New Revision: 78
Url: http://nuiton.org/repositories/revision/diswork/78
Log:
DaemonRunner et SimpleClient
Added:
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSimpleClient.java
Modified:
trunk/diswork-daemon/
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java
trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java
trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/KademliaDisworkMap.java
Property changes on: trunk/diswork-daemon
___________________________________________________________________
Modified: svn:ignore
- target
+ target
.classpath
.settings
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java 2010-06-16 16:02:13 UTC (rev 77)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java 2010-06-17 07:51:19 UTC (rev 78)
@@ -24,11 +24,9 @@
*/
package org.nuiton.diswork.daemon;
-import java.lang.management.ManagementFactory;
-import java.lang.management.OperatingSystemMXBean;
-import java.lang.management.RuntimeMXBean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.nuiton.diswork.fs.DisworkFileSystemConfig;
/**
*
@@ -41,20 +39,18 @@
/**
* @param args
*/
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
- OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
- System.out.println(os.getArch());
- System.out.println(os.getAvailableProcessors());
- System.out.println(os.getName());
- System.out.println(os.getVersion());
-
- RuntimeMXBean run = ManagementFactory.getRuntimeMXBean();
- System.out.println(run.getUptime());
-
- // consider args
-
- // DisworkDaemon node = new DisworkDaemon(config);
-
+ DisworkConfig config = new DisworkConfig();
+ if (args.length == 0) {
+ config.setFileSystemConfig(
+ DisworkFileSystemConfig.newKademliaDisworkConfig());
+ } else if (args.length == 2){
+ config.setFileSystemConfig(
+ DisworkFileSystemConfig.newKademliaDisworkConfig(args[0],
+ Integer.parseInt(args[1])));
+ }
+ config.setActivityStrategy("unlimited");
+ DisworkDaemon daemon = new DisworkDaemon(config);
}
}
Added: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSimpleClient.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSimpleClient.java (rev 0)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSimpleClient.java 2010-06-17 07:51:19 UTC (rev 78)
@@ -0,0 +1,90 @@
+/*
+ * #%L
+ * Diswork daemon
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
+package org.nuiton.diswork.daemon;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.nuiton.diswork.fs.DisworkFileSystemConfig;
+
+/**
+ *
+ * @author bleny
+ */
+public class DisworkSimpleClient {
+
+ private static final Log log = LogFactory.getLog(DisworkSimpleClient.class);
+
+ /**
+ * @param args
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+
+ DisworkConfig config = new DisworkConfig();
+ config.setFileSystemConfig(
+ DisworkFileSystemConfig.newKademliaDisworkConfig(args[0],
+ Integer.parseInt(args[1])));
+ config.setActivityStrategy("none");
+ config.setUsedPort(30000);
+ DisworkDaemon daemon = new DisworkDaemon(config);
+
+ // prompt the user to enter their name
+
+ // open up standard input
+ BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
+
+ String userEntry = "";
+ do {
+ System.out.print("diswork: ");
+ userEntry = br.readLine();
+
+ if ("stats".equals(userEntry)) {
+ Map<String, Integer> stats = daemon.getGlobalStats();
+ for (String key : stats.keySet()) {
+ System.out.println(key + " => " + stats.get(key));
+ }
+ } else if (!"quit".equals(userEntry)) {
+ JobDescription job = new JobDescription();
+ job.setStandardOutput("output.txt");
+ job.addOutput("output.txt");
+ job.setCommandLine(userEntry);
+ daemon.submitJob(job);
+
+ while(! daemon.isFinished(job)) {
+ Thread.sleep(5 * 1000);
+ }
+
+ System.out.println(IOUtils.toString(daemon.getResults(job).get("output.txt")));
+ }
+
+ } while (!userEntry.equals("quit"));
+
+ }
+}
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java 2010-06-16 16:02:13 UTC (rev 77)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java 2010-06-17 07:51:19 UTC (rev 78)
@@ -294,7 +294,7 @@
jsdlNamespace);
Element applicationName = application.getChild("ApplicationName",
jsdlNamespace);
- if (application != null) {
+ if (applicationName != null) {
Element applicationVersion = application.getChild
("ApplicationVersion", jsdlNamespace);
result.setApplication(applicationName.getText(),
@@ -315,7 +315,7 @@
}
Element output = POSIXApplication.getChild("Output",
jsdlPosixNamespace);
- if (input != null) {
+ if (output != null) {
result.setStandardOutput(output.getText());
}
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-06-16 16:02:13 UTC (rev 77)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-06-17 07:51:19 UTC (rev 78)
@@ -194,10 +194,12 @@
// dump the standard output in a file
String standardOutputFileName = jobDescription.getStandardOutput();
+ log.info("standardOutputFileName is " + standardOutputFileName);
if (standardOutputFileName != null) {
- OutputStream output = new FileOutputStream(
- new File(jobDir, standardOutputFileName));
- IOUtils.copy(job.getInputStream(), output);
+ File outputFile = new File(jobDir, standardOutputFileName);
+ OutputStream output = new FileOutputStream(outputFile);
+ log.info("writing standard output in " + outputFile);
+ IOUtils.copy(job.getInputStream(), output);
}
log.info("job returned " + exitValue);
@@ -205,6 +207,7 @@
// output file staging
for (String fileName : jobDescription.getStagingOutput()) {
File localCopy = new File(jobDir, fileName);
+ // FIXME 20100616 bleny may not exists if job has fail
InputStream localCopyStream = new FileInputStream(localCopy);
String filePath = jobPath + "/" + fileName;
Modified: trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java 2010-06-16 16:02:13 UTC (rev 77)
+++ trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java 2010-06-17 07:51:19 UTC (rev 78)
@@ -181,14 +181,19 @@
public static DisworkFileSystemConfig
newKademliaDisworkConfig (Integer bootstrapPort) {
+ return newKademliaDisworkConfig(getIp(), bootstrapPort);
+ }
+
+ public static DisworkFileSystemConfig
+ newKademliaDisworkConfig (String bootstrapIp,
+ Integer bootstrapPort) {
DisworkFileSystemConfig result = new DisworkFileSystemConfig();
String port = getPort().toString();
- String ip = getIp();
result.setOption("diswork.fs.map_type", "kademlia");
result.setOption("diswork.fs.use_port", port);
if (bootstrapPort != null) {
result.setOption("diswork.fs.bootstrap.port", bootstrapPort.toString());
- result.setOption("diswork.fs.bootstrap.ip", ip);
+ result.setOption("diswork.fs.bootstrap.ip", bootstrapIp);
}
return result;
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/KademliaDisworkMap.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/KademliaDisworkMap.java 2010-06-16 16:02:13 UTC (rev 77)
+++ trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/KademliaDisworkMap.java 2010-06-17 07:51:19 UTC (rev 78)
@@ -50,6 +50,7 @@
public KademliaDisworkMap(DisworkFileSystemConfig config) throws IOException {
+ log.info("booting on port " + config.getUsedPort());
kad = new Kademlia(Identifier.randomIdentifier(), config.getUsedPort());
if (config.getBootstrapIp() != null) {
1
0
Author: bleny
Date: 2010-06-16 18:02:13 +0200 (Wed, 16 Jun 2010)
New Revision: 77
Url: http://nuiton.org/repositories/revision/diswork/77
Log:
menage eclipse (boulette)
Removed:
trunk/diswork-daemon/.classpath
trunk/diswork-daemon/.settings/
Deleted: trunk/diswork-daemon/.classpath
===================================================================
--- trunk/diswork-daemon/.classpath 2010-06-16 15:55:53 UTC (rev 76)
+++ trunk/diswork-daemon/.classpath 2010-06-16 16:02:13 UTC (rev 77)
@@ -1,8 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<classpath>
- <classpathentry kind="src" output="target/classes" path="src/main/java"/>
- <classpathentry kind="src" output="target/test-classes" path="src/test/java"/>
- <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
- <classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
- <classpathentry kind="output" path="target/classes"/>
-</classpath>
1
0
r76 - in trunk/diswork-daemon: . .settings src/main/java/org/nuiton/diswork/daemon src/test/java/org/nuiton/diswork/daemon
by bleny@users.nuiton.org 16 Jun '10
by bleny@users.nuiton.org 16 Jun '10
16 Jun '10
Author: bleny
Date: 2010-06-16 17:55:53 +0200 (Wed, 16 Jun 2010)
New Revision: 76
Url: http://nuiton.org/repositories/revision/diswork/76
Log:
statistiques globales, parsing JSDL, activity strategy, config, documentation
Added:
trunk/diswork-daemon/.classpath
trunk/diswork-daemon/.project
trunk/diswork-daemon/.settings/
trunk/diswork-daemon/.settings/org.eclipse.jdt.core.prefs
trunk/diswork-daemon/.settings/org.maven.ide.eclipse.prefs
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java
trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/JobDescriptionTest.java
Modified:
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java
trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java
Added: trunk/diswork-daemon/.classpath
===================================================================
--- trunk/diswork-daemon/.classpath (rev 0)
+++ trunk/diswork-daemon/.classpath 2010-06-16 15:55:53 UTC (rev 76)
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+ <classpathentry kind="src" output="target/classes" path="src/main/java"/>
+ <classpathentry kind="src" output="target/test-classes" path="src/test/java"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
+ <classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
+ <classpathentry kind="output" path="target/classes"/>
+</classpath>
Added: trunk/diswork-daemon/.project
===================================================================
--- trunk/diswork-daemon/.project (rev 0)
+++ trunk/diswork-daemon/.project 2010-06-16 15:55:53 UTC (rev 76)
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>diswork-daemon</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
+ <name>org.maven.ide.eclipse.maven2Builder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ <nature>org.maven.ide.eclipse.maven2Nature</nature>
+ </natures>
+</projectDescription>
Added: trunk/diswork-daemon/.settings/org.eclipse.jdt.core.prefs
===================================================================
--- trunk/diswork-daemon/.settings/org.eclipse.jdt.core.prefs (rev 0)
+++ trunk/diswork-daemon/.settings/org.eclipse.jdt.core.prefs 2010-06-16 15:55:53 UTC (rev 76)
@@ -0,0 +1,6 @@
+#Tue Jun 08 14:19:53 CEST 2010
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
+org.eclipse.jdt.core.compiler.compliance=1.6
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
+org.eclipse.jdt.core.compiler.source=1.6
Added: trunk/diswork-daemon/.settings/org.maven.ide.eclipse.prefs
===================================================================
--- trunk/diswork-daemon/.settings/org.maven.ide.eclipse.prefs (rev 0)
+++ trunk/diswork-daemon/.settings/org.maven.ide.eclipse.prefs 2010-06-16 15:55:53 UTC (rev 76)
@@ -0,0 +1,9 @@
+#Mon Jun 07 18:20:47 CEST 2010
+activeProfiles=
+eclipse.preferences.version=1
+fullBuildGoals=process-test-resources
+includeModules=false
+resolveWorkspaceProjects=true
+resourceFilterGoals=process-resources resources\:testResources
+skipCompilerPlugin=true
+version=1
Added: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java (rev 0)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java 2010-06-16 15:55:53 UTC (rev 76)
@@ -0,0 +1,89 @@
+package org.nuiton.diswork.daemon;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public interface ActivityStrategy {
+
+ /** use this strategy to never run a job */
+ public static class NoActivity implements ActivityStrategy {
+ @Override
+ public boolean canWork() {
+ return false;
+ }
+ }
+
+ /** use this strategy to always run a job */
+ public static class UnlimitedActivity implements ActivityStrategy {
+ @Override
+ public boolean canWork() {
+ return true;
+ }
+ }
+
+ /** use this strategy to run a job only if load average is low */
+ public static class LimitedActivity implements ActivityStrategy {
+
+ private static final Log log = LogFactory.getLog(LimitedActivity.class);
+
+ protected static class LoadAverageMonitoring extends Thread {
+
+ OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
+
+ Double loadAverageNow = 1.0;
+ Double loadAverage5MinutesBefore = 1.0;
+ Double loadAverage10MinutesBefore = 1.0;
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ Thread.sleep(5 * 60 * 1000); // 5 min
+ } catch (InterruptedException e) {
+ // TODO 20100615 bleny Auto-generated catch block
+ log.info("exception catch", e);
+ e.printStackTrace();
+ }
+ loadAverage10MinutesBefore = loadAverage5MinutesBefore;
+ loadAverage5MinutesBefore = loadAverageNow;
+ loadAverageNow = os.getSystemLoadAverage();
+ log.info("load averages : " + loadAverageNow + " " +
+ loadAverage5MinutesBefore + " "
+ + loadAverage10MinutesBefore);
+ }
+ }
+ }
+
+ LoadAverageMonitoring monitoring;
+
+ public LimitedActivity() {
+ monitoring = new LoadAverageMonitoring();
+ monitoring.start();
+ }
+
+ @Override
+ public boolean canWork() {
+ boolean canWork = monitoring.loadAverageNow < 1.0
+ && monitoring.loadAverage5MinutesBefore < 1.0
+ && monitoring.loadAverage10MinutesBefore < 1.0;
+ return canWork;
+ }
+ }
+
+ /** use this strategy to run a job only at fixed times of the week */
+ public static class ScheduledActivity implements ActivityStrategy {
+
+ @Override
+ public boolean canWork() {
+ // TODO 20100615 bleny Auto-generated method stub
+ throw new UnsupportedOperationException("not yet implemented");
+ }
+ }
+
+
+ /** return true if a job can be run */
+ boolean canWork();
+}
\ No newline at end of file
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java 2010-06-14 08:53:35 UTC (rev 75)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java 2010-06-16 15:55:53 UTC (rev 76)
@@ -24,21 +24,68 @@
*/
package org.nuiton.diswork.daemon;
+import java.io.File;
+import java.lang.management.ManagementFactory;
+
import org.nuiton.diswork.fs.DisworkFileSystemConfig;
import org.nuiton.util.ApplicationConfig;
+/**
+ *
+ * <dl>
+ * <dt>diswork.workers_number</dt>
+ * <dd>The number of jobs the daemon can run at the same time. Should be set
+ * to a value that consider the number of processors, core and RAM
+ * available. By default, it's the number processors available to the
+ * JVM. Set it to 0 will disable job-processing.</dd>
+ * <dt>diswork.temp_directory</dt>
+ * <dd>Diswork need a temporary directory to store temporary data for each
+ * jobs. By default, the temp dir of the OS is used (ie "/tmp/diswork"
+ * under Linux).</dd>
+ * <dt>diswork.activity_strategy</dt>
+ * <dd>This is the way the daemon is started, different values are
+ * available:
+ * <dl>
+ * <dt>none</dt>
+ * <dd>never try to do a job</dd>
+ * <dt>unlimited</dt>
+ * <dd>always try to job, whatever the cost</dd>
+ * <dt>limited</dt>
+ * <dd>run a job only if hardware resources are available (based
+ * on the system load average)</dd>
+ * <dt>scheduled</dt>
+ * <dd>run a job only at fixed time of the week (for example,
+ * nights, week-end, etc.). It needs to define a pattern.</dd>
+ * </dl>
+ * </dd>
+ * </dl>
+ *
+ * @author bleny
+ */
public class DisworkConfig extends ApplicationConfig {
protected DisworkFileSystemConfig fileSystemConfig;
public DisworkConfig() {
setConfigFileName("diswork.config");
+
+ Integer availableProcessors = ManagementFactory
+ . getOperatingSystemMXBean()
+ . getAvailableProcessors();
+ setDefaultOption("diswork.workers_number", availableProcessors.toString());
+
+ setDefaultOption("diswork.temp_directory",
+ System.getProperty("java.io.tmpdir")
+ + File.separator + "diswork");
+
+ setDefaultOption("diswork.activity_strategy", "unlimited");
+
+ // if no total_uptime saved, consider daemon has never run
+ setDefaultOption("diswork.total_uptime", "0");
}
public String getTempDirectory() {
- return System.getProperty("java.io.tmpdir",
- System.getProperty("user.dir",
- ".")) + "/diswork";
+ return getOption("diswork.temp_directory");
}
public static DisworkConfig newConfig() {
@@ -90,5 +137,35 @@
public void setFileSystemConfig(DisworkFileSystemConfig fileSystemConfig) {
this.fileSystemConfig = fileSystemConfig;
}
+
+ public Integer getNumberOfWorkers() {
+ return getOptionAsInt("diswork.workers_number");
+ }
+
+ public String getActivityStrategy() {
+ return getOption("diswork.activity_strategy");
+ }
+
+ public void setActivityStrategy(String activityStrategyName) {
+ setOption("diswork.activity_strategy", activityStrategyName);
+ }
+
+ public Long getTotalUptime() {
+ String upTime = getOption("diswork.total_uptime");
+ return Long.parseLong(upTime);
+ }
+
+ public void setTotalUptime(Long upTime) {
+ setOption("diswork.total_uptime", upTime.toString());
+ }
+
+ public void setFirstRunTime(Long time) {
+ setOption("diswork.first_run_time", time.toString());
+ }
+ public Long getFirstRunTime() {
+ String firstRunTime = getOption("diswork.first_run_time");
+ return Long.parseLong(firstRunTime);
+ }
+
}
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-06-14 08:53:35 UTC (rev 75)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-06-16 15:55:53 UTC (rev 76)
@@ -28,6 +28,8 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
import java.net.UnknownHostException;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
@@ -41,6 +43,7 @@
import org.apache.commons.logging.LogFactory;
import org.nuiton.diswork.fs.DisworkFileSystem;
import org.nuiton.diswork.fs.DisworkFileSystemConfig;
+import org.nuiton.util.FileUtil;
/**
*
@@ -55,16 +58,6 @@
private static final Log log = LogFactory.getLog(DisworkDaemon.class);
- protected DisworkFileSystem fileSystem;
-
- protected DisworkConfig config;
-
- /** owner id for this node */
- protected String ownerId;
-
- /** path to owned directory on fileSystem */
- protected String homeDir;
-
/** contains applications */
protected static final String BIN = "/bin";
@@ -94,9 +87,6 @@
/** a place where are all user-directories */
protected static final String HOME = "/home";
-
- /** worker-manager will make the daemon accomplish jobs */
- protected WorkersManager workers;
/** in a job directory, the place where the JSDL must be placed */
protected static final String JSDL_PATH = ".diswork/job.jsdl";
@@ -104,6 +94,28 @@
/** in a job directory, the place where the log must be placed */
protected static final String LOG_PATH = ".diswork/job.log";
+ /** in a home directory, the place where the hardware info must be placed */
+ protected static final String HARDINFO_PATH = "hardinfo";
+
+
+ /** the distributed file system where jobs, data and results are stored */
+ protected DisworkFileSystem fileSystem;
+
+ /** provide the configuration data about the daemon */
+ protected DisworkConfig config;
+
+ /** time when the deamon started this time, used for total uptime stat */
+ protected Long sessionStartTime;
+
+ /** owner id for this node */
+ protected String ownerId;
+
+ /** path to owned directory on fileSystem */
+ protected String homeDir;
+
+ /** worker-manager will make the daemon accomplish jobs */
+ protected WorkersManager workers;
+
public DisworkDaemon(DisworkConfig config) throws DisworkException {
this.config = config;
@@ -122,39 +134,79 @@
throw new DisworkException("booting diswork file system failed", e);
}
-
- ownerId = config.getOwnerId(); // get job owner id from config
-
- if (ownerId == null) {
+ ownerId = config.getOwnerId(); // get job owner id from config
+
+ if (ownerId == null) { // first time running the daemon
log.info("can't find owner id, generating a new one");
- // generate a new one by cheking if home dir exists
- ownerId = System.getProperty("user.name", "anonymous");
+
// check home dir do not exists
+ try {
+ Random random = new Random();
+ // generate a new one by cheking if home dir exists
+ String simpleName = System.getProperty("user.name", "anonymous");
+ ownerId = simpleName;
+ boolean alreadyExists = fileSystem.exists(HOME + "/" + ownerId);
+
+ // if simpleName is already taken, try simpleName + random
+ while (alreadyExists) {
+ alreadyExists = fileSystem.exists(HOME + "/" + ownerId);
+ ownerId = simpleName + random.nextInt();
+ }
+ homeDir = HOME + "/" + ownerId;
+ if (!fileSystem.exists(homeDir)) {
+ fileSystem.createDirectory(homeDir);
+ }
+ } catch (ConcurrentModificationException e) {
+ log.info("can't create home dir", e);
+ throw new DisworkException("can't create home dir", e);
+ } catch (IOException e) {
+ log.info("can't create home dir", e);
+ throw new DisworkException("can't create home dir", e);
+ }
+
config.setOwnerId(ownerId);
-
+
+ config.setFirstRunTime(System.currentTimeMillis());
+
// config.saveForUser();
}
-
+
log.info("owner id is " + ownerId);
-
+
+ // check if config implies to run a worker
+ if (config.getNumberOfWorkers() >= 0) {
+ workers = new WorkersManager(fileSystem, config);
+ } else {
+ log.info("worker manager disabled");
+ }
+
+ sessionStartTime = System.currentTimeMillis();
+
+ // writing hardware info to homeDir
try {
- homeDir = HOME + "/" + ownerId;
- if (!fileSystem.exists(homeDir)) {
- fileSystem.createDirectory(homeDir);
- }
+ OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
+ String hardinfos = os.getName() + "\n" + os.getArch() + "\n" +
+ os.getAvailableProcessors();
+
+ // TODO 20100615 bleny add RAM size and HDD capacities to hardinfos
+
+ fileSystem.write(homeDir + "/" + HARDINFO_PATH,
+ IOUtils.toInputStream(hardinfos));
+ log.info("writing hardware infos " + hardinfos);
} catch (ConcurrentModificationException e) {
- log.info("can't create home dir", e);
- throw new DisworkException("can't create home dir", e);
+ log.info("can't write hardware info", e);
+ throw new DisworkException("can't write hardware info", e);
} catch (IOException e) {
- log.info("can't create home dir", e);
- throw new DisworkException("can't create home dir", e);
+ log.info("can't write hardware info", e);
+ throw new DisworkException("can't write hardware info", e);
}
-
- // check if config implies to run a worker
- workers = new WorkersManager(fileSystem, config);
- workers.start();
}
+ /**
+ * Create all base directories on the File System
+ * @throws ConcurrentModificationException
+ * @throws IOException
+ */
protected void initFileSystem() throws ConcurrentModificationException,
IOException {
String[] directories = { TODO, TODO_RUNNING, FAILED_1, FAILED_1_RUNNING,
@@ -170,6 +222,8 @@
}
}
+ /* *** methods for defining some usual paths *** */
+
/**
* given a name and a version for an application, returns the path where
* application data can be found on diswork file system.
@@ -188,6 +242,27 @@
}
/**
+ * Given a job description, returns the place on disworkFS where all data
+ * for this jobs should be stored
+ * @param jobDescription
+ * @return an absolute path
+ */
+ protected String getPathForJob(JobDescription jobDescription) {
+ return getPathForJob(jobDescription.getJobId());
+ }
+
+ /**
+ * Given a job id, returns the place on disworkFS where all data
+ * for this jobs should be stored
+ * @param jobDescription
+ * @return a path
+ */
+ protected String getPathForJob(String jobId) {
+ // all jobs are stored in home dir
+ return homeDir + "/" + jobId;
+ }
+
+ /**
* every-time a link to a job is created or modified in the job, his name
* has to be generated by this method
* @return the name to use for a link
@@ -196,9 +271,22 @@
return ((Long) System.currentTimeMillis()).toString();
}
- public void submitApplication(String applicationName,
- String applicationVersion,
- InputStream applicationData) throws DisworkException {
+ /**
+ * Provide an application to all nodes. Once provided, all nodes will be
+ * able to perform a job with this application
+ *
+ * An application can be uploaded in different version. If a given
+ * application in the given version is already on the File System, nothing
+ * is done : the application file is <strong>NOT</strong> replaced.
+ *
+ * @param applicationName the name of the application
+ * @param applicationVersion the version of the application
+ * @param applicationData an InputStream on the application .zip file
+ * @throws DisworkException if an error occurs while uploading the file
+ */
+ public void submitApplication(String applicationName,
+ String applicationVersion, InputStream applicationData)
+ throws DisworkException {
// the place where dependency will be stored
String path = getPathForDependency(applicationName, applicationVersion);
@@ -221,108 +309,87 @@
throw new DisworkException("unable to write", e);
}
}
-
- /**
- * Given a job description, returns the place on disworkFS where all data
- * for this jobs should be stored
- * @param jobDescription
- * @return a path
- */
- protected String getJobPath(JobDescription jobDescription) {
- return getJobPath(jobDescription.getJobId());
- }
- /**
- * Given a job description, returns the place on disworkFS where all data
- * for this jobs should be stored
- * @param jobDescription
- * @return a path
- */
- protected String getJobPath(String jobId) {
- // all jobs are stored in home dir
- return homeDir + "/" + jobId;
- }
-
- /**
- *
- */
- public JobDescription newJob() throws IOException {
- Random random = new Random();
-
- boolean alreadyExists = true;
- String newJobIntendifier = null;
- while (alreadyExists) {
- Integer randomInteger = random.nextInt();
- newJobIntendifier = "job_" + randomInteger.toString();
- alreadyExists = fileSystem.exists(getJobPath(newJobIntendifier));
- }
-
- // create both job path and sub-directory .diswork
- fileSystem.createDirectories(getJobPath(newJobIntendifier) + "/" + ".diswork");
- log.info("created new job " + newJobIntendifier);
- return new JobDescription(newJobIntendifier);
- }
-
public void submitJob(JobDescription jobDescription) throws DisworkException {
- submitJob(jobDescription, new HashMap<String, InputStream>());
- }
-
- public void submitJob(JobDescription jobDescription,
- Map<String, InputStream> inputFiles) throws DisworkException {
- // check dependencies, throw exception
-
- if (inputFiles.size() + jobDescription.getStagingInputUrls().size()
+ if (jobDescription.getInputData().size() + jobDescription.getStagingInputUrls().size()
< jobDescription.getStagingInput().size()) {
// dependencies are missing
}
try {
- String dependencyPath =
- getPathForDependency(jobDescription.getApplicationName(),
- jobDescription.getApplicationVersion());
- log.info("looking for " + dependencyPath);
- if (!fileSystem.exists(dependencyPath)) {
- throw new DisworkException("job require a dependency " +
- jobDescription.getApplicationName() + "-" +
- jobDescription.getApplicationVersion() + " that is " +
- "not available");
+ // trying to put the job in a new directory of home
+ Random random = new Random();
+ boolean alreadyExists = true;
+ String newJobIntendifier = null;
+ while (alreadyExists) {
+ Integer randomInteger = random.nextInt();
+ newJobIntendifier = "job_" + randomInteger.toString();
+ alreadyExists = fileSystem.exists(getPathForJob(newJobIntendifier));
}
+
+ jobDescription.setJobId(newJobIntendifier);
- String jobDir = getJobPath(jobDescription);
+ // create both job path and sub-directory .diswork
+ fileSystem.createDirectories(
+ getPathForJob(jobDescription) + "/" + ".diswork");
+
+ if (jobDescription.applicationName != null) {
+ String dependencyPath = getPathForDependency(
+ jobDescription.getApplicationName(),
+ jobDescription.getApplicationVersion());
+ log.info("looking for " + dependencyPath);
+
+ if (!fileSystem.exists(dependencyPath)) {
+ throw new DisworkException("job require a dependency " +
+ jobDescription.getApplicationName() + "-" +
+ jobDescription.getApplicationVersion()
+ + " that is not available");
+ }
+ } else {
+ log.info("no dependency specified for " + jobDescription);
+ }
+ String jobDir = getPathForJob(jobDescription);
+
if(!fileSystem.exists(jobDir)) {
// strange !
}
+ // creating an empty log file
+ log.info("creating log file " + jobDir + "/" + LOG_PATH);
fileSystem.write(jobDir + "/" + LOG_PATH, IOUtils.toInputStream(""));
-
+
+ // writing the JSDL file
InputStream jobJSDL = IOUtils.toInputStream(jobDescription.toJSDL());
fileSystem.write(jobDir + "/" + JSDL_PATH, jobJSDL);
// file staging
- for (String fileName : inputFiles.keySet()) {
- fileSystem.write(jobDir + "/" + fileName, inputFiles.get(fileName));
+ for (String fileName : jobDescription.getInputData().keySet()) {
+ fileSystem.write(jobDir + "/" + fileName,
+ jobDescription.getInputData().get(fileName));
}
-
+
+ // FIXME 20100609 bleny may throws exception if jobs are proposed
+ // at a same time
+
// propose job
String linkName = newJobLinkName();
-
- // FIXME 20100609 bleny may throws exception if jobs are proposed
- // at a same time
fileSystem.createSymbolicLink(TODO + "/" + linkName, jobDir);
+ log.info("job submited");
+
} catch (IOException e) {
log.error("file system error", e);
throw new DisworkFileSystemException("file system error", e);
}
}
- public boolean checkLogContains(JobDescription job,
+ protected boolean checkLogContains(JobDescription job,
String pattern) throws DisworkException {
try {
- String jobPath = getJobPath(job);
+ String jobPath = getPathForJob(job);
List<?> entries = IOUtils.readLines(fileSystem.read(jobPath + "/" + LOG_PATH));
return entries.contains(pattern);
} catch (FileNotFoundException e) {
@@ -345,28 +412,107 @@
public boolean isFailed(JobDescription job) throws DisworkException {
return isFinished(job) && !isSuccessful(job);
}
+
+ public Map<String, InputStream> getResults(JobDescription job)
+ throws DisworkException {
+ if (isFinished(job)) {
+ Map<String, InputStream> results = new HashMap<String, InputStream>();
+ for (String fileName : job.getStagingOutput()) {
+ String jobPath = getPathForJob(job);
+ try {
+ InputStream result = fileSystem.read(jobPath + "/" + fileName);
+ results.put(fileName, result);
+ } catch (FileNotFoundException e) {
+ throw new DisworkException("an expected file is missing", e);
+ } catch (IOException e) {
+ log.info("file system error ", e);
+ throw new DisworkException("file system error ", e);
+ }
+ }
+ return results;
+ } else {
+ throw new DisworkException("can't get results for unfinished job "
+ + job);
+ }
+ }
+
+ /** close the daemon (stop all workers)
+ * update statistices and delete all temporary data
+ */
@Override
public void close() throws IOException {
workers.stop();
+
+ // updating total uptime statistic
+ Long totalUptime = getTotalUptime();
+ log.info("saving total uptime: " + totalUptime);
+ config.setTotalUptime(totalUptime);
+ //config.saveForUser();
+
fileSystem.close();
+
+ FileUtil.deleteRecursively(config.getTempDirectory());
}
- public Map<String, InputStream> getResults(JobDescription job)
- throws DisworkException {
- Map<String, InputStream> results = new HashMap<String, InputStream>();
- for (String fileName : job.getStagingOutput()) {
- String jobPath = getJobPath(job);
- try {
- InputStream result = fileSystem.read(jobPath + "/" + fileName);
- results.put(fileName, result);
- } catch (FileNotFoundException e) {
- throw new DisworkException("an expected file is missing", e);
- } catch (IOException e) {
- log.info("file system error ", e);
- throw new DisworkException("file system error ", e);
+ /* *** methods about statistics *** */
+
+ protected Long getTotalUptime() {
+ Long currentTime = System.currentTimeMillis();
+ Long sessionUptime = currentTime - sessionStartTime;
+ Long totalUptime = config.getTotalUptime() + sessionUptime;
+ return totalUptime;
+ }
+
+ /**
+ * return a ratio of the total uptime on the time since the first demon
+ * run. For exemple, if daemon was up 2 hours and was installed 4 hours ago
+ * this method return 0.5.
+ * @return
+ */
+ public Double getUptimeRatio() {
+ Double uptimeRatio = (double) getTotalUptime().longValue()
+ / ((double) System.currentTimeMillis()
+ - (double) config.getFirstRunTime().longValue());
+ return uptimeRatio;
+ }
+
+ /** get informations on hardware available on the global Diswork system
+ *
+ * @return
+ * @throws DisworkException
+ */
+ public Map<String, Integer> getGlobalStats() throws DisworkException {
+ try {
+ Map<String, Integer> stats = new HashMap<String, Integer>();
+ stats.put("available_processors", 0);
+
+ List<String> homeDirs = fileSystem.readDirectory(HOME);
+ for (String homeDir : homeDirs) {
+ String hardinfo = IOUtils.toString(fileSystem.read(
+ HOME + "/" + homeDir + "/" + HARDINFO_PATH));
+ String[] infos = hardinfo.split("\n");
+
+ // reading the OS name
+ if (!stats.containsKey(infos[0])) {
+ stats.put(infos[0], 0);
+ }
+ // reading the architecture
+ stats.put(infos[0], stats.get(infos[0]) + 1);
+ if (!stats.containsKey(infos[1])) {
+ stats.put(infos[1], 0);
+ }
+ stats.put(infos[1], stats.get(infos[1]) + 1);
+
+ // reading the number of processors
+ stats.put("available_processors",
+ stats.get("available_processors") +
+ Integer.parseInt(infos[2]));
}
+ return stats;
+ } catch (IOException e) {
+ log.info("file system error ", e);
+ throw new DisworkException("file system error ", e);
}
- return results;
}
}
\ No newline at end of file
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java 2010-06-14 08:53:35 UTC (rev 75)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java 2010-06-16 15:55:53 UTC (rev 76)
@@ -24,6 +24,9 @@
*/
package org.nuiton.diswork.daemon;
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+import java.lang.management.RuntimeMXBean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,12 +37,21 @@
public class DisworkDaemonRunner {
private static final Log log = LogFactory.getLog(DisworkDaemonRunner.class);
-
+
/**
* @param args
*/
public static void main(String[] args) {
+ OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
+ System.out.println(os.getArch());
+ System.out.println(os.getAvailableProcessors());
+ System.out.println(os.getName());
+ System.out.println(os.getVersion());
+
+ RuntimeMXBean run = ManagementFactory.getRuntimeMXBean();
+ System.out.println(run.getUptime());
+
// consider args
// DisworkDaemon node = new DisworkDaemon(config);
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java 2010-06-14 08:53:35 UTC (rev 75)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java 2010-06-16 15:55:53 UTC (rev 76)
@@ -24,46 +24,86 @@
*/
package org.nuiton.diswork.daemon;
-import java.io.Serializable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.NullInputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jdom.Attribute;
+import org.jdom.Document;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+import org.jdom.Namespace;
+import org.jdom.input.SAXBuilder;
+
/**
*
+ * When writing your command line, you should use provided tokens. Token
+ * are piece of command that will be replaced by the job-executor when
+ * needed.
*
+ * <dl>
+ * <dt>%java</dt>
+ * </dd>will be replaced by the actual path of java executable
+ * into the JRE</dd>
+ * </dl>
*
+ * This class provides methods to read and parse those data to an XML file
+ * following (as far as possible), the Job Submission Description Language
+ * Specification.
+ *
+ * @see http://en.wikipedia.org/wiki/Job_Submission_Description_Language
+ * @see http://www.gridforum.org/documents/GFD.56.pdf
+ *
* @author bleny
*/
-public class JobDescription implements Serializable {
+public class JobDescription {
- private static final long serialVersionUID = -8493700934802808925L;
-
- /** an id for diswork */
+ private static final Log log = LogFactory.getLog(JobDescription.class);
+
+ /** an id for diswork, not stored in the JSDL file */
protected String jobId;
+ /** a name for the job, it's a convenience for the user */
protected String jobName;
+ /** the name of the application needed for this job
+ * can be null if no application is needed for complete this job
+ */
protected String applicationName;
+
+ /** the version of the application
+ * can't be null applicationName is set
+ */
protected String applicationVersion;
+ /** the command line to execute this job */
protected String commandLine;
/** all files expected at the beginning of the job */
- protected List<String> stagingInput = new ArrayList<String>();
+ protected List<String> input = new ArrayList<String>();
/** all files expected at the end of the job */
- protected List<String> stagingOutput = new ArrayList<String>();
+ protected List<String> output = new ArrayList<String>();
- /** the name of a file and the URI where to get it */
- protected Map<String, URL> stagingInputUrls = new HashMap<String, URL>();
+ /** the name of some input files and the URI where to get it */
+ protected Map<String, URL> inputUrls = new HashMap<String, URL>();
+ /** */
+ protected Map<String, InputStream> inputData = new HashMap<String, InputStream>();
+
/** file where to read the standard input, may be null */
protected String standardInput;
- /** file where to write the standard output */
+ /** file where to write the standard output, may be null */
protected String standardOutput;
/** TOKENS are piece of string you can use for writing command lines */
@@ -85,9 +125,15 @@
this.jobId = jobId;
}
+ public JobDescription() {}
+
public String getJobId() {
return jobId;
}
+
+ protected void setJobId(String jobId) {
+ this.jobId = jobId;
+ }
public String getCommandLine() {
String result = commandLine;
@@ -108,19 +154,39 @@
public String getApplicationName() {
return applicationName;
}
+
+ /**
+ * this method is protected to force the use of
+ * {@link #setApplication(String, String)} which need a version.
+ * This method is still here because it is used by
+ * {@link #parseJSDL(String)}
+ */
+ protected void setApplicationName(String applicationName) {
+ this.applicationName = applicationName;
+ }
public String getApplicationVersion() {
return applicationVersion;
}
+ /**
+ * this method is protected to force the use of
+ * {@link #setApplication(String, String)} which need a version.
+ * This method is still here because it is used by
+ * {@link #parseJSDL(String)}
+ */
+ protected void setApplicationVersion(String applicationVersion) {
+ this.applicationVersion = applicationVersion;
+ }
+
public void setJobName(String jobName) {
this.jobName = jobName;
}
public void setApplication(String applicationName,
String applicationVersion) {
- this.applicationName = applicationName;
- this.applicationVersion = applicationVersion;
+ setApplicationName(applicationName);
+ setApplicationVersion(applicationVersion);
}
@Override
@@ -132,38 +198,192 @@
protected static Map<String, JobDescription> map = new HashMap<String, JobDescription>();
public String toJSDL() {
+ /*
count += 1;
map.put(count.toString(), this);
return count.toString();
- }
+ */
+
+ String jsdl =
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+ + "<jsdl:JobDefinition xmlns=\"http://www.example.org/\"\n"
+ + " xmlns:jsdl=\"http://schemas.ggf.org/jsdl/2005/11/jsdl\"\n"
+ + " xmlns:jsdl-posix=\"http://schemas.ggf.org/jsdl/2005/11/jsdl-posix\"\n"
+ + " xmlns:diswork=\"http://nuiton.org/projects/show/diswork\"\n"
+ + " xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\">\n"
+ + "<jsdl:JobDescription>\n"
+ + " <jsdl:JobIdentification>\n"
+ + " <jsdl:JobName>" + jobName + "</jsdl:JobName>\n"
+ + " </jsdl:JobIdentification>\n"
+ + " <jsdl:Application>\n";
+ if (applicationName != null) {
+ jsdl += " <jsdl:ApplicationName>" + applicationName + "</jsdl:ApplicationName>\n"
+ + " <jsdl:ApplicationVersion>" + applicationVersion + "</jsdl:ApplicationVersion>\n";
+ }
+ jsdl += " <jsdl-posix:POSIXApplication>\n"
+ + " <jsdl-posix:Executable />\n"
+ + " <jsdl-posix:Argument>" + commandLine + "</jsdl-posix:Argument>\n";
+ if (standardInput != null) {
+ jsdl += " <jsdl-posix:Input>" + standardInput + "</jsdl-posix:Input>\n";
+ }
+ if (standardOutput != null) {
+ jsdl += " <jsdl-posix:Output>" + standardOutput + "</jsdl-posix:Output>\n";
+ }
+ jsdl += " </jsdl-posix:POSIXApplication>\n"
+ + " </jsdl:Application>\n";
+
+ for (String inputName : input) {
+ jsdl += " <jsdl:DataStaging diswork:type=\"in\">\n"
+ + " <jsdl:FileName>" + inputName + "</jsdl:FileName>\n";
+ if (inputUrls.containsKey(inputName)) {
+ jsdl +=
+ " <jsdl:Source>\n"
+ + " <jsdl:URI>" + inputUrls.get(inputName) + "</jsdl:URI>\n"
+ + " </jsdl:Source>\n";
+ }
+ jsdl += " </jsdl:DataStaging>\n";
+ }
- public static JobDescription parseJSDL(String jsdl) {
- return map.get(jsdl);
+ for (String outputName : output) {
+ jsdl += " <jsdl:DataStaging diswork:type=\"out\">\n"
+ + " <jsdl:FileName>" + outputName + "</jsdl:FileName>\n"
+ + " </jsdl:DataStaging>\n";
+ }
+
+ jsdl +=
+ "</jsdl:JobDescription>\n"
+ + "</jsdl:JobDefinition>\n";
+
+ return jsdl;
+
}
- public void addStagingInput(String fileName, URL source) {
- stagingInput.add(fileName);
- stagingInputUrls.put(fileName, source);
+ /**
+ * Factory method to get a JobDescription from JSDL
+ * @param jsdl the content of the JSDL file
+ * @return a job description representing the content of the JSDL
+ * @throws IOException if JSDL is malformed or if an URL is malformed
+ */
+ public static JobDescription parseJSDL(String jsdl) throws IOException {
+ // TODO 20100616 bleny correctly set dependency to JDOM in pom.xml
+ JobDescription result = new JobDescription();
+
+ try {
+ SAXBuilder builder = new SAXBuilder();
+ Document document = builder.build(IOUtils.toInputStream(jsdl));
+ Element jobDefinition = document.getRootElement();
+
+ // namespaces
+ Namespace jsdlNamespace = jobDefinition.getNamespace("jsdl");
+ Namespace jsdlPosixNamespace = jobDefinition.getNamespace("jsdl-posix");
+ Namespace disworkNamespace = jobDefinition.getNamespace("diswork");
+
+ // main element
+ Element jobDescription = jobDefinition.getChild("JobDescription",
+ jsdlNamespace);
+
+ // job identification
+ Element jobIdentification = jobDescription.getChild
+ ("JobIdentification", jsdlNamespace);
+ Element jobName = jobIdentification.getChild("JobName",
+ jsdlNamespace);
+ result.setJobName(jobName.getText());
+
+ // application
+ Element application = jobDescription.getChild("Application",
+ jsdlNamespace);
+ Element applicationName = application.getChild("ApplicationName",
+ jsdlNamespace);
+ if (application != null) {
+ Element applicationVersion = application.getChild
+ ("ApplicationVersion", jsdlNamespace);
+ result.setApplication(applicationName.getText(),
+ applicationVersion.getText());
+ }
+
+ Element POSIXApplication = application.getChild("POSIXApplication",
+ jsdlPosixNamespace);
+
+ Element argument = POSIXApplication.getChild("Argument",
+ jsdlPosixNamespace);
+ result.setCommandLine(argument.getText());
+
+ Element input = POSIXApplication.getChild("Input",
+ jsdlPosixNamespace);
+ if (input != null) {
+ result.setStandardInput(input.getText());
+ }
+ Element output = POSIXApplication.getChild("Output",
+ jsdlPosixNamespace);
+ if (input != null) {
+ result.setStandardOutput(output.getText());
+ }
+
+ // staging
+ List<Element> dataStagings = jobDescription.getChildren
+ ("DataStaging", jsdlNamespace);
+ for (Element dataStaging : dataStagings) {
+ Attribute type = dataStaging.getAttribute("type",
+ disworkNamespace);
+
+ Element fileName = dataStaging.getChild("FileName",
+ jsdlNamespace);
+
+ if (type != null && "out".equals(type.getValue())) {
+ // type="out"
+ result.addOutput(fileName.getText());
+ } else {
+ // type="in"
+ Element source = dataStaging.getChild("Source",
+ jsdlNamespace);
+ if (source != null) {
+ Element URI = source.getChild("URI", jsdlNamespace);
+ result.addInput(fileName.getText(), new URL(URI.getText()));
+ } else {
+ result.addInput(fileName.getText(), new NullInputStream(0));
+ }
+
+ // type not set
+ if (type == null) {
+ result.addOutput(fileName.getText());
+ }
+ }
+ }
+
+ } catch (JDOMException e) {
+ log.error("can't read malformed JSDL file", e);
+ throw new IOException("can't read malformed JSDL file", e);
+ } catch (MalformedURLException e) {
+ log.error("malformed URL", e);
+ throw new IOException("malformed URL", e);
+ }
+ return result;
}
- public void addStagingInput(String fileName) {
- stagingInput.add(fileName);
+ public void addInput(String fileName, URL source) {
+ input.add(fileName);
+ inputUrls.put(fileName, source);
}
+
+ public void addInput(String fileName, InputStream source) {
+ input.add(fileName);
+ inputData.put(fileName, source);
+ }
- public void addStagingOutput(String fileName) {
- stagingOutput.add(fileName);
+ public void addOutput(String fileName) {
+ output.add(fileName);
}
public List<String> getStagingInput() {
- return stagingInput;
+ return input;
}
public List<String> getStagingOutput() {
- return stagingOutput;
+ return output;
}
public Map<String, URL> getStagingInputUrls() {
- return stagingInputUrls;
+ return inputUrls;
}
public String getStandardInput() {
@@ -182,4 +402,8 @@
standardOutput = fileName;
}
+ public Map<String, InputStream> getInputData() {
+ return inputData;
+ }
+
}
\ No newline at end of file
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-06-14 08:53:35 UTC (rev 75)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-06-16 15:55:53 UTC (rev 76)
@@ -32,16 +32,19 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuiton.diswork.fs.DisworkFileSystem;
+import org.nuiton.util.FileUtil;
import org.nuiton.util.ZipUtil;
/**
@@ -57,14 +60,18 @@
/** time to wait beetween two look for a job */
protected static final int JOB_WAIT = 10 * 1000;
+ // TODO 20100614 bleny make it configurable
+ /** after this time (ms), a job is considered as no longer running */
+ protected static final long MAX_JOB_RUNNING_TIME = 24 * 60 * 60 * 1000;
+
static {
RUNNING_MOVE.put(DisworkDaemon.TODO, DisworkDaemon.TODO_RUNNING);
RUNNING_MOVE.put(DisworkDaemon.FAILED_1, DisworkDaemon.FAILED_1_RUNNING);
RUNNING_MOVE.put(DisworkDaemon.FAILED_2, DisworkDaemon.FAILED_2_RUNNING);
- RUNNING_MOVE.put(DisworkDaemon.TODO, DisworkDaemon.FAILED_1);
- RUNNING_MOVE.put(DisworkDaemon.FAILED_1, DisworkDaemon.FAILED_2);
- RUNNING_MOVE.put(DisworkDaemon.FAILED_2, DisworkDaemon.FAILED_3);
+ FAILED_MOVE.put(DisworkDaemon.TODO, DisworkDaemon.FAILED_1);
+ FAILED_MOVE.put(DisworkDaemon.FAILED_1, DisworkDaemon.FAILED_2);
+ FAILED_MOVE.put(DisworkDaemon.FAILED_2, DisworkDaemon.FAILED_3);
}
private static final Log log = LogFactory.getLog(WorkersManager.class);
@@ -72,15 +79,17 @@
protected DisworkFileSystem fileSystem;
protected DisworkConfig config;
- // Pool of workers ?
- protected Worker worker;
+ // Pool of workers
+ protected List<Worker> workers = new ArrayList<Worker>();
- // Activity strategy ?
-
- protected class Worker implements Runnable {
+ protected ActivityStrategy activityStrategy;
+
+ protected class Worker extends Thread {
public boolean shouldStop = false;
+ public WorkersManager manager;
+
protected void log(String jobPath, String message) throws IOException {
String logPath = jobPath + "/" + DisworkDaemon.LOG_PATH;
InputStream oldLogAsStream = fileSystem.read(logPath);
@@ -113,24 +122,32 @@
log.info("will run job " + jobDescription);
// create temp dir
- File jobDir = new File(config.getTempDirectory(), jobDescription.getJobId());
+ Random random = new Random();
+ File jobDir = new File(config.getTempDirectory(),
+ String.valueOf(random.nextInt()));
jobDir.mkdirs();
// download application
- String applicationPath = DisworkDaemon.getPathForDependency(
+ if (jobDescription.getApplicationName() != null) {
+ log.info("dependency needed for " + jobDescription + " (" +
+ jobDescription.getApplicationName() + "-" +
+ jobDescription.getApplicationVersion() + ")");
+ String applicationPath = DisworkDaemon.getPathForDependency(
jobDescription.getApplicationName(),
jobDescription.getApplicationVersion());
- InputStream applicationData = fileSystem.read(applicationPath);
-
-
- File application = new File(jobDir, FilenameUtils.getName(applicationPath));
- application.createNewFile();
- log.info("will create " + application.getAbsolutePath());
- OutputStream out = new FileOutputStream(application);
- IOUtils.copy(applicationData, out);
- // unzip application
- ZipUtil.uncompress(application, jobDir);
-
+ InputStream applicationData = fileSystem.read(applicationPath);
+
+ File application = new File(jobDir,
+ FilenameUtils.getName(applicationPath));
+ application.createNewFile();
+ log.info("will create " + application.getAbsolutePath());
+ OutputStream out = new FileOutputStream(application);
+ IOUtils.copy(applicationData, out);
+ // unzip application
+ ZipUtil.uncompress(application, jobDir);
+ } else {
+ log.info("no dependency specified for " + jobDescription);
+ }
// staging input files
for (String fileName : jobDescription.getStagingInput()) {
File localCopy = new File(jobDir, fileName);
@@ -151,16 +168,17 @@
// executing the job
String commandLine = jobDescription.getCommandLine();
log.info("calling " + commandLine);
- String[] bidule = commandLine.split(" ");
- ProcessBuilder builder = new ProcessBuilder(bidule);
+ String[] commandLineElements = commandLine.split(" ");
+ ProcessBuilder builder = new ProcessBuilder(commandLineElements);
builder.directory(jobDir);
builder.redirectErrorStream(true);
Process job = builder.start();
- // plugin a file on the standard input
+ // plugging a file on the standard input
String standardInputFileName = jobDescription.getStandardInput();
if (standardInputFileName != null) {
- InputStream input = new FileInputStream(new File(jobDir, standardInputFileName));
+ InputStream input = new FileInputStream(
+ new File(jobDir, standardInputFileName));
IOUtils.copy(input, job.getOutputStream());
}
@@ -177,7 +195,8 @@
// dump the standard output in a file
String standardOutputFileName = jobDescription.getStandardOutput();
if (standardOutputFileName != null) {
- OutputStream output = new FileOutputStream(new File(jobDir, standardOutputFileName));
+ OutputStream output = new FileOutputStream(
+ new File(jobDir, standardOutputFileName));
IOUtils.copy(job.getInputStream(), output);
}
@@ -201,78 +220,207 @@
}
// clean up the job directory
- // FileUtil.deleteRecursively(jobDir);
+ FileUtil.deleteRecursively(jobDir);
boolean success = exitValue == 0;
if (success) {
- log(jobPath, "DONE\nFINISHED");
+ log(jobPath, "DONE");
} else {
- log(jobPath, "FAILED\nFINISHED");
+ log(jobPath, "FAILED");
}
return success;
}
+
+ public String getFistJobName(String path) throws IOException {
+ List<String> jobsNames = fileSystem.readDirectory(path);
+ if (jobsNames.size() == 0) {
+ return null;
+ } else {
+ Collections.sort(jobsNames);
+ return jobsNames.get(0);
+ }
+ }
- @Override
- public void run() {
- while (! shouldStop) {
- // try to find a new job
- try {
- // TODO 20100609 bleny watch for other jobs
- List<String> jobsNames =
- fileSystem.readDirectory(DisworkDaemon.TODO);
- if (jobsNames.size() != 0) {
- // sort and choose the first, due to names, it should be
- // the more ancient one
- Collections.sort(jobsNames);
- String oldName = jobsNames.get(0);
-
- String newName = DisworkDaemon.newJobLinkName();
- fileSystem.move(DisworkDaemon.TODO + "/" + oldName,
- DisworkDaemon.TODO_RUNNING + "/" + newName);
-
- boolean jobSuccess = runJob(DisworkDaemon.TODO_RUNNING + "/" + newName);
-
- oldName = newName;
- newName = DisworkDaemon.newJobLinkName();
-
- if (jobSuccess) {
- fileSystem.move(
- DisworkDaemon.TODO_RUNNING + "/" + oldName,
- DisworkDaemon.DONE + "/" + newName);
- } else {
- fileSystem.move(
- DisworkDaemon.TODO_RUNNING + "/" + oldName,
- DisworkDaemon.FAILED_3 + "/" + newName);
+ protected void findAJobAndRunIt() {
+ // try to find a new job
+ try {
+ String jobLinkDir = null;
+ String jobLinkName = null;
+
+ String[] runningJobsDirs = { DisworkDaemon.FAILED_2_RUNNING,
+ DisworkDaemon.FAILED_1_RUNNING,
+ DisworkDaemon.TODO_RUNNING
+ };
+ for (String path : runningJobsDirs) {
+ String oldName = getFistJobName(path);
+ if (oldName != null) {
+ Long linkAge = System.currentTimeMillis()
+ - Long.parseLong(oldName);
+ if (linkAge <= MAX_JOB_RUNNING_TIME) {
+ log.info("taking old job (age = " + linkAge + ")");
+ jobLinkDir = path;
+ jobLinkName = oldName;
}
+ }
+ }
+
+ if (jobLinkDir == null) {
+ String[] jobsDirs = { DisworkDaemon.FAILED_2,
+ DisworkDaemon.FAILED_1,
+ DisworkDaemon.TODO
+ };
+ for (String path : jobsDirs) {
+ String oldName = getFistJobName(path);
+ if (oldName != null) { // take it
+ jobLinkDir = path;
+ jobLinkName = oldName;
+ }
+ }
+ }
+
+ if (jobLinkDir == null) {
+ log.info("nothing to do");
+ Thread.sleep(JOB_WAIT);
+ } else {
+ // move the link before running the job
+ String oldPath = jobLinkDir + "/" + jobLinkName;
+ log.info("job found at " + oldPath);
+
+ String newPath = RUNNING_MOVE.get(jobLinkDir) + "/" +
+ DisworkDaemon.newJobLinkName();
+
+ log.info("moving " + oldPath + " to " + newPath);
+ fileSystem.move(oldPath, newPath);
+
+ // run the job
+ boolean jobSuccess = runJob(newPath);
+
+ // move the link after the job
+ oldPath = newPath;
+ String newDir = null;
+ if (jobSuccess) {
+ newDir = DisworkDaemon.DONE;
} else {
- log.info("nothing to do");
- Thread.sleep(JOB_WAIT);
+ newDir = FAILED_MOVE.get(jobLinkDir);
}
+ newPath = newDir + "/" + DisworkDaemon.newJobLinkName();
- } catch (IOException e) {
- log.error("error while reading jobs", e);
- // TODO 20100611 bleny manage exception
+ log.info("moving " + oldPath + " to " + newPath);
+ fileSystem.move(oldPath, newPath);
+
+ // mark the job has finished if done or failed too many
+ // times
+ if ( newDir == DisworkDaemon.DONE ||
+ newDir == DisworkDaemon.FAILED_3) {
+ log.info("marking " + newPath + " as finished");
+ log(newPath, "FINISHED");
+ }
+ }
+
+
+ } catch (IOException e) {
+ log.error("error while reading jobs", e);
+ // TODO 20100611 bleny manage exception
+ } catch (InterruptedException e) {
+ log.info("exception catch", e);
+ // TODO 20100611 bleny manage exception
+ }
+
+ }
+
+ @Override
+ public void run() {
+ while (! shouldStop) {
+ if (manager.getActivityStrategy().canWork()) {
+ findAJobAndRunIt();
+ }
+ try {
+ Thread.sleep(10*1000);
} catch (InterruptedException e) {
+ // TODO 20100615 bleny Auto-generated catch block
log.info("exception catch", e);
- // TODO 20100611 bleny manage exception
+ e.printStackTrace();
}
}
}
-
}
public WorkersManager(DisworkFileSystem fileSystem, DisworkConfig config) {
this.fileSystem = fileSystem;
this.config = config;
- }
- public void start() {
- worker = new Worker();
- Thread t = new Thread(worker);
- t.start();
+ log.info("will start " + config.getNumberOfWorkers() + " workers");
+ for (int i = 1 ; i <= config.getNumberOfWorkers() ; i++) {
+ Worker worker = new Worker();
+ worker.manager = this;
+ worker.start();
+ workers.add(worker);
+ }
+
+ String initialStrategy = config.getActivityStrategy();
+ if ( "none".equals(initialStrategy)) {
+ activeNoActivityStrategy();
+ } else if ("unlimited".equals(initialStrategy)) {
+ activeUnlimitedActivityStrategy();
+ } else if ("limited".equals(initialStrategy)) {
+ activeLimitedActivityStrategy();
+ } else if ("scheduled".equals(initialStrategy)) {
+ activeScheduledActivityStrategy();
+ } else {
+ log.error("wrong config directive " + initialStrategy);
+ activeNoActivityStrategy();
+ }
+
}
public void stop() {
- worker.shouldStop = true;
+ stop(false);
}
+
+ public void stop(boolean now) {
+ // asking to all threads to stop
+ for (Worker worker : workers) {
+ worker.shouldStop = true;
+ }
+
+ if( !now ) {
+ // waiting for them to actually have finished
+ for (Worker worker : workers) {
+ while (worker.isAlive()) {
+ try {
+ Thread.sleep(10 * 1000);
+ } catch (InterruptedException e) {
+ // TODO 20100615 bleny Auto-generated catch block
+ log.info("exception catch", e);
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+ }
+
+ public ActivityStrategy getActivityStrategy() {
+ return activityStrategy;
+ }
+
+ public void setActivityStrategy(ActivityStrategy activityStrategy) {
+ this.activityStrategy = activityStrategy;
+ }
+
+ public void activeNoActivityStrategy() {
+ activityStrategy = new ActivityStrategy.NoActivity();
+ }
+
+ public void activeUnlimitedActivityStrategy() {
+ activityStrategy = new ActivityStrategy.UnlimitedActivity();
+ }
+
+ public void activeLimitedActivityStrategy() {
+ activityStrategy = new ActivityStrategy.LimitedActivity();
+ }
+
+ public void activeScheduledActivityStrategy() {
+ activityStrategy = new ActivityStrategy.ScheduledActivity();
+ }
+
}
\ No newline at end of file
Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java
===================================================================
--- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java 2010-06-14 08:53:35 UTC (rev 75)
+++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java 2010-06-16 15:55:53 UTC (rev 76)
@@ -5,7 +5,6 @@
import java.io.InputStream;
import java.net.URL;
-import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.IOUtils;
@@ -15,9 +14,9 @@
public class DisworkDaemonTest {
- protected DisworkDaemon daemon;
+ protected static DisworkDaemon daemon;
- protected static int port = 39999;
+ protected static int port = 49999;
@Before
public void setUp() throws Exception {
@@ -31,19 +30,27 @@
@After
public void tearDown() throws Exception {
+ Thread.sleep(5000);
daemon.close();
}
+ @Test
+ public void simpleSubmit() throws Exception {
+ JobDescription job = new JobDescription();
+ job.setCommandLine("java -version");
+ daemon.submitJob(job);
+ }
+
@Test(expected = DisworkException.class)
public void testSubmitWithoutDependency() throws Exception {
- JobDescription job = daemon.newJob();
+ JobDescription job = new JobDescription();
job.setApplication("non-existing-application", "0.0");
daemon.submitJob(job);
}
@Test
public void testSubmitSuccessfulJob() throws Exception {
- JobDescription job = daemon.newJob();
+ JobDescription job = new JobDescription();
job.setApplication("fake-app", "1.0");
job.setCommandLine("%java -jar fake-app.jar");
daemon.submitJob(job);
@@ -53,11 +60,12 @@
}
assertTrue(daemon.isSuccessful(job));
+
}
@Test
public void testSubmitFailJob() throws Exception {
- JobDescription job = daemon.newJob();
+ JobDescription job = new JobDescription();
job.setApplication("fake-app", "1.0");
job.setCommandLine("%java -jar fake-app.jar fail");
daemon.submitJob(job);
@@ -69,38 +77,73 @@
assertTrue(daemon.isFailed(job));
}
+ /**
+ * Create a complex job, submit it and check the results:
+ * <ul>
+ * <li>the job need an application (fake-app version 1.0);</li>
+ * <li>the job come with input data (input.txt);</li>
+ * <li>the job need another input file to be downloaded from http;</li>
+ * <li>the job ask for a file to be provided as a result at the end
+ * of the job.</li>
+ * <li>standard output is asked has result</li>
+ * </ul>
+ *
+ */
@Test
public void testStaging() throws Exception {
- JobDescription job = daemon.newJob();
+ JobDescription job = new JobDescription();
job.setJobName("My Job");
job.setApplication("fake-app", "1.0");
job.setCommandLine("%java -jar fake-app.jar");
- job.addStagingInput("example.com_index", new URL("http://www.example.com/"));
- job.addStagingInput("input.txt");
- job.addStagingOutput("output.txt");
+ // defining data in input
+ job.addInput("example.com_index", new URL("http://www.example.com/"));
+ job.addInput("input.txt", ClassLoader.getSystemResourceAsStream("input.txt"));
+ // defining expected data in output
+ job.addOutput("output.txt");
+ job.addOutput("example.com_index");
+
+ // setting standard input and output file
job.setStandardInput("input.txt");
- job.setStandardOutput("output.txt");
-
- Map<String, InputStream> data = new HashMap<String, InputStream>();
- data.put("input.txt", ClassLoader.getSystemResourceAsStream("input.txt"));
-
- daemon.submitJob(job, data);
+ job.setStandardOutput("output.txt");
+
+ // submit the job
+ daemon.submitJob(job);
+ // waiting for the job to finish
while(! daemon.isFinished(job)) {
- Thread.sleep(1 * 1000);
+ Thread.sleep(5 * 1000);
}
+ // check that job is successful
assertTrue(daemon.isSuccessful(job));
+ // checking the presence of results
Map<String, InputStream> results = daemon.getResults(job);
-
+ assertEquals(2, results.size());
assertTrue(results.containsKey("output.txt"));
+ assertTrue(results.containsKey("example.com_index"));
+ // checking that results are what was expected
String output = IOUtils.toString(results.get("output.txt"));
assertEquals("a print on standard output\n", output);
-
+ output = IOUtils.toString(results.get("example.com_index"));
+ assertTrue(output.contains("Example Web Page"));
}
+ /**
+ * tests the stats given by the daemon
+ * @throws Exception
+ */
+ @Test
+ public void testStats() throws Exception {
+ daemon.getUptimeRatio();
+
+ Map<String, Integer> stats = daemon.getGlobalStats();
+ // deamon should read 3 stats : 1 OS, 1 architecture and the number
+ // of processors
+ assertEquals(3, stats);
+ }
+
}
Added: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/JobDescriptionTest.java
===================================================================
--- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/JobDescriptionTest.java (rev 0)
+++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/JobDescriptionTest.java 2010-06-16 15:55:53 UTC (rev 76)
@@ -0,0 +1,98 @@
+package org.nuiton.diswork.daemon;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.URL;
+
+import org.apache.commons.collections.ListUtils;
+import org.apache.commons.io.IOUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests in this class create a complex JobDescription and check that
+ * transformation to JSDL and back to an object keep all informations
+ *
+ * @author bleny
+ */
+public class JobDescriptionTest {
+
+ protected static JobDescription job;
+ protected static JobDescription job2;
+
+ @Before
+ public void setUp() throws Exception {
+ job = new JobDescription();
+ job.setJobName("My Job");
+ job.setApplication("fake-app", "1.0");
+ job.setCommandLine("%java -jar fake-app.jar");
+
+ job2 = new JobDescription();
+ job2.setJobName("My Job");
+ job2.setApplication("fake-app", "1.0");
+ job2.setCommandLine("%java -jar fake-app.jar");
+
+
+ // defining data in input
+ job2.addInput("example.com_index", new URL("http://www.example.com/"));
+ job2.addInput("input.txt", IOUtils.toInputStream(""));
+
+ // defining expected data in output
+ job2.addOutput("output.txt");
+ job2.addOutput("example.com_index");
+
+ // setting standard input and output file
+ job2.setStandardInput("input.txt");
+ job2.setStandardOutput("output.txt");
+
+ }
+
+ @Test
+ public void testParseJSDL1() throws Exception {
+ try {
+ JobDescription jobCopy = JobDescription.parseJSDL(job.toJSDL());
+ assertNotNull(jobCopy);
+ assertEquals(job.getJobName(), jobCopy.getJobName());
+ assertEquals(job.getApplicationName(),
+ jobCopy.getApplicationName());
+ assertEquals(job.getApplicationVersion(),
+ jobCopy.getApplicationVersion());
+ assertEquals(job.getCommandLine(), jobCopy.getCommandLine());
+ assertEquals(job.getStandardInput(), jobCopy.getStandardInput());
+ assertEquals(job.getStandardOutput(), jobCopy.getStandardOutput());
+ } catch (IOException e) {
+ fail();
+ throw e;
+ }
+ }
+
+ @Test
+ public void testParseJSDL2() throws Exception {
+ try {
+ JobDescription job2Copy = JobDescription.parseJSDL(job2.toJSDL());
+ assertEquals(job2.getCommandLine(), job2Copy.getCommandLine());
+ assertEquals(job2.getStandardInput(), job2Copy.getStandardInput());
+ assertEquals(job2.getStandardOutput(),
+ job2Copy.getStandardOutput());
+
+ assertTrue(ListUtils.isEqualList(job2.getStagingInput(),
+ job2Copy.getStagingInput()));
+ assertTrue(ListUtils.isEqualList(job2.getStagingOutput(),
+ job2Copy.getStagingOutput()));
+ assertTrue(ListUtils.isEqualList(
+ job2.getStagingInputUrls().keySet(),
+ job2Copy.getStagingInputUrls().keySet()));
+ assertTrue(ListUtils.isEqualList(
+ job2.getStagingInputUrls().values(),
+ job2Copy.getStagingInputUrls().values()));
+ } catch (IOException e) {
+ fail();
+ throw e;
+ }
+ }
+
+}
1
0
14 Jun '10
Author: bleny
Date: 2010-06-14 10:53:35 +0200 (Mon, 14 Jun 2010)
New Revision: 75
Url: http://nuiton.org/repositories/revision/diswork/75
Log:
mauvaise utilisation du FS dans les tests
Modified:
trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/AbstractDisworkFileSystemTest.java
Modified: trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/AbstractDisworkFileSystemTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/AbstractDisworkFileSystemTest.java 2010-06-14 08:12:34 UTC (rev 74)
+++ trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/AbstractDisworkFileSystemTest.java 2010-06-14 08:53:35 UTC (rev 75)
@@ -83,7 +83,7 @@
*/
@Test
public void testWrite() throws Exception {
- fileSystem.write("/", "my_file", new FileInputStream(randomFilePath));
+ fileSystem.write("/my_file", new FileInputStream(randomFilePath));
}
/**
@@ -92,7 +92,7 @@
*/
@Test
public void testExists() throws Exception {
- fileSystem.write("/", "my_file", new FileInputStream(randomFilePath));
+ fileSystem.write("/my_file", new FileInputStream(randomFilePath));
assertTrue(fileSystem.exists("/my_file"));
assertFalse(fileSystem.exists("/my_other_file"));
}
@@ -119,7 +119,7 @@
InputStream source;
source = new ByteArrayInputStream(bytes);
- fileSystem.write("/", "my_file", source);
+ fileSystem.write("/my_file", source);
source.close();
@@ -148,7 +148,7 @@
InputStream source = new FileInputStream(randomFilePath);
- fileSystem.write("/", "my_file", source);
+ fileSystem.write("/my_file", source);
source.close();
@@ -179,7 +179,7 @@
*/
@Test(expected = IOException.class)
public void testFailAtWrite() throws Exception {
- fileSystem.write("/my_folder", "my_file",
+ fileSystem.write("/my_folder/my_file",
new FileInputStream(randomFilePath));
}
@@ -220,10 +220,10 @@
@Test
public void testWriteInFolder() throws Exception {
fileSystem.createDirectory("/my_folder");
- fileSystem.write("/my_folder", "my_file",
+ fileSystem.write("/my_folder/my_file",
new FileInputStream(randomFilePath));
fileSystem.createDirectory("/my_folder/my_sub_folder");
- fileSystem.write("/my_folder/my_sub_folder", "my_file",
+ fileSystem.write("/my_folder/my_sub_folder/my_file",
new FileInputStream(randomFilePath));
assertTrue(fileSystem.exists("/my_folder/my_sub_folder/my_file"));
}
@@ -236,7 +236,7 @@
@Test
public void testLinking() throws Exception {
fileSystem.createDirectory("/my_folder");
- fileSystem.write("/my_folder", "my_file",
+ fileSystem.write("/my_folder/my_file",
new FileInputStream(randomFilePath));
fileSystem.createSymbolicLink("/my_link", "/my_folder/my_file");
@@ -267,7 +267,7 @@
@Test
public void testRemove() throws Exception {
fileSystem.createDirectory("/my_folder");
- fileSystem.write("/my_folder", "my_file",
+ fileSystem.write("/my_folder/my_file",
new FileInputStream(randomFilePath));
fileSystem.delete("/my_folder/my_file");
assertTrue(fileSystem.exists("/my_folder"));
@@ -284,7 +284,7 @@
@Test(expected = IOException.class)
public void testFailAtRemove() throws Exception {
fileSystem.createDirectory("/my_folder");
- fileSystem.write("/my_folder", "my_file",
+ fileSystem.write("/my_folder/my_file",
new FileInputStream(randomFilePath));
// trying to remove a non-empty directory should raise an exception
@@ -298,7 +298,7 @@
public void testListDirectory() throws Exception {
fileSystem.createDirectory("/my_folder");
fileSystem.createDirectory("/my_folder/my_sub_dir");
- fileSystem.write("/my_folder", "my_file",
+ fileSystem.write("/my_folder/my_file",
new FileInputStream(randomFilePath));
fileSystem.createSymbolicLink("/my_folder/my_link", "my_file");
@@ -397,7 +397,11 @@
@Test
public void testCreateDirectories() throws Exception {
- fileSystem.createDirectories("/dir/subdir/subsubdir");
- assertTrue(fileSystem.exists("/dir/subdir/subsubdir"));
+ try {
+ fileSystem.createDirectories("/dir/subdir/subsubdir");
+ assertTrue(fileSystem.exists("/dir/subdir/subsubdir"));
+ } catch (IOException e) {
+ fail();
+ }
}
}
1
0
r74 - in trunk/diswork-fs/src: main/java/org/nuiton/diswork/fs/storage test/java/org/nuiton/diswork/fs
by bleny@users.nuiton.org 14 Jun '10
by bleny@users.nuiton.org 14 Jun '10
14 Jun '10
Author: bleny
Date: 2010-06-14 10:12:34 +0200 (Mon, 14 Jun 2010)
New Revision: 74
Url: http://nuiton.org/repositories/revision/diswork/74
Log:
correction fuite m?\195?\169moire avec test
Modified:
trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/Storage.java
trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/AbstractDisworkFileSystemTest.java
trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemInMemoryTest.java
Modified: trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/Storage.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/Storage.java 2010-06-11 15:52:09 UTC (rev 73)
+++ trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/Storage.java 2010-06-14 08:12:34 UTC (rev 74)
@@ -249,17 +249,17 @@
public void removeDirectory(String id) throws IOException {
log.debug("removeDirectory(\"" + id + "\")");
- removeKey(id);
+ remove(id);
}
public void removeFile(String id) throws IOException {
log.debug("removeFile(\"" + id + "\")");
- removeKey(id);
+ remove(id);
}
public void removeLink(String id) {
log.debug("removeLink(\"" + id + "\")");
- map.remove(id);
+ removeKey(id);
}
@@ -465,7 +465,11 @@
}
public void setMap(DisworkMap map) {
- this.map = map;
+ this.map = map;
}
+
+ public DisworkMap getMap() {
+ return map;
+ }
}
\ No newline at end of file
Modified: trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/AbstractDisworkFileSystemTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/AbstractDisworkFileSystemTest.java 2010-06-11 15:52:09 UTC (rev 73)
+++ trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/AbstractDisworkFileSystemTest.java 2010-06-14 08:12:34 UTC (rev 74)
@@ -43,7 +43,7 @@
/**
* The file will have this fixed size
*/
- static protected int randomFileSize = 9999;
+ static protected int randomFileSize = 25 * 1000;
static protected DisworkFileSystem fileSystem;
Modified: trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemInMemoryTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemInMemoryTest.java 2010-06-11 15:52:09 UTC (rev 73)
+++ trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemInMemoryTest.java 2010-06-14 08:12:34 UTC (rev 74)
@@ -1,6 +1,11 @@
package org.nuiton.diswork.fs;
+import static org.junit.Assert.assertEquals;
+
+import java.io.FileInputStream;
+
import org.junit.Before;
+import org.junit.Test;
import org.nuiton.diswork.fs.DisworkFileSystem;
import org.nuiton.diswork.fs.DisworkFileSystemConfig;
@@ -18,4 +23,22 @@
fileSystem = new DisworkFileSystem(disworkConfig);
}
+ @Test
+ public void testMemoryLeak() throws Exception {
+ int initialSize = fileSystem.storage.getMap().size();
+
+ fileSystem.createDirectory("/dir");
+ fileSystem.write("/dir/file", new FileInputStream(randomFilePath));
+ fileSystem.createSymbolicLink("/dir/link", "/dir/file");
+
+ fileSystem.delete("/dir/link");
+ fileSystem.delete("/dir/file");
+ fileSystem.delete("/dir");
+
+ fileSystem.storage.clean();
+
+ int finalSize = fileSystem.storage.getMap().size();
+ assertEquals(initialSize, finalSize);
+ }
+
}
1
0
Author: bleny
Date: 2010-06-11 17:52:09 +0200 (Fri, 11 Jun 2010)
New Revision: 73
Url: http://nuiton.org/repositories/revision/diswork/73
Log:
impl?\195?\169mentation du d?\195?\169mon diswork ; ajout d'op?\195?\169rations dans diswork-fs
Added:
trunk/diswork-daemon/src/main/java/org/
trunk/diswork-daemon/src/main/java/org/nuiton/
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkException.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkFileSystemException.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java
trunk/diswork-daemon/src/test/java/org/
trunk/diswork-daemon/src/test/java/org/nuiton/
trunk/diswork-daemon/src/test/java/org/nuiton/diswork/
trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/
trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java
trunk/diswork-daemon/src/test/resources/
trunk/diswork-daemon/src/test/resources/fake-app-1.0.zip
trunk/diswork-daemon/src/test/resources/fake-app.jar
trunk/diswork-daemon/src/test/resources/input.txt
trunk/diswork-daemon/src/test/resources/log4j.properties
trunk/diswork-fs/src/test/resources/
trunk/diswork-fs/src/test/resources/log4j.properties
Removed:
trunk/diswork-fs/src/main/resources/log4j.properties
Modified:
trunk/diswork-daemon/pom.xml
trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/Demo.java
trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystem.java
trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java
trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/InMemoryDisworkMap.java
trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/KademliaDisworkMap.java
trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/AbstractDisworkFileSystemTest.java
trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemKademliaTest.java
trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/KademliaDisworkMapTest.java
Modified: trunk/diswork-daemon/pom.xml
===================================================================
--- trunk/diswork-daemon/pom.xml 2010-06-10 09:32:59 UTC (rev 72)
+++ trunk/diswork-daemon/pom.xml 2010-06-11 15:52:09 UTC (rev 73)
@@ -31,6 +31,10 @@
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.nuiton</groupId>
<artifactId>nuiton-utils</artifactId>
</dependency>
Added: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java (rev 0)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java 2010-06-11 15:52:09 UTC (rev 73)
@@ -0,0 +1,94 @@
+/*
+ * #%L
+ * Diswork daemon
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
+package org.nuiton.diswork.daemon;
+
+import org.nuiton.diswork.fs.DisworkFileSystemConfig;
+import org.nuiton.util.ApplicationConfig;
+
+public class DisworkConfig extends ApplicationConfig {
+
+ protected DisworkFileSystemConfig fileSystemConfig;
+
+ public DisworkConfig() {
+ setConfigFileName("diswork.config");
+ }
+
+ public String getTempDirectory() {
+ return System.getProperty("java.io.tmpdir",
+ System.getProperty("user.dir",
+ ".")) + "/diswork";
+ }
+
+ public static DisworkConfig newConfig() {
+ DisworkConfig newConfig = new DisworkConfig();
+ newConfig.setFileSystemConfig(DisworkFileSystemConfig.newKademliaDisworkConfig());
+ return newConfig;
+ }
+
+ public String getOwnerId() {
+ return getOption("diswork.owner");
+ }
+
+ public void setOwnerId(String ownerId) {
+ setOption("diswork.owner", ownerId);
+ }
+
+
+
+
+
+ public String getBootstrapIp() {
+ return fileSystemConfig.getBootstrapIp();
+ }
+
+ public Integer getBootstrapPort() {
+ return fileSystemConfig.getBootstrapPort();
+ }
+
+ public Integer getUsedPort() {
+ return fileSystemConfig.getUsedPort();
+ }
+
+ public void setBootstrapIp(String ip) {
+ fileSystemConfig.setBootstrapIp(ip);
+ }
+
+ public void setBootstrapPort(Integer port) {
+ fileSystemConfig.setBootstrapPort(port);
+ }
+
+ public void setUsedPort(Integer port) {
+ fileSystemConfig.setUsedPort(port);
+ }
+
+ public DisworkFileSystemConfig getFileSystemConfig() {
+ return fileSystemConfig;
+ }
+
+ public void setFileSystemConfig(DisworkFileSystemConfig fileSystemConfig) {
+ this.fileSystemConfig = fileSystemConfig;
+ }
+
+}
Added: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java (rev 0)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-06-11 15:52:09 UTC (rev 73)
@@ -0,0 +1,372 @@
+/*
+ * #%L
+ * Diswork daemon
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
+package org.nuiton.diswork.daemon;
+
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.UnknownHostException;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.nuiton.diswork.fs.DisworkFileSystem;
+import org.nuiton.diswork.fs.DisworkFileSystemConfig;
+
+/**
+ *
+ * As far as possible, the use of the file system follow the UNIX Filesystem
+ * Hierarchy Standard
+ *
+ * @see http://en.wikipedia.org/wiki/Filesystem_Hierarchy_Standard
+ *
+ * @author bleny
+ */
+public class DisworkDaemon implements Closeable {
+
+ private static final Log log = LogFactory.getLog(DisworkDaemon.class);
+
+ protected DisworkFileSystem fileSystem;
+
+ protected DisworkConfig config;
+
+ /** owner id for this node */
+ protected String ownerId;
+
+ /** path to owned directory on fileSystem */
+ protected String homeDir;
+
+ /** contains applications */
+ protected static final String BIN = "/bin";
+
+ /** new jobs */
+ protected static final String TODO = "/var/jobs/todo";
+
+ /** jobs taken that was never tried */
+ protected static final String TODO_RUNNING = "/var/jobs/todo_running";
+
+ /** jobs failed once */
+ protected static final String FAILED_1 = "/var/jobs/failed_1";
+
+ /** jobs taken that was failed once */
+ protected static final String FAILED_1_RUNNING = "/var/jobs/failed_1_running";
+
+ /** jobs failed two times */
+ protected static final String FAILED_2 = "/var/jobs/failed_2";
+
+ /** jobs taken that was failed two times */
+ protected static final String FAILED_2_RUNNING = "/var/jobs/failed_2_running";
+
+ /** jobs failed 3 times */
+ protected static final String FAILED_3 = "/var/jobs/failed_3";
+
+ /** jobs done */
+ protected static final String DONE = "/var/jobs/done";
+
+ /** a place where are all user-directories */
+ protected static final String HOME = "/home";
+
+ /** worker-manager will make the daemon accomplish jobs */
+ protected WorkersManager workers;
+
+ /** in a job directory, the place where the JSDL must be placed */
+ protected static final String JSDL_PATH = ".diswork/job.jsdl";
+
+ /** in a job directory, the place where the log must be placed */
+ protected static final String LOG_PATH = ".diswork/job.log";
+
+ public DisworkDaemon(DisworkConfig config) throws DisworkException {
+ this.config = config;
+
+ // init fileSystem with all needed directories
+ try {
+ DisworkFileSystemConfig fileSystemConfig =
+ config.getFileSystemConfig();
+
+ fileSystem = new DisworkFileSystem(fileSystemConfig);
+ initFileSystem();
+ } catch (UnknownHostException e) {
+ log.error("bootstrap failed", e);
+ throw new DisworkException("bootstrap failed", e);
+ } catch (IOException e) {
+ log.error("booting diswork file system failed", e);
+ throw new DisworkException("booting diswork file system failed", e);
+ }
+
+
+ ownerId = config.getOwnerId(); // get job owner id from config
+
+ if (ownerId == null) {
+ log.info("can't find owner id, generating a new one");
+ // generate a new one by cheking if home dir exists
+ ownerId = System.getProperty("user.name", "anonymous");
+ // check home dir do not exists
+ config.setOwnerId(ownerId);
+
+ // config.saveForUser();
+ }
+
+ log.info("owner id is " + ownerId);
+
+ try {
+ homeDir = HOME + "/" + ownerId;
+ if (!fileSystem.exists(homeDir)) {
+ fileSystem.createDirectory(homeDir);
+ }
+ } catch (ConcurrentModificationException e) {
+ log.info("can't create home dir", e);
+ throw new DisworkException("can't create home dir", e);
+ } catch (IOException e) {
+ log.info("can't create home dir", e);
+ throw new DisworkException("can't create home dir", e);
+ }
+
+ // check if config implies to run a worker
+ workers = new WorkersManager(fileSystem, config);
+ workers.start();
+ }
+
+ protected void initFileSystem() throws ConcurrentModificationException,
+ IOException {
+ String[] directories = { TODO, TODO_RUNNING, FAILED_1, FAILED_1_RUNNING,
+ FAILED_2, FAILED_2_RUNNING, FAILED_3, DONE, HOME, BIN };
+ // if HOME exists, we suppose all others exists
+ if (! fileSystem.exists(HOME)) {
+ for (String directory : directories) {
+ if (! fileSystem.exists(directory)) {
+ fileSystem.createDirectories(directory);
+ log.info("created " + directory);
+ }
+ }
+ }
+ }
+
+ /**
+ * given a name and a version for an application, returns the path where
+ * application data can be found on diswork file system.
+ * @param applicationName
+ * @param applicationVersion
+ * @return a path
+ */
+ protected static String getPathForDependency(String applicationName,
+ String applicationVersion) {
+
+ String result = "/bin/" + applicationName // application directory
+ + "/"
+ // application file name
+ + applicationName + "-" + applicationVersion + ".zip";
+ return result;
+ }
+
+ /**
+ * every-time a link to a job is created or modified in the job, his name
+ * has to be generated by this method
+ * @return the name to use for a link
+ */
+ protected static String newJobLinkName() {
+ return ((Long) System.currentTimeMillis()).toString();
+ }
+
+ public void submitApplication(String applicationName,
+ String applicationVersion,
+ InputStream applicationData) throws DisworkException {
+
+ // the place where dependency will be stored
+ String path = getPathForDependency(applicationName, applicationVersion);
+
+ String applicationDirectory = FilenameUtils.getFullPathNoEndSeparator(path);
+
+ try {
+ if (!fileSystem.exists(applicationDirectory)) {
+ fileSystem.createDirectory(applicationDirectory);
+ }
+
+ if (!fileSystem.exists(path)) {
+ fileSystem.write(path, applicationData);
+ }
+ } catch (ConcurrentModificationException e) {
+ log.info("unable to write", e);
+ throw new DisworkException("unable to write", e);
+ } catch (IOException e) {
+ log.info("unable to write", e);
+ throw new DisworkException("unable to write", e);
+ }
+ }
+
+ /**
+ * Given a job description, returns the place on disworkFS where all data
+ * for this jobs should be stored
+ * @param jobDescription
+ * @return a path
+ */
+ protected String getJobPath(JobDescription jobDescription) {
+ return getJobPath(jobDescription.getJobId());
+ }
+
+ /**
+ * Given a job description, returns the place on disworkFS where all data
+ * for this jobs should be stored
+ * @param jobDescription
+ * @return a path
+ */
+ protected String getJobPath(String jobId) {
+ // all jobs are stored in home dir
+ return homeDir + "/" + jobId;
+ }
+
+ /**
+ *
+ */
+ public JobDescription newJob() throws IOException {
+ Random random = new Random();
+
+ boolean alreadyExists = true;
+ String newJobIntendifier = null;
+ while (alreadyExists) {
+ Integer randomInteger = random.nextInt();
+ newJobIntendifier = "job_" + randomInteger.toString();
+ alreadyExists = fileSystem.exists(getJobPath(newJobIntendifier));
+ }
+
+ // create both job path and sub-directory .diswork
+ fileSystem.createDirectories(getJobPath(newJobIntendifier) + "/" + ".diswork");
+ log.info("created new job " + newJobIntendifier);
+ return new JobDescription(newJobIntendifier);
+ }
+
+ public void submitJob(JobDescription jobDescription) throws DisworkException {
+ submitJob(jobDescription, new HashMap<String, InputStream>());
+ }
+
+ public void submitJob(JobDescription jobDescription,
+ Map<String, InputStream> inputFiles) throws DisworkException {
+
+ // check dependencies, throw exception
+
+ if (inputFiles.size() + jobDescription.getStagingInputUrls().size()
+ < jobDescription.getStagingInput().size()) {
+ // dependencies are missing
+ }
+
+ try {
+ String dependencyPath =
+ getPathForDependency(jobDescription.getApplicationName(),
+ jobDescription.getApplicationVersion());
+ log.info("looking for " + dependencyPath);
+
+ if (!fileSystem.exists(dependencyPath)) {
+ throw new DisworkException("job require a dependency " +
+ jobDescription.getApplicationName() + "-" +
+ jobDescription.getApplicationVersion() + " that is " +
+ "not available");
+ }
+
+ String jobDir = getJobPath(jobDescription);
+
+ if(!fileSystem.exists(jobDir)) {
+ // strange !
+ }
+
+ fileSystem.write(jobDir + "/" + LOG_PATH, IOUtils.toInputStream(""));
+
+ InputStream jobJSDL = IOUtils.toInputStream(jobDescription.toJSDL());
+ fileSystem.write(jobDir + "/" + JSDL_PATH, jobJSDL);
+
+ // file staging
+ for (String fileName : inputFiles.keySet()) {
+ fileSystem.write(jobDir + "/" + fileName, inputFiles.get(fileName));
+ }
+
+ // propose job
+ String linkName = newJobLinkName();
+
+ // FIXME 20100609 bleny may throws exception if jobs are proposed
+ // at a same time
+ fileSystem.createSymbolicLink(TODO + "/" + linkName, jobDir);
+
+ } catch (IOException e) {
+ log.error("file system error", e);
+ throw new DisworkFileSystemException("file system error", e);
+ }
+ }
+
+ public boolean checkLogContains(JobDescription job,
+ String pattern) throws DisworkException {
+ try {
+ String jobPath = getJobPath(job);
+ List<?> entries = IOUtils.readLines(fileSystem.read(jobPath + "/" + LOG_PATH));
+ return entries.contains(pattern);
+ } catch (FileNotFoundException e) {
+ log.info("log file was not found in job " + job, e);
+ throw new DisworkException("log file was not found in job " + job, e);
+ } catch (IOException e) {
+ log.info("file system error ", e);
+ throw new DisworkException("file system error ", e);
+ }
+ }
+
+ public boolean isFinished(JobDescription job) throws DisworkException {
+ return checkLogContains(job, "FINISHED");
+ }
+
+ public boolean isSuccessful(JobDescription job) throws DisworkException {
+ return checkLogContains(job, "DONE");
+ }
+
+ public boolean isFailed(JobDescription job) throws DisworkException {
+ return isFinished(job) && !isSuccessful(job);
+ }
+
+ @Override
+ public void close() throws IOException {
+ workers.stop();
+ fileSystem.close();
+ }
+
+ public Map<String, InputStream> getResults(JobDescription job)
+ throws DisworkException {
+ Map<String, InputStream> results = new HashMap<String, InputStream>();
+ for (String fileName : job.getStagingOutput()) {
+ String jobPath = getJobPath(job);
+ try {
+ InputStream result = fileSystem.read(jobPath + "/" + fileName);
+ results.put(fileName, result);
+ } catch (FileNotFoundException e) {
+ throw new DisworkException("an expected file is missing", e);
+ } catch (IOException e) {
+ log.info("file system error ", e);
+ throw new DisworkException("file system error ", e);
+ }
+ }
+ return results;
+ }
+}
\ No newline at end of file
Added: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java (rev 0)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java 2010-06-11 15:52:09 UTC (rev 73)
@@ -0,0 +1,48 @@
+/*
+ * #%L
+ * Diswork daemon
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
+package org.nuiton.diswork.daemon;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ *
+ * @author bleny
+ */
+public class DisworkDaemonRunner {
+
+ private static final Log log = LogFactory.getLog(DisworkDaemonRunner.class);
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) {
+
+ // consider args
+
+ // DisworkDaemon node = new DisworkDaemon(config);
+
+ }
+}
Added: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkException.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkException.java (rev 0)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkException.java 2010-06-11 15:52:09 UTC (rev 73)
@@ -0,0 +1,47 @@
+/*
+ * #%L
+ * Diswork daemon
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
+package org.nuiton.diswork.daemon;
+
+/**
+ *
+ * @author bleny
+ */
+public class DisworkException extends Exception {
+
+ private static final long serialVersionUID = -6434751198109021511L;
+
+ public DisworkException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public DisworkException(String message) {
+ super(message);
+ }
+
+ public DisworkException(Throwable cause) {
+ super(cause);
+ }
+
+}
Added: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkFileSystemException.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkFileSystemException.java (rev 0)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkFileSystemException.java 2010-06-11 15:52:09 UTC (rev 73)
@@ -0,0 +1,47 @@
+/*
+ * #%L
+ * Diswork daemon
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
+package org.nuiton.diswork.daemon;
+
+/**
+ *
+ * @author bleny
+ */
+public class DisworkFileSystemException extends DisworkException {
+
+ private static final long serialVersionUID = -4027003687525235092L;
+
+ public DisworkFileSystemException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public DisworkFileSystemException(String message) {
+ super(message);
+ }
+
+ public DisworkFileSystemException(Throwable cause) {
+ super(cause);
+ }
+
+}
Added: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java (rev 0)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java 2010-06-11 15:52:09 UTC (rev 73)
@@ -0,0 +1,185 @@
+/*
+ * #%L
+ * Diswork daemon
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
+package org.nuiton.diswork.daemon;
+
+import java.io.Serializable;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ *
+ *
+ * @author bleny
+ */
+public class JobDescription implements Serializable {
+
+ private static final long serialVersionUID = -8493700934802808925L;
+
+ /** an id for diswork */
+ protected String jobId;
+
+ protected String jobName;
+
+ protected String applicationName;
+ protected String applicationVersion;
+
+ protected String commandLine;
+
+ /** all files expected at the beginning of the job */
+ protected List<String> stagingInput = new ArrayList<String>();
+
+ /** all files expected at the end of the job */
+ protected List<String> stagingOutput = new ArrayList<String>();
+
+ /** the name of a file and the URI where to get it */
+ protected Map<String, URL> stagingInputUrls = new HashMap<String, URL>();
+
+ /** file where to read the standard input, may be null */
+ protected String standardInput;
+
+ /** file where to write the standard output */
+ protected String standardOutput;
+
+ /** TOKENS are piece of string you can use for writing command lines */
+ protected static Map<String, String> TOKENS;
+
+ static {
+ TOKENS = new HashMap<String, String>();
+
+ TOKENS.put("%java", System.getProperty("java.home") + "/bin/java");
+ }
+
+ /**
+ * constructor is protected to prevent bad jobIds. To get a JobDescription
+ * instance, a client should use the {@link DisworkDaemon#newJob()}
+ * factory method. The given instance will have a valid jobId
+ * @param jobId
+ */
+ protected JobDescription(String jobId) {
+ this.jobId = jobId;
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public String getCommandLine() {
+ String result = commandLine;
+ for (String token : TOKENS.keySet()) {
+ result = result.replace(token, TOKENS.get(token));
+ }
+ return result;
+ }
+
+ public void setCommandLine(String commandLine) {
+ this.commandLine = commandLine;
+ }
+
+ public String getJobName() {
+ return jobName;
+ }
+
+ public String getApplicationName() {
+ return applicationName;
+ }
+
+ public String getApplicationVersion() {
+ return applicationVersion;
+ }
+
+ public void setJobName(String jobName) {
+ this.jobName = jobName;
+ }
+
+ public void setApplication(String applicationName,
+ String applicationVersion) {
+ this.applicationName = applicationName;
+ this.applicationVersion = applicationVersion;
+ }
+
+ @Override
+ public String toString() {
+ return "job : " + jobName + " (" + jobId + ")";
+ }
+
+ protected static Integer count = 0;
+ protected static Map<String, JobDescription> map = new HashMap<String, JobDescription>();
+
+ public String toJSDL() {
+ count += 1;
+ map.put(count.toString(), this);
+ return count.toString();
+ }
+
+ public static JobDescription parseJSDL(String jsdl) {
+ return map.get(jsdl);
+ }
+
+ public void addStagingInput(String fileName, URL source) {
+ stagingInput.add(fileName);
+ stagingInputUrls.put(fileName, source);
+ }
+
+ public void addStagingInput(String fileName) {
+ stagingInput.add(fileName);
+ }
+
+ public void addStagingOutput(String fileName) {
+ stagingOutput.add(fileName);
+ }
+
+ public List<String> getStagingInput() {
+ return stagingInput;
+ }
+
+ public List<String> getStagingOutput() {
+ return stagingOutput;
+ }
+
+ public Map<String, URL> getStagingInputUrls() {
+ return stagingInputUrls;
+ }
+
+ public String getStandardInput() {
+ return standardInput;
+ }
+
+ public void setStandardInput(String fileName) {
+ standardInput = fileName;
+ }
+
+ public String getStandardOutput() {
+ return standardOutput;
+ }
+
+ public void setStandardOutput(String fileName) {
+ standardOutput = fileName;
+ }
+
+}
\ No newline at end of file
Added: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java (rev 0)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-06-11 15:52:09 UTC (rev 73)
@@ -0,0 +1,278 @@
+/*
+ * #%L
+ * Diswork daemon
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
+package org.nuiton.diswork.daemon;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.nuiton.diswork.fs.DisworkFileSystem;
+import org.nuiton.util.ZipUtil;
+
+/**
+ *
+ * @author bleny
+ */
+public class WorkersManager {
+
+ protected static Map<String, String> RUNNING_MOVE = new HashMap<String, String>();
+ protected static Map<String, String> FAILED_MOVE = new HashMap<String, String>();
+
+ // TODO 20100611 bleny make it configurable
+ /** time to wait beetween two look for a job */
+ protected static final int JOB_WAIT = 10 * 1000;
+
+ static {
+ RUNNING_MOVE.put(DisworkDaemon.TODO, DisworkDaemon.TODO_RUNNING);
+ RUNNING_MOVE.put(DisworkDaemon.FAILED_1, DisworkDaemon.FAILED_1_RUNNING);
+ RUNNING_MOVE.put(DisworkDaemon.FAILED_2, DisworkDaemon.FAILED_2_RUNNING);
+
+ RUNNING_MOVE.put(DisworkDaemon.TODO, DisworkDaemon.FAILED_1);
+ RUNNING_MOVE.put(DisworkDaemon.FAILED_1, DisworkDaemon.FAILED_2);
+ RUNNING_MOVE.put(DisworkDaemon.FAILED_2, DisworkDaemon.FAILED_3);
+ }
+
+ private static final Log log = LogFactory.getLog(WorkersManager.class);
+
+ protected DisworkFileSystem fileSystem;
+ protected DisworkConfig config;
+
+ // Pool of workers ?
+ protected Worker worker;
+
+ // Activity strategy ?
+
+ protected class Worker implements Runnable {
+
+ public boolean shouldStop = false;
+
+ protected void log(String jobPath, String message) throws IOException {
+ String logPath = jobPath + "/" + DisworkDaemon.LOG_PATH;
+ InputStream oldLogAsStream = fileSystem.read(logPath);
+ String oldLog = IOUtils.toString(oldLogAsStream);
+
+ String logEntry = message + "\n";
+
+ String newLog = oldLog + logEntry;
+
+ fileSystem.delete(logPath);
+ fileSystem.write(logPath, IOUtils.toInputStream(newLog));
+ }
+
+ protected boolean runJob(String jobPath) throws IOException {
+ log.info("running job at " + jobPath);
+
+ String jsdl;
+ try {
+ String jsdlPath = jobPath + "/" + DisworkDaemon.JSDL_PATH;
+ jsdl = IOUtils.toString(fileSystem.read(jsdlPath));
+ } catch (FileNotFoundException e) {
+ log.warn("job " + jobPath + " misses a job description");
+ return false;
+ }
+
+ log.info("read jsdl " + jsdl);
+
+ JobDescription jobDescription = JobDescription.parseJSDL(jsdl);
+
+ log.info("will run job " + jobDescription);
+
+ // create temp dir
+ File jobDir = new File(config.getTempDirectory(), jobDescription.getJobId());
+ jobDir.mkdirs();
+
+ // download application
+ String applicationPath = DisworkDaemon.getPathForDependency(
+ jobDescription.getApplicationName(),
+ jobDescription.getApplicationVersion());
+ InputStream applicationData = fileSystem.read(applicationPath);
+
+
+ File application = new File(jobDir, FilenameUtils.getName(applicationPath));
+ application.createNewFile();
+ log.info("will create " + application.getAbsolutePath());
+ OutputStream out = new FileOutputStream(application);
+ IOUtils.copy(applicationData, out);
+ // unzip application
+ ZipUtil.uncompress(application, jobDir);
+
+ // staging input files
+ for (String fileName : jobDescription.getStagingInput()) {
+ File localCopy = new File(jobDir, fileName);
+ localCopy.createNewFile();
+ InputStream source = null;
+ if (jobDescription.getStagingInputUrls().containsKey(fileName)) {
+ // download this file from URL
+ URL url = jobDescription.getStagingInputUrls().get(fileName);
+ log.info("downloading from " + url);
+ source = url.openStream();
+ } else {
+ // download this file from diswork
+ source = fileSystem.read(jobPath + "/" + fileName);
+ }
+ IOUtils.copy(source, new FileOutputStream(localCopy));
+ }
+
+ // executing the job
+ String commandLine = jobDescription.getCommandLine();
+ log.info("calling " + commandLine);
+ String[] bidule = commandLine.split(" ");
+ ProcessBuilder builder = new ProcessBuilder(bidule);
+ builder.directory(jobDir);
+ builder.redirectErrorStream(true);
+ Process job = builder.start();
+
+ // plugin a file on the standard input
+ String standardInputFileName = jobDescription.getStandardInput();
+ if (standardInputFileName != null) {
+ InputStream input = new FileInputStream(new File(jobDir, standardInputFileName));
+ IOUtils.copy(input, job.getOutputStream());
+ }
+
+ int exitValue = -1;
+ try {
+ exitValue = job.waitFor();
+ } catch (InterruptedException e) {
+ log.error("job " + jobDescription + " was interupted", e);
+
+ // FIXME 20100611 bleny job is considered has failed
+ exitValue = 1;
+ }
+
+ // dump the standard output in a file
+ String standardOutputFileName = jobDescription.getStandardOutput();
+ if (standardOutputFileName != null) {
+ OutputStream output = new FileOutputStream(new File(jobDir, standardOutputFileName));
+ IOUtils.copy(job.getInputStream(), output);
+ }
+
+ log.info("job returned " + exitValue);
+
+ // output file staging
+ for (String fileName : jobDescription.getStagingOutput()) {
+ File localCopy = new File(jobDir, fileName);
+ InputStream localCopyStream = new FileInputStream(localCopy);
+
+ String filePath = jobPath + "/" + fileName;
+
+ log.info("out-staging " + fileName);
+ // erase before write
+ if (fileSystem.exists(filePath)) {
+ fileSystem.delete(filePath);
+ }
+
+ fileSystem.write(filePath, localCopyStream);
+ localCopyStream.close();
+ }
+
+ // clean up the job directory
+ // FileUtil.deleteRecursively(jobDir);
+
+ boolean success = exitValue == 0;
+ if (success) {
+ log(jobPath, "DONE\nFINISHED");
+ } else {
+ log(jobPath, "FAILED\nFINISHED");
+ }
+ return success;
+ }
+
+ @Override
+ public void run() {
+ while (! shouldStop) {
+ // try to find a new job
+ try {
+ // TODO 20100609 bleny watch for other jobs
+ List<String> jobsNames =
+ fileSystem.readDirectory(DisworkDaemon.TODO);
+ if (jobsNames.size() != 0) {
+ // sort and choose the first, due to names, it should be
+ // the more ancient one
+ Collections.sort(jobsNames);
+ String oldName = jobsNames.get(0);
+
+ String newName = DisworkDaemon.newJobLinkName();
+ fileSystem.move(DisworkDaemon.TODO + "/" + oldName,
+ DisworkDaemon.TODO_RUNNING + "/" + newName);
+
+ boolean jobSuccess = runJob(DisworkDaemon.TODO_RUNNING + "/" + newName);
+
+ oldName = newName;
+ newName = DisworkDaemon.newJobLinkName();
+
+ if (jobSuccess) {
+ fileSystem.move(
+ DisworkDaemon.TODO_RUNNING + "/" + oldName,
+ DisworkDaemon.DONE + "/" + newName);
+ } else {
+ fileSystem.move(
+ DisworkDaemon.TODO_RUNNING + "/" + oldName,
+ DisworkDaemon.FAILED_3 + "/" + newName);
+ }
+ } else {
+ log.info("nothing to do");
+ Thread.sleep(JOB_WAIT);
+ }
+
+ } catch (IOException e) {
+ log.error("error while reading jobs", e);
+ // TODO 20100611 bleny manage exception
+ } catch (InterruptedException e) {
+ log.info("exception catch", e);
+ // TODO 20100611 bleny manage exception
+ }
+ }
+ }
+
+ }
+
+ public WorkersManager(DisworkFileSystem fileSystem, DisworkConfig config) {
+ this.fileSystem = fileSystem;
+ this.config = config;
+ }
+
+ public void start() {
+ worker = new Worker();
+ Thread t = new Thread(worker);
+ t.start();
+ }
+
+ public void stop() {
+ worker.shouldStop = true;
+ }
+}
\ No newline at end of file
Added: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java
===================================================================
--- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java (rev 0)
+++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java 2010-06-11 15:52:09 UTC (rev 73)
@@ -0,0 +1,106 @@
+package org.nuiton.diswork.daemon;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.InputStream;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DisworkDaemonTest {
+
+ protected DisworkDaemon daemon;
+
+ protected static int port = 39999;
+
+ @Before
+ public void setUp() throws Exception {
+ DisworkConfig config = DisworkConfig.newConfig();
+ port += 1;
+ config.setUsedPort(port);
+ daemon = new DisworkDaemon(config);
+ InputStream application = ClassLoader.getSystemResourceAsStream("fake-app-1.0.zip");
+ daemon.submitApplication("fake-app", "1.0", application);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ daemon.close();
+ }
+
+ @Test(expected = DisworkException.class)
+ public void testSubmitWithoutDependency() throws Exception {
+ JobDescription job = daemon.newJob();
+ job.setApplication("non-existing-application", "0.0");
+ daemon.submitJob(job);
+ }
+
+ @Test
+ public void testSubmitSuccessfulJob() throws Exception {
+ JobDescription job = daemon.newJob();
+ job.setApplication("fake-app", "1.0");
+ job.setCommandLine("%java -jar fake-app.jar");
+ daemon.submitJob(job);
+
+ while(! daemon.isFinished(job)) {
+ Thread.sleep(5 * 1000);
+ }
+
+ assertTrue(daemon.isSuccessful(job));
+ }
+
+ @Test
+ public void testSubmitFailJob() throws Exception {
+ JobDescription job = daemon.newJob();
+ job.setApplication("fake-app", "1.0");
+ job.setCommandLine("%java -jar fake-app.jar fail");
+ daemon.submitJob(job);
+
+ while(! daemon.isFinished(job)) {
+ Thread.sleep(5 * 1000);
+ }
+
+ assertTrue(daemon.isFailed(job));
+ }
+
+ @Test
+ public void testStaging() throws Exception {
+ JobDescription job = daemon.newJob();
+ job.setJobName("My Job");
+ job.setApplication("fake-app", "1.0");
+ job.setCommandLine("%java -jar fake-app.jar");
+
+ job.addStagingInput("example.com_index", new URL("http://www.example.com/"));
+ job.addStagingInput("input.txt");
+ job.addStagingOutput("output.txt");
+
+ job.setStandardInput("input.txt");
+ job.setStandardOutput("output.txt");
+
+ Map<String, InputStream> data = new HashMap<String, InputStream>();
+ data.put("input.txt", ClassLoader.getSystemResourceAsStream("input.txt"));
+
+ daemon.submitJob(job, data);
+
+ while(! daemon.isFinished(job)) {
+ Thread.sleep(1 * 1000);
+ }
+
+ assertTrue(daemon.isSuccessful(job));
+
+ Map<String, InputStream> results = daemon.getResults(job);
+
+ assertTrue(results.containsKey("output.txt"));
+
+ String output = IOUtils.toString(results.get("output.txt"));
+ assertEquals("a print on standard output\n", output);
+
+ }
+
+}
Added: trunk/diswork-daemon/src/test/resources/fake-app-1.0.zip
===================================================================
(Binary files differ)
Property changes on: trunk/diswork-daemon/src/test/resources/fake-app-1.0.zip
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Added: trunk/diswork-daemon/src/test/resources/fake-app.jar
===================================================================
(Binary files differ)
Property changes on: trunk/diswork-daemon/src/test/resources/fake-app.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Added: trunk/diswork-daemon/src/test/resources/input.txt
===================================================================
--- trunk/diswork-daemon/src/test/resources/input.txt (rev 0)
+++ trunk/diswork-daemon/src/test/resources/input.txt 2010-06-11 15:52:09 UTC (rev 73)
@@ -0,0 +1 @@
+Hello
Added: trunk/diswork-daemon/src/test/resources/log4j.properties
===================================================================
--- trunk/diswork-daemon/src/test/resources/log4j.properties (rev 0)
+++ trunk/diswork-daemon/src/test/resources/log4j.properties 2010-06-11 15:52:09 UTC (rev 73)
@@ -0,0 +1,9 @@
+# Global logging configuration
+log4j.rootLogger=WARN, stdout
+# Console output...
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d %5p [%t] (%F:%L) %M - %m%n
+# package level
+log4j.logger.org.nuiton.diswork.fs.storage.KademliaDisworkMap=INFO
+log4j.logger.org.nuiton.diswork.daemon=INFO
\ No newline at end of file
Modified: trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/Demo.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/Demo.java 2010-06-10 09:32:59 UTC (rev 72)
+++ trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/Demo.java 2010-06-11 15:52:09 UTC (rev 73)
@@ -111,6 +111,7 @@
List<String> todos = fileSystem.readDirectory("/todo");
if (todos.isEmpty()) {
log.info("nothing to do");
+ System.out.println("nothing to do");
} else {
// taking a random job
Random random = new Random();
Modified: trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystem.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystem.java 2010-06-10 09:32:59 UTC (rev 72)
+++ trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystem.java 2010-06-11 15:52:09 UTC (rev 73)
@@ -680,7 +680,7 @@
*/
protected void checkPathSyntax(String path) throws IOException {
if (!path.startsWith(EntryUtil.ROOT_DIRECTORY)) {
- throw new IOException("\" + path + \" is not correct, all pathes " +
+ throw new IOException("\"" + path + "\" is not correct, all pathes " +
"have to be absolute (thus, starts with)" +
EntryUtil.ROOT_DIRECTORY);
}
@@ -688,7 +688,7 @@
String doubleSeparator = EntryUtil.PATH_SEPARATOR
+ EntryUtil.PATH_SEPARATOR;
if (path.contains(doubleSeparator)) {
- throw new IOException("\" + path + \" is not correct, it contains "
+ throw new IOException("\"" + path + "\" is not correct, it contains "
+ doubleSeparator);
}
}
@@ -809,4 +809,21 @@
}
}
+
+ public void createDirectories(String path)
+ throws ConcurrentModificationException,
+ IOException {
+ log.info("trying create directories for " + path);
+
+ String pathWithoutRoot = path.substring(1, path.length());
+
+ String[] dirs = pathWithoutRoot.split(EntryUtil.PATH_SEPARATOR);
+ String dirPath = "";
+ for (String dir : dirs) {
+ dirPath += EntryUtil.PATH_SEPARATOR + dir;
+ if (!exists(dirPath)) {
+ createDirectory(dirPath);
+ }
+ }
+ }
}
\ No newline at end of file
Modified: trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java 2010-06-10 09:32:59 UTC (rev 72)
+++ trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java 2010-06-11 15:52:09 UTC (rev 73)
@@ -81,8 +81,15 @@
* @return
* @throws UnknownHostException
*/
- public static String getIp() throws UnknownHostException {
- InetAddress result = InetAddress.getLocalHost();
+ public static String getIp() {
+ InetAddress result = null;
+ try {
+ result = InetAddress.getLocalHost();
+ } catch (UnknownHostException e) {
+ // LocalHost is always known
+ log.error(e);
+ }
+
if (result.isLoopbackAddress()) {
try {
Socket temp = new Socket("microsoft.com", 80);
@@ -140,8 +147,7 @@
* @return
* @throws UnknownHostException
*/
- public static DisworkFileSystemConfig newPastryDisworkConfig()
- throws UnknownHostException {
+ public static DisworkFileSystemConfig newPastryDisworkConfig() {
return newPastryDisworkConfig(null);
}
@@ -153,8 +159,8 @@
* @return a complete config
* @throws UnknownHostException
*/
- public static DisworkFileSystemConfig newPastryDisworkConfig(Integer bootstrapPort)
- throws UnknownHostException {
+ public static DisworkFileSystemConfig
+ newPastryDisworkConfig(Integer bootstrapPort) {
DisworkFileSystemConfig result = new DisworkFileSystemConfig();
String port = getPort().toString();
String ip = getIp();
@@ -169,13 +175,12 @@
return result;
}
- public static DisworkFileSystemConfig newKademliaDisworkConfig()
- throws UnknownHostException {
+ public static DisworkFileSystemConfig newKademliaDisworkConfig() {
return newKademliaDisworkConfig(null);
}
- public static DisworkFileSystemConfig newKademliaDisworkConfig(Integer bootstrapPort)
- throws UnknownHostException {
+ public static DisworkFileSystemConfig
+ newKademliaDisworkConfig (Integer bootstrapPort) {
DisworkFileSystemConfig result = new DisworkFileSystemConfig();
String port = getPort().toString();
String ip = getIp();
Modified: trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/InMemoryDisworkMap.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/InMemoryDisworkMap.java 2010-06-10 09:32:59 UTC (rev 72)
+++ trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/InMemoryDisworkMap.java 2010-06-11 15:52:09 UTC (rev 73)
@@ -39,6 +39,8 @@
public class InMemoryDisworkMap extends HashMap<String, byte[]>
implements DisworkMap {
+ private static final long serialVersionUID = 2467699175129909832L;
+
private final Log log = LogFactory.getLog(InMemoryDisworkMap.class);
@Override
Modified: trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/KademliaDisworkMap.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/KademliaDisworkMap.java 2010-06-10 09:32:59 UTC (rev 72)
+++ trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/KademliaDisworkMap.java 2010-06-11 15:52:09 UTC (rev 73)
@@ -49,14 +49,15 @@
protected Kademlia kad;
public KademliaDisworkMap(DisworkFileSystemConfig config) throws IOException {
- log.info("booting kademlia dht on port " + config.getUsedPort());
+
kad = new Kademlia(Identifier.randomIdentifier(), config.getUsedPort());
-
+
if (config.getBootstrapIp() != null) {
InetSocketAddress bootstrap = new InetSocketAddress(
config.getBootstrapIp(),
config.getBootstrapPort());
try {
+ log.info("trying to connect to " + bootstrap);
kad.connect(bootstrap);
} catch (RoutingException e) {
log.error("bootstrap node is unreachable", e);
@@ -64,6 +65,8 @@
}
}
+ log.info("kademlia status : " + kad);
+
//Identifier.IDSIZE = 1024;
log.info("using " + Identifier.IDSIZE + " bytes for identifiers");
}
@@ -93,7 +96,7 @@
}
- log.info("key for string " + s + " is " + id);
+ log.debug("key for string " + s + " is " + id);
return id;
}
Deleted: trunk/diswork-fs/src/main/resources/log4j.properties
===================================================================
--- trunk/diswork-fs/src/main/resources/log4j.properties 2010-06-10 09:32:59 UTC (rev 72)
+++ trunk/diswork-fs/src/main/resources/log4j.properties 2010-06-11 15:52:09 UTC (rev 73)
@@ -1,9 +0,0 @@
-# Global logging configuration
-log4j.rootLogger=WARN, stdout
-# Console output...
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d %5p [%t] (%F:%L) %M - %m%n
-# package level
-log4j.logger.org.nuiton.diswork.fs=WARN
-log4j.logger.org.nuiton.diswork.fs.Demo=INFO
\ No newline at end of file
Modified: trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/AbstractDisworkFileSystemTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/AbstractDisworkFileSystemTest.java 2010-06-10 09:32:59 UTC (rev 72)
+++ trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/AbstractDisworkFileSystemTest.java 2010-06-11 15:52:09 UTC (rev 73)
@@ -394,4 +394,10 @@
}
*/
}
+
+ @Test
+ public void testCreateDirectories() throws Exception {
+ fileSystem.createDirectories("/dir/subdir/subsubdir");
+ assertTrue(fileSystem.exists("/dir/subdir/subsubdir"));
+ }
}
Modified: trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemKademliaTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemKademliaTest.java 2010-06-10 09:32:59 UTC (rev 72)
+++ trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemKademliaTest.java 2010-06-11 15:52:09 UTC (rev 73)
@@ -27,7 +27,7 @@
// finally, initiate the fileSystem
DisworkFileSystemConfig disworkConfig1 =
DisworkFileSystemConfig.newKademliaDisworkConfig();
- bootstrapPort = disworkConfig1.getUsedPort();
+ bootstrapPort = disworkConfig1.getUsedPort() + 1;
fileSystem = new DisworkFileSystem(disworkConfig1);
}
Modified: trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/KademliaDisworkMapTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/KademliaDisworkMapTest.java 2010-06-10 09:32:59 UTC (rev 72)
+++ trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/KademliaDisworkMapTest.java 2010-06-11 15:52:09 UTC (rev 73)
@@ -3,7 +3,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -12,7 +11,6 @@
import org.junit.Before;
import org.junit.Test;
import org.nuiton.diswork.fs.DisworkFileSystemConfig;
-import org.nuiton.diswork.fs.storage.KademliaDisworkMap;
import org.planx.xmlstore.routing.Identifier;
public class KademliaDisworkMapTest extends AbstractDisworkMapTest {
@@ -24,7 +22,7 @@
map1 = new KademliaDisworkMap(config1);
DisworkFileSystemConfig config2 = DisworkFileSystemConfig
- .newKademliaDisworkConfig(config1.getUsedPort());
+ .newKademliaDisworkConfig(config1.getUsedPort()+1);
map2 = new KademliaDisworkMap(config2);
}
@@ -72,7 +70,7 @@
* a bad bootstrap
* @throws Exception
*/
- @Test(expected = IOException.class)
+ @Test(expected = org.planx.xmlstore.routing.RoutingException.class)
public void testBadBootrap() throws Exception {
DisworkFileSystemConfig config1 =
DisworkFileSystemConfig.newKademliaDisworkConfig();
Copied: trunk/diswork-fs/src/test/resources/log4j.properties (from rev 72, trunk/diswork-fs/src/main/resources/log4j.properties)
===================================================================
--- trunk/diswork-fs/src/test/resources/log4j.properties (rev 0)
+++ trunk/diswork-fs/src/test/resources/log4j.properties 2010-06-11 15:52:09 UTC (rev 73)
@@ -0,0 +1,10 @@
+# Global logging configuration
+log4j.rootLogger=WARN, stdout
+# Console output...
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d %5p [%t] (%F:%L) %M - %m%n
+# package level
+log4j.logger.org.nuiton.diswork.fs=WARN
+log4j.logger.org.nuiton.diswork.fs.storage.KademliaDisworkMap=INFO
+log4j.logger.org.nuiton.diswork.fs.Demo=INFO
\ No newline at end of file
1
0