Author: bleny Date: 2010-07-09 13:41:12 +0200 (Fri, 09 Jul 2010) New Revision: 101 Url: http://nuiton.org/repositories/revision/diswork/101 Log: stats, gestion des exceptions dans le démon Added: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadConfigurationException.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadJobException.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSystemException.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/LocalFileException.java Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSimpleClient.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/JobDescriptionTest.java trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java 2010-07-07 16:23:54 UTC (rev 100) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java 2010-07-09 11:41:12 UTC (rev 101) @@ -112,6 +112,7 @@ 
/** use this strategy to never run a job */ public static class NoActivity implements ActivityStrategy { + @Override public boolean canWork() { return false; Added: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadConfigurationException.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadConfigurationException.java (rev 0) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadConfigurationException.java 2010-07-09 11:41:12 UTC (rev 101) @@ -0,0 +1,19 @@ +package org.nuiton.diswork.daemon; + +public class BadConfigurationException extends DisworkException { + + private static final long serialVersionUID = 1L; + + public BadConfigurationException(String message, Throwable cause) { + super(message, cause); + } + + public BadConfigurationException(String message) { + super(message); + } + + public BadConfigurationException(Throwable cause) { + super(cause); + } + +} Added: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadJobException.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadJobException.java (rev 0) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadJobException.java 2010-07-09 11:41:12 UTC (rev 101) @@ -0,0 +1,19 @@ +package org.nuiton.diswork.daemon; + +public class BadJobException extends DisworkException { + + private static final long serialVersionUID = 1L; + + public BadJobException(String message, Throwable cause) { + super(message, cause); + } + + public BadJobException(String message) { + super(message); + } + + public BadJobException(Throwable cause) { + super(cause); + } + +} Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java 
2010-07-07 16:23:54 UTC (rev 100) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java 2010-07-09 11:41:12 UTC (rev 101) @@ -78,6 +78,8 @@ * <dt>diswork.tokens_file</dt> * <dd>a path to a file containing token-names and the replacement strings * this is a property file</dd> + * <dt>diswork.job_looks_wait_time</dt> + * <dd>the number of seconds to wait between two looks for a job.</dd> * </dl> * * For the http front-end : @@ -122,6 +124,7 @@ + File.separator + "diswork"); setDefaultOption("diswork.activity_strategy", "unlimited"); + setDefaultOption("diswork.job_looks_wait_time", "60"); // 1 minute setOption("diswork.http_front_end.start", "true"); setOption("diswork.http_front_end.port", "8080"); @@ -160,10 +163,13 @@ /** * Read the tokens file if one is given in the config and merge the content * of this file into {@link #tokens} + * @throws DisworkSystemException + * @throws BadConfigurationException + * @throws LocalFileException * * @throws DisworkException */ - protected void initTokens() throws DisworkException { + protected void initTokens() throws BadConfigurationException, LocalFileException { tokens = new HashMap<String, String>(); String java = // full java path @@ -193,11 +199,11 @@ } } catch (FileNotFoundException e) { log.warn("tokens file not found, 0 tokens loaded", e); - throw new DisworkException("tokens file not found," + throw new BadConfigurationException("tokens file not found," + "no token loaded", e); } catch (IOException e) { log.error("can't read tokens file", e); - throw new DisworkException("can't read tokens file", e); + throw new LocalFileException("can't read tokens file", e); } } } @@ -218,7 +224,7 @@ } protected String parseCommandLine(String commandLine, String tempDir) - throws DisworkException { + throws BadConfigurationException, LocalFileException { if (tokens == null) { initTokens(); } @@ -240,9 +246,10 @@ /** * * @return null if no path for a file have been specified - * @throws DisworkException 
+ * @throws BadConfigurationException + * @throws LocalFileException */ - protected List<CronExpression> getSchedule() throws DisworkException { + protected List<CronExpression> getSchedule() throws BadConfigurationException, LocalFileException { // lazy instanciation of schedule if (schedule == null) { String path = getOption("diswork.schedule_file"); @@ -263,12 +270,14 @@ } } } catch (FileNotFoundException e) { - log.error(e); - throw new DisworkException(e); + log.error("schedule file doesn't exists", e); + throw new BadConfigurationException("schedule file doesn't exists", e); } catch (IOException e) { log.error(e); - throw new DisworkException(e); + throw new LocalFileException("can't read schedule file", e); } + } else { + throw new BadConfigurationException("schedule file has not been specified"); } } return schedule; @@ -407,4 +416,13 @@ public void setHttpFrontendPort(Integer httpFrontendPort) { setOption("diswork.http_front_end.port", httpFrontendPort.toString()); } + + /** number of seconds to wait between two look for a jobs (seconds) */ + public int getJobLooksWaitTime() { + return getOptionAsInt("diswork.job_looks_wait_time"); + } + + public void setJobLooksWaitTime(Integer seconds) { + setOption("diswork.job_looks_wait_time", seconds.toString()); + } } Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-07-07 16:23:54 UTC (rev 100) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-07-09 11:41:12 UTC (rev 101) @@ -493,13 +493,15 @@ public void submitJob(JobDescription jobDescription) throws DisworkException { - if (jobDescription.getInputData().size() + jobDescription.getInputUrls().size() - < jobDescription.getInput().size()) { - // dependencies are missing + // check all dependencies are provided + for (String name : 
jobDescription.getInput()) { + if (!jobDescription.getInputData().containsKey(name) && + !jobDescription.getInputUrls().containsKey(name)) { + throw new BadJobException("dependency " + name + " is missing"); + } } - + try { - // trying to put the job in a new directory of home Random random = new Random(); boolean alreadyExists = true; @@ -510,8 +512,9 @@ alreadyExists = fileSystem.exists(getPathForJob(newJobIntendifier)); } + // XXX side-effect jobDescription.setJobId(newJobIntendifier); - + // create both job path and sub-directory .diswork fileSystem.createDirectories( getPathForJob(jobDescription) + "/" + ".diswork"); @@ -565,11 +568,8 @@ } } } - config.addOneJobSubmitted(); - log.info("job submited"); - } catch (DisworkFileSystemException e) { log.error("file system error", e); throw new DisworkException(e); @@ -710,12 +710,17 @@ result.put("total_uptime", getTotalUptime().toString()); result.put("uptime_ratio", numberFormat.format(getUptimeRatio())); - // TODO 20100706 bleny compute number of jobs done, number of jobs submitted, ratio, result.put("jobs_done", config.getNumberOfJobsDone().toString()); result.put("jobs_submitted", config.getNumberOfJobsSubmitted().toString()); - result.put("jobs_ratio", "?"); + + if (config.getNumberOfJobsSubmitted() == 0) { + result.put("jobs_ratio", "∞"); + } else { + Double jobsRatio = (double) (config.getNumberOfJobsDone() / config.getNumberOfJobsSubmitted()); + result.put("jobs_ratio", numberFormat.format(jobsRatio)); + } - Double karma = (config.getNumberOfJobsDone() - config.getNumberOfJobsSubmitted()) * getUptimeRatio(); + Double karma = ((config.getNumberOfJobsDone() - config.getNumberOfJobsSubmitted()) * getUptimeRatio()); result.put("karma", numberFormat.format(karma)); return result; } @@ -855,4 +860,16 @@ workers.activeScheduledActivityStrategy(); } } + + /** + * + * @return null if working is disabled + * @throws DisworkException + */ + public List<JobDescription> getAllWorkersCurrentJobs() throws DisworkException 
{ + if (workers != null) { + return workers.getAllWorkersCurrentJobs(); + } + return null; + } } \ No newline at end of file Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSimpleClient.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSimpleClient.java 2010-07-07 16:23:54 UTC (rev 100) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSimpleClient.java 2010-07-09 11:41:12 UTC (rev 101) @@ -137,9 +137,10 @@ DisworkFileSystemConfig.newKademliaDisworkConfig(args[0], port)); config.setActivityStrategy("none"); config.setUsedPort(30000); + config.setStartHttpFrontend(false); daemon = new DisworkDaemon(config); - //userPrompt(); - isisSubmit(); + userPrompt(); + //isisSubmit(); } } Added: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSystemException.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSystemException.java (rev 0) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSystemException.java 2010-07-09 11:41:12 UTC (rev 101) @@ -0,0 +1,19 @@ +package org.nuiton.diswork.daemon; + +public class DisworkSystemException extends DisworkException { + + private static final long serialVersionUID = 1L; + + public DisworkSystemException(String message, Throwable cause) { + super(message, cause); + } + + public DisworkSystemException(String message) { + super(message); + } + + public DisworkSystemException(Throwable cause) { + super(cause); + } + +} Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java 2010-07-07 16:23:54 UTC (rev 100) +++ 
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java 2010-07-09 11:41:12 UTC (rev 101) @@ -132,6 +132,45 @@ pageContent += "</table>\n\n"; + + + + pageContent += "<h2>Currently running jobs</h2>" + "\n\n" + + "<table>" + "\n" + + " <tr>" + "\n" + + " <th>Name</th>" + "\n" + + " <th>Application</th>" + "\n" + + " </tr>" + "\n"; + + try { + jobs = daemon.getAllWorkersCurrentJobs(); + } catch (DisworkException e) { + log.error("error while retrieving local stats", e); + throw new ServletException("error while retrieving local stats", e); + } + + if (jobs.isEmpty()) { + pageContent += " <tr>\n" + + " <td colspan=\"2\"><em>working is disabled</em></td>\n" + + " </tr>\n"; + } else { + for (JobDescription job : jobs) { + + if (job == null) { + pageContent += " <tr>\n" + + " <td colspan=\"2\"><em>no job</em></td>\n" + + " </tr>\n"; + } else { + pageContent += " <tr>\n" + + " <td>" + job.getJobName() + "</td>\n" + + " <td>" + job.getApplicationName() + "</td>\n" + + " </tr>\n"; + } + } + } + pageContent += "</table>\n\n"; + + pageContent += "<h2>Diswork statistics</h2>\n\n"; Map<String, String> stats; try { Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java 2010-07-07 16:23:54 UTC (rev 100) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java 2010-07-09 11:41:12 UTC (rev 101) @@ -243,9 +243,10 @@ * Factory method to get a JobDescription from JSDL * @param jsdl the content of the JSDL file * @return a job description representing the content of the JSDL + * @throws BadJobException * @throws IOException if JSDL is malformed or if an URL is malformed */ - public static JobDescription parseJSDL(String jsdl) throws IOException { + public static JobDescription parseJSDL(String jsdl) throws BadJobException { // TODO 20100616 bleny 
correctly set dependency to JDOM in pom.xml JobDescription result = new JobDescription(); @@ -324,10 +325,10 @@ } } catch (JDOMException e) { log.error("can't read malformed JSDL file", e); - throw new IOException("can't read malformed JSDL file", e); + throw new BadJobException("can't read malformed JSDL file", e); } catch (MalformedURLException e) { log.error("malformed URL", e); - throw new IOException("malformed URL", e); + throw new BadJobException("malformed URL", e); } return result; } Added: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/LocalFileException.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/LocalFileException.java (rev 0) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/LocalFileException.java 2010-07-09 11:41:12 UTC (rev 101) @@ -0,0 +1,19 @@ +package org.nuiton.diswork.daemon; + +public class LocalFileException extends DisworkException { + + private static final long serialVersionUID = 1L; + + public LocalFileException(String message, Throwable cause) { + super(message, cause); + } + + public LocalFileException(String message) { + super(message); + } + + public LocalFileException(Throwable cause) { + super(cause); + } + +} Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-07-07 16:23:54 UTC (rev 100) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-07-09 11:41:12 UTC (rev 101) @@ -28,6 +28,7 @@ import java.io.BufferedWriter; import java.io.File; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; @@ -41,6 +42,7 @@ import java.util.List; import java.util.Map; +import 
org.apache.commons.io.FilenameUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -81,6 +83,9 @@ /** a job found in dir "key" that fail should be moved to "value" */ protected static Map<String, String> FAILED_MOVE = new HashMap<String, String>(); + /** a job found in dir "key" that is interrupted should be moved to "value" */ + protected static Map<String, String> INTERRUPTED_MOVE = new HashMap<String, String>(); + static { // initialize RUNNING_MOVE and FAILED_MOVE constants RUNNING_MOVE.put(DisworkDaemon.TODO, DisworkDaemon.TODO_RUNNING); @@ -93,6 +98,10 @@ FAILED_MOVE.put(DisworkDaemon.TODO_RUNNING, DisworkDaemon.FAILED_1); FAILED_MOVE.put(DisworkDaemon.FAILED_1_RUNNING, DisworkDaemon.FAILED_2); FAILED_MOVE.put(DisworkDaemon.FAILED_2_RUNNING, DisworkDaemon.FAILED_3); + + INTERRUPTED_MOVE.put(DisworkDaemon.TODO_RUNNING, DisworkDaemon.TODO); + INTERRUPTED_MOVE.put(DisworkDaemon.FAILED_1_RUNNING, DisworkDaemon.FAILED_1); + INTERRUPTED_MOVE.put(DisworkDaemon.FAILED_2_RUNNING, DisworkDaemon.FAILED_2); } protected DisworkFileSystem fileSystem; @@ -106,17 +115,14 @@ protected File applicationCache; - protected Boolean flag = Boolean.FALSE; + protected Boolean flag = false; protected final Object sem = new Object(); protected void updateFlag() throws DisworkException { - boolean newStatus = activityStrategy.canWork(); - if (newStatus != flag) { - synchronized (sem) { - flag = newStatus; - sem.notifyAll(); - } + synchronized (sem) { + flag = activityStrategy.canWork(); + sem.notifyAll(); } } @@ -142,10 +148,6 @@ */ protected class Worker extends Thread { - // TODO 20100611 bleny make it configurable - /** time to wait between two look for a job */ - protected static final int JOB_WAIT = 10 * 1000; - // TODO 20100614 bleny make it configurable /** after this time (ms), a job is considered as no longer running */ protected static final long MAX_JOB_RUNNING_TIME = 24 * 60 * 60 * 1000; @@ 
-153,6 +155,18 @@ /** set this field to true will make run() return and thread stop */ protected boolean shouldStop = false; + /** the current job, null if worker do nothing */ + protected JobDescription currentJob = null; + + /** current job path on diwork FS */ + protected String currentJobPath = null; + + /** current job temp directory on the local file-system */ + protected File currentJobDir = null; + + /** current process, null if nothing is running */ + protected Process currentProcess; + /** read the standard output of the subprocess * * By reading the standard output, this thread has multiple goals : @@ -224,7 +238,7 @@ log.warn("error while closing the output of the subprocess", e); } } - } + } } /** @@ -235,162 +249,286 @@ * the log * @throws IOException */ - protected void log(String jobPath, String message) - throws DisworkFileSystemException, IOException { - String logPath = jobPath + "/" + DisworkDaemon.LOG_PATH; - InputStream oldLogAsStream = fileSystem.read(logPath); - String oldLog = IOUtils.toString(oldLogAsStream); - String logEntry = message + "\n"; - String newLog = oldLog + logEntry; - fileSystem.write(logPath, IOUtils.toInputStream(newLog)); + protected void log(String jobPath, String... 
messages) + throws DisworkSystemException { + try { + String logPath = jobPath + "/" + DisworkDaemon.LOG_PATH; + InputStream oldLogAsStream = fileSystem.read(logPath); + String oldLog = IOUtils.toString(oldLogAsStream); + String logEntry = ""; + for (String message : messages) { + logEntry += message + "\n"; + } + String newLog = oldLog + logEntry; + log.debug("writing new log content" + newLog); + fileSystem.write(logPath, IOUtils.toInputStream(newLog)); + } catch (DisworkFileSystemException e) { + log.error("unable to read or write log file", e); + throw new DisworkSystemException("unable to read or write log file", e); + } catch (IOException e) { + log.error("unable to read log file", e); + throw new DisworkSystemException("unable to read log file", e); + } } - /** - * Download all the files needed for a job in a temp directory, run - * the job, wait for it to end, write all the results. Mark the job - * as running at the beginning and move it to DONE or FAILED at - * the end, depending of the results - * @param jobPath - * @return - * @throws DisworkFileSystemException - * @throws IOException - * @throws DisworkException - */ - protected boolean runJob(String jobPath) - throws IOException, - DisworkFileSystemException, - DisworkException { - log.info("running job at " + jobPath); - - String jsdlPath = jobPath + "/" + DisworkDaemon.JSDL_PATH; - String jsdl = IOUtils.toString(fileSystem.read(jsdlPath)); - - - log.info("read jsdl " + jsdl); - - JobDescription jobDescription = JobDescription.parseJSDL(jsdl); - - log.info("will run job " + jobDescription); - - // create temp dir - File jobDir = FileUtil.createTempDirectory("job", "", - new File(config.getTempDirectory())); - jobDir.mkdirs(); - + protected void downloadApplication() throws DisworkSystemException, LocalFileException { // download application - if (jobDescription.getApplicationName() != null) { - log.info("dependency needed for " + jobDescription + " (" + - jobDescription.getApplicationName() + "-" + - 
jobDescription.getApplicationVersion() + ")"); + if (currentJob.getApplicationName() != null) { + log.info("dependency needed for " + currentJob + " (" + + currentJob.getApplicationName() + "-" + + currentJob.getApplicationVersion() + ")"); File application = getApplicationData( - jobDescription.getApplicationName(), - jobDescription.getApplicationVersion()); + currentJob.getApplicationName(), + currentJob.getApplicationVersion()); // unzip application log.info("unzip application start"); - ZipUtil.uncompress(application, jobDir); + try { + ZipUtil.uncompress(application, currentJobDir); + } catch (IOException e) { + log.error("error occured while extracting the application", e); + throw new LocalFileException("error occured while extracting the application", e); + } log.info("unzip application finished"); } else { - log.info("no dependency specified for " + jobDescription); - } - + log.info("no dependency specified for " + currentJob); + } + } + + protected void stageInputFiles() throws DisworkSystemException, LocalFileException { // staging input files - for (String fileName : jobDescription.getInput()) { + for (String fileName : currentJob.getInput()) { log.info("staging " + fileName); - File localCopy = new File(jobDir, fileName); - localCopy.createNewFile(); InputStream source = null; - if (jobDescription.getInputUrls().containsKey(fileName)) { + if (currentJob.getInputUrls().containsKey(fileName)) { // download this file from URL - URL url = jobDescription.getInputUrls().get(fileName); + URL url = currentJob.getInputUrls().get(fileName); log.info("downloading from " + url); - source = url.openStream(); + try { + source = url.openStream(); + } catch (IOException e) { + log.error("failed to download input data from" + url, e); + throw new DisworkSystemException("failed to download input data from" + url, e); + } } else { // download this file from diswork - source = fileSystem.read(jobPath + "/" + fileName); + try { + source = fileSystem.read(currentJobPath + 
"/" + fileName); + } catch (DisworkFileSystemException e) { + log.error("unable to read input file from diswork", e); + throw new DisworkSystemException("unable to read input file from diswork", e); + } } - IOUtils.copy(source, new FileOutputStream(localCopy)); + + try { + File localCopy = new File(currentJobDir, fileName); + localCopy.createNewFile(); + IOUtils.copy(source, new FileOutputStream(localCopy)); + } catch (IOException e) { + log.error("unable to write input file to local dir", e); + throw new LocalFileException("unable to write input file to local dir", e); + } } - + } + + protected void prepareAndRunJob() throws DisworkException { log.info("preparing the job"); // prepare the job and run it String commandLine = config.parseCommandLine( - jobDescription.getCommandLine(), - jobDir.getAbsolutePath()); + currentJob.getCommandLine(), + currentJobDir.getAbsolutePath()); // String[] commandLineElements = commandLine.split(" "); String[] commandLineElements = StringUtil.split(commandLine, " "); ProcessBuilder builder = new ProcessBuilder(commandLineElements); - builder.directory(jobDir); + builder.directory(currentJobDir); builder.redirectErrorStream(true); - log.info("calling " + commandLine); - Process job = builder.start(); + log.info("process will call " + commandLine); + try { + currentProcess = builder.start(); + } catch (IOException e) { + log.error("unable to run process for job" + currentJob, e); + throw new LocalFileException("unable to run process for job" + currentJob, e); + } + + // catch (Throwable processError) {} // start a thread to constantly read on the standard output - String standardOutputFileName = jobDescription.getStandardOutput(); + String standardOutputFileName = currentJob.getStandardOutput(); log.info("standardOutputFileName is " + standardOutputFileName); OutputStream outputFileStream = null; if (standardOutputFileName != null) { - File outputFile = new File(jobDir, standardOutputFileName); + File outputFile = new 
File(currentJobDir, standardOutputFileName); log.info("writing standard output in " + outputFile); - outputFileStream = new FileOutputStream(outputFile); + try { + outputFileStream = new FileOutputStream(outputFile); + } catch (FileNotFoundException e) { + // should not occur since we just created the file + log.error("unable to find standard output file", e); + throw new LocalFileException("unable to find standard output file", e); + } } - OutputReader outputReader = new OutputReader(job.getInputStream(), + OutputReader outputReader = new OutputReader(currentProcess.getInputStream(), outputFileStream); outputReader.start(); // plugging a file on the standard input - String standardInputFileName = jobDescription.getStandardInput(); + String standardInputFileName = currentJob.getStandardInput(); if (standardInputFileName != null) { log.info("writing " + standardInputFileName + " on standard " + "input"); - InputStream input = new FileInputStream( - new File(jobDir, standardInputFileName)); - IOUtils.copy(input, job.getOutputStream()); - } - - // run the process and wait for it to return - int exitValue = -1; - try { - log.info("waiting for the end of the process"); - exitValue = job.waitFor(); - } catch (InterruptedException e) { - log.error("job " + jobDescription + " was interrupted", e); - // FIXME 20100611 bleny job is considered has failed - exitValue = 1; + try { + InputStream input = new FileInputStream( + new File(currentJobDir, standardInputFileName)); + IOUtils.copy(input, currentProcess.getOutputStream()); + } catch (FileNotFoundException e) { + // file may not have been provided by job submitter + if (currentJob.getInput().contains(standardInputFileName)) { + log.error("standard input file is not found", e); + throw new DisworkSystemException("standard input file is not found", e); + } else { + throw new BadJobException("standard input file is not provided in " + currentJob); + } + } catch (IOException e) { + log.error("unable to read data from input 
file", e); + throw new DisworkSystemException("unable to read data from input file", e); + } } - log.info("job returned " + exitValue); + } + protected void stageOutputFiles() throws DisworkSystemException, BadJobException { // output file staging - for (String fileName : jobDescription.getOutput()) { + for (String fileName : currentJob.getOutput()) { log.info("staging file " + fileName); - File localCopy = new File(jobDir, fileName); - + File localCopy = new File(currentJobDir, fileName); + if (localCopy.exists()) { - InputStream localCopyStream = new FileInputStream(localCopy); - - String filePath = jobPath + "/" + fileName; - - // erase before write - if (fileSystem.exists(filePath)) { - fileSystem.delete(filePath); + String filePath = currentJobPath + "/" + fileName; + // copy local file to diswork FS + + InputStream localCopyStream = null; + try { + localCopyStream = new FileInputStream(localCopy); + + // erase before write if an input file as an output too + if (fileSystem.exists(filePath)) { + fileSystem.delete(filePath); + } + + fileSystem.write(filePath, localCopyStream); + } catch (FileNotFoundException e) { + // file exists, tested just before + } catch (DisworkFileSystemException e) { + log.error("error while uploading results", e); + throw new DisworkSystemException("error while uploading results", e); + } finally { + IOUtils.closeQuietly(localCopyStream); } + } else { + throw new BadJobException("job " + currentJob + " do not produces a file" + fileName); + } + } + } + + /** + * Download all the files needed for a job in a temp directory, run + * the job, wait for it to end, write all the results. 
Mark the job + * as running at the beginning and move it to DONE or FAILED at + * the end, depending of the results + * @param currentJobPath + * @return + * @throws BadJobException + * @throws DisworkFileSystemException + * @throws IOException + * @throws DisworkException + */ + protected void runJob() throws DisworkException { + try { + log.info("running job at " + currentJobPath); + try { + String jsdlPath = currentJobPath + "/" + DisworkDaemon.JSDL_PATH; + String jsdl = IOUtils.toString(fileSystem.read(jsdlPath)); + log.info("read jsdl " + jsdl); + currentJob = JobDescription.parseJSDL(jsdl); + } catch (IOException e) { + log.error("unable to read or parse JSDL", e); + throw new DisworkSystemException("unable to read or parse JSDL", e); + } catch (DisworkFileSystemException e) { + log.error("unable to read JSDL", e); + throw new DisworkSystemException("unable to read JSDL", e); + } - fileSystem.write(filePath, localCopyStream); - localCopyStream.close(); + log.info("will run job " + currentJob); + + // create temp dir + try { + currentJobDir = FileUtil.createTempDirectory("job", "", + new File(config.getTempDirectory())); + currentJobDir.mkdirs(); + } catch (IOException e) { + log.error("unable to create temp directory for job", e); + throw new LocalFileException("unable to create temp directory for job", e); } + + downloadApplication(); + + stageInputFiles(); + + // until there we didn't started the job, it's not too late to + // stop, last check of shouldStrop + if (!shouldStop) { + prepareAndRunJob(); + + // wait for the process to return + jobIsStarted(currentJobPath); + boolean processFinished = false; + while (!processFinished) { + Integer exitValue; // exitValue if the process is + // finished, null if the process + // is not finished + try { + exitValue = currentProcess.exitValue(); + processFinished = true; + } catch (IllegalThreadStateException e) { + // process is not finished + exitValue = null; + } + + if (exitValue == null) { + if (shouldStop) { + 
jobIsInterrupted(currentJobPath); + processFinished = true; + } else { + try { + Thread.sleep(10 * 1000); + } catch (InterruptedException e) { + log.error("worker interupted", e); + throw new DisworkException("worker interupted", e); + } + } + } else { + stageOutputFiles(); + // job is finished + if (exitValue == 0) { + // job is successful + jobIsSuccessful(currentJobPath); + } else { + jobIsFailed(currentJobPath); + } + } + } + } else { + jobIsInterrupted(currentJobPath); + } + } catch (BadJobException e) { + jobIsFailed(currentJobPath); + } finally { + currentJob = null; + currentProcess = null; + // clean up the job directory + FileUtil.deleteRecursively(currentJobDir); + currentJobDir = null; } - - // clean up the job directory - FileUtil.deleteRecursively(jobDir); - - boolean success = exitValue == 0; - if (success) { - log(jobPath, "DONE"); - } else { - log(jobPath, "FAILED"); - } - return success; } /** @@ -409,156 +547,205 @@ } } - protected void findAJobAndRunIt() throws IOException, - DisworkFileSystemException, - DisworkException { - - // Once a job is found, those two var will be set - String jobLinkDir = null; - String jobLinkName = null; - - // use a synchronized block because multiple workers - // may try to take a same job - synchronized (fileSystem) { + /** + * try to find a job, if found, take it and return the path + * @return the path to the job, null if no job found + * @throws DisworkSystemException + */ + protected String findAJob() throws DisworkSystemException { + try { + // Once a job is found, those two var will be set + String jobLinkDir = null; + String jobLinkName = null; - // fist, try to find a job declared has running since - // too long to re-run it - String[] runningJobsDirs = { DisworkDaemon.FAILED_2_RUNNING, - DisworkDaemon.FAILED_1_RUNNING, - DisworkDaemon.TODO_RUNNING - }; - // browsing all "running" dirs - for (String path : runningJobsDirs) { - String oldName = getFistJobName(path); - if (oldName != null) { - Long linkAge = 
System.currentTimeMillis() - - Long.parseLong(oldName); - // check is oldest job is too old and should be - // considered has to-be-rerun - if (linkAge > MAX_JOB_RUNNING_TIME) { - log.info("taking old job (age = " + linkAge + ")"); - jobLinkDir = path; - jobLinkName = oldName; - // FIXME 20100617 bleny break s*cks - break; + // use a synchronized block because multiple workers + // may try to take a same job + synchronized (fileSystem) { + + // fist, try to find a job declared has running since + // too long to re-run it + String[] runningJobsDirs = { DisworkDaemon.FAILED_2_RUNNING, + DisworkDaemon.FAILED_1_RUNNING, + DisworkDaemon.TODO_RUNNING + }; + // browsing all "running" dirs + for (String path : runningJobsDirs) { + String oldName = getFistJobName(path); + if (oldName != null) { + Long linkAge = System.currentTimeMillis() + - Long.parseLong(oldName); + // check is oldest job is too old and should be + // considered has to-be-rerun + if (linkAge > MAX_JOB_RUNNING_TIME) { + log.info("taking old job (age = " + linkAge + ")"); + jobLinkDir = path; + jobLinkName = oldName; + // FIXME 20100617 bleny break s*cks + break; + } } } - } - - // if no job was found, search now in not running jobs - if (jobLinkDir == null) { - String[] jobsDirs = { DisworkDaemon.FAILED_2, - DisworkDaemon.FAILED_1, - DisworkDaemon.TODO - }; - for (String path : jobsDirs) { - String oldName = getFistJobName(path); - if (oldName != null) { // take it - jobLinkDir = path; - jobLinkName = oldName; - // FIXME 20100617 bleny break s*cks - break; + + // if no job was found, search now in not running jobs + if (jobLinkDir == null) { + String[] jobsDirs = { DisworkDaemon.FAILED_2, + DisworkDaemon.FAILED_1, + DisworkDaemon.TODO + }; + for (String path : jobsDirs) { + String oldName = getFistJobName(path); + if (oldName != null) { // take it + jobLinkDir = path; + jobLinkName = oldName; + // FIXME 20100617 bleny break s*cks + break; + } } } + + if (jobLinkDir != null) { + // move the link before 
running the job + String oldPath = jobLinkDir + "/" + jobLinkName; + log.info("job found at " + oldPath); + + jobLinkDir = RUNNING_MOVE.get(jobLinkDir); + jobLinkName = DisworkDaemon.newJobLinkName(); + String newPath = jobLinkDir + "/" + jobLinkName; + + log.info("moving " + oldPath + " to " + newPath); + fileSystem.move(oldPath, newPath); + + } } - + + String jobPath = null; + // now, if no job was found if (jobLinkDir != null) { - // move the link before running the job - String oldPath = jobLinkDir + "/" + jobLinkName; - log.info("job found at " + oldPath); - - jobLinkDir = RUNNING_MOVE.get(jobLinkDir); - jobLinkName = DisworkDaemon.newJobLinkName(); - String newPath = jobLinkDir + "/" + jobLinkName; - - log.info("moving " + oldPath + " to " + newPath); - fileSystem.move(oldPath, newPath); - + jobPath = jobLinkDir + "/" + jobLinkName; + } else { + try { + log.info("look for a job was unsucessful, will wait " + config.getJobLooksWaitTime() + " seconds before next try"); + Thread.sleep(config.getJobLooksWaitTime() * 1000); + } catch (InterruptedException e) { + log.error("worker interrupted while waiting before trying to find a new job", e); + throw new DisworkSystemException("worker interrupted while waiting before trying to find a new job", e); + } } + return jobPath; + } catch (DisworkFileSystemException e) { + log.error("error while trying to find a job", e); + throw new DisworkSystemException("error while trying to find a job", e); } - - // now, if no job was found, do nothing - if (jobLinkDir == null) { - log.info("nothing to do"); - try { - Thread.sleep(JOB_WAIT); - } catch (InterruptedException e) { - log.warn("worker interrupted while waiting", e); - // TODO 20100629 bleny ? 
- } - // if a job was found, take it + } + + /** update the log of the job + * permit the user to use + * {@link DisworkDaemon#isSuccessful(JobDescription)} + */ + protected void jobIsSuccessful(String jobPath) throws DisworkSystemException { + String newJobPath = DisworkDaemon.DONE + "/" + DisworkDaemon.newJobLinkName(); + try { + log.info("moving " + jobPath + " to " + newJobPath); + fileSystem.move(jobPath, newJobPath); + } catch (DisworkFileSystemException e) { + log.error("error while moving job link", e); + throw new DisworkSystemException("error while moving job link", e); + } + log.info("marking " + newJobPath + " as done and finished"); + log(newJobPath, "DONE", "FINISHED"); + config.addOneJobDone(); + } + + /** update the log of the job and move the link + * permit the job submitter to use + * {@link DisworkDaemon#isFailed(JobDescription)} + */ + protected void jobIsFailed(String jobPath) throws DisworkSystemException { + String jobDir = FilenameUtils.getFullPathNoEndSeparator(jobPath); + String newDir = FAILED_MOVE.get(jobDir); + String newJobPath = newDir + "/" + DisworkDaemon.newJobLinkName(); + + try { + log.info("moving " + jobPath + " to " + newJobPath); + fileSystem.move(jobPath, newJobPath); + } catch (DisworkFileSystemException e) { + log.error("error while moving job link", e); + throw new DisworkSystemException("error while moving job link", e); + } + + if (newDir.equals(DisworkDaemon.FAILED_3)) { + log.info("marking " + newJobPath + " as failed and finished"); + log(newJobPath, "FAILED", "FINISHED"); } else { + log.info("marking " + newJobPath + " as failed"); + log(newJobPath, "FAILED"); + } + } - String jobPath = jobLinkDir + "/" + jobLinkName; - - if (fileSystem.exists(jobPath + "/" + DisworkDaemon.JSDL_PATH)) { - - log(jobPath, "STARTED"); - - boolean jobSuccess = runJob(jobPath); - - // move the link after the job - String newDir = null; - if (jobSuccess) { - newDir = DisworkDaemon.DONE; - config.addOneJobDone(); - } else { - newDir = 
FAILED_MOVE.get(jobLinkDir); - } - String newPath = newDir + "/" + DisworkDaemon.newJobLinkName(); - - log.info("moving " + jobPath + " to " + newPath); - fileSystem.move(jobPath, newPath); - - // mark the job has finished if done or failed too many - // times - if ( newDir == DisworkDaemon.DONE || - newDir == DisworkDaemon.FAILED_3) { - log.info("marking " + newPath + " as finished"); - log(newPath, "FINISHED"); - } - } else { - // job has been canceled - fileSystem.delete(jobPath); - } + /** put the job back where it was found */ + protected void jobIsInterrupted(String jobPath) throws DisworkSystemException { + String jobDir = FilenameUtils.getFullPathNoEndSeparator(jobPath); + String jobName = FilenameUtils.getName(jobPath); + String newDir = INTERRUPTED_MOVE.get(jobDir); + String newJobPath = newDir + "/" + jobName; + + try { + log.info("moving " + jobPath + " to " + newJobPath); + fileSystem.move(jobPath, newJobPath); + } catch (DisworkFileSystemException e) { + log.error("error while moving job link", e); + throw new DisworkSystemException("error while moving job link", e); } } - + + /** update the log the job + * permit the job submitter to use + * @link {@link DisworkDaemon#isStarted(JobDescription)} + */ + protected void jobIsStarted(String jobPath) throws DisworkSystemException { + log(jobPath, "STARTED"); + } + /** * find */ @Override public void run() { - while (! shouldStop) { - try { + try { + while (! 
shouldStop) { synchronized (sem) { if (getFlag()) { - findAJobAndRunIt(); + currentJobPath = findAJob(); + if (currentJobPath != null) { + runJob(); + currentJobPath = null; + } } else { log.debug("sleeping until a change on flag"); long waitTime = activityStrategy.timeBeforeNextUpdate(); - if (waitTime == -1) { - sem.wait(); - } else { - sem.wait(waitTime); + try { + if (waitTime == -1) { + // wait until notify + sem.wait(); + } else { + sem.wait(waitTime); + } + } catch (InterruptedException e) { + log.error("interrupted while waiting for a change of activity", e); + throw new DisworkSystemException("interrupted while waiting for a change of activity", e); } } } - } catch (DisworkException e) { - log.error("worker error " + e); - throw new RuntimeException("worker error " + e); - } catch (IOException e) { - log.error("worker error " + e); - throw new RuntimeException("worker error " + e); - } catch (DisworkFileSystemException e) { - log.error("worker error " + e); - throw new RuntimeException("worker error " + e); - } catch (InterruptedException e) { - log.error("worker error " + e); - throw new RuntimeException("worker error " + e); } + } catch (DisworkException e) { + throw new RuntimeException("an error occured", e); } } + + @Override + public String toString() { + return getName(); + } } public WorkersManager(DisworkFileSystem fileSystem, DisworkConfig config) @@ -582,11 +769,12 @@ } } - /** read an application from the file system and use a cache */ + /** read an application from the file system and use a cache + * @throws DisworkSystemException + * @throws LocalFileException */ protected File getApplicationData(String applicationName, String applicationVersion) - throws DisworkFileSystemException, - IOException { + throws DisworkSystemException, LocalFileException { File cachedApplicationData = new File(applicationCache, applicationName + "-" + applicationVersion + ".zip"); if (!cachedApplicationData.exists()) { @@ -594,12 +782,30 @@ synchronized 
(cachedApplicationData) { String applicationPath = DisworkDaemon.getPathForDependency( applicationName, applicationVersion); - InputStream applicationData = fileSystem.read(applicationPath); - cachedApplicationData.createNewFile(); - log.info("will create " + cachedApplicationData.getAbsolutePath()); - OutputStream out = new FileOutputStream(cachedApplicationData); - log.debug("starting copy of " + applicationData.available() + " bytes"); - IOUtils.copy(applicationData, out); + InputStream applicationData = null; + try { + applicationData = fileSystem.read(applicationPath); + } catch (DisworkFileSystemException e) { + log.error("unable to get application", e); + throw new DisworkSystemException("unable to get application", e); + } finally { + IOUtils.closeQuietly(applicationData); + } + + OutputStream out = null; + try { + cachedApplicationData.createNewFile(); + log.info("will create " + cachedApplicationData.getAbsolutePath()); + out = new FileOutputStream(cachedApplicationData); + log.debug("starting copy of " + applicationData.available() + " bytes"); + IOUtils.copy(applicationData, out); + } catch (IOException e) { + log.error("unable to write application in cache", e); + throw new LocalFileException("unable to write application in cache", e); + } finally { + IOUtils.closeQuietly(applicationData); + IOUtils.closeQuietly(out); + } } } else { log.debug("cache matches for " + applicationName + "-" + applicationVersion); @@ -608,12 +814,9 @@ } public void stop() throws DisworkException { - stop(false); - } - - public void stop(boolean now) throws DisworkException { // asking to all threads to stop for (Worker worker : workers) { + log.debug("asking " + worker + " to stop"); worker.shouldStop = true; } @@ -621,16 +824,17 @@ FileUtil.deleteRecursively(applicationCache); - if( !now ) { - // waiting for them to actually have finished - for (Worker worker : workers) { - while (worker.isAlive()) { - try { - Thread.sleep(10 * 1000); - } catch (InterruptedException e) { - 
log.warn("interrupted while waiting for a worker to " + - "stop", e); - } + // waiting for them to actually have finished + for (Worker worker : workers) { + while (worker.isAlive()) { + try { + // worker may be sleeping + activeNoActivityStrategy(); + log.debug("waiting for " + worker + " to return"); + Thread.sleep(1000); + } catch (InterruptedException e) { + log.warn("interrupted while waiting for a worker to " + + "stop", e); } } } @@ -674,4 +878,13 @@ public void activeScheduledActivityStrategy() throws DisworkException { setActivityStrategy(ActivityStrategies.SCHEDULED); } + + /** this is only for monitoring purpose */ + public List<JobDescription> getAllWorkersCurrentJobs() { + List<JobDescription> result = new ArrayList<JobDescription>(); + for (Worker worker : workers) { + result.add(worker.currentJob); + } + return result; + } } \ No newline at end of file Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java =================================================================== --- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java 2010-07-07 16:23:54 UTC (rev 100) +++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java 2010-07-09 11:41:12 UTC (rev 101) @@ -5,7 +5,7 @@ @Override protected void setConfigs() { - config = super.newConfig(); + config = newConfig(); config.setNumberOfWorkers(32); } Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java =================================================================== --- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java 2010-07-07 16:23:54 UTC (rev 100) +++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java 2010-07-09 11:41:12 UTC (rev 101) @@ -7,10 +7,10 @@ @Override protected void setConfigs() { - config = super.newConfig(); + 
config = newConfig(); config.setActivityStrategy("none"); - config2 = super.newConfig(); + config2 = newConfig(); config2.setBootstrapIp(DisworkFileSystemConfig.getIp()); config2.setBootstrapPort(config.getUsedPort()); config2.setActivityStrategy("unlimited"); Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java =================================================================== --- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java 2010-07-07 16:23:54 UTC (rev 100) +++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java 2010-07-09 11:41:12 UTC (rev 101) @@ -17,10 +17,10 @@ protected DisworkConfig config; protected DisworkConfig config2; - - protected static DisworkDaemon daemon; - protected static DisworkDaemon daemon2; - + + protected DisworkDaemon daemon; + protected DisworkDaemon daemon2; + protected static int port = 45500; /** a factory method to ease the creation of configs */ @@ -28,6 +28,7 @@ DisworkConfig config = new DisworkConfig(); port += 1; config.setUsedPort(port); + config.setJobLooksWaitTime(1); // useless in tests config.setStartHttpFrontend(false); @@ -66,7 +67,6 @@ JobDescription job = new JobDescription(); job.setCommandLine("java -version"); daemon.submitJob(job); - } @Test(expected = DisworkException.class) @@ -87,6 +87,7 @@ Thread.sleep(5 * 1000); } + assertTrue(daemon.isStarted(job)); assertTrue(daemon.isSuccessful(job)); // check getAllJobs return @@ -193,4 +194,25 @@ public void testJobsManagement() throws Exception { assertEquals(0, daemon.getAllJobs().size()); } + + @Test + public void testJobError() throws Exception { + JobDescription job = new JobDescription(); + job.setJobName("My Job with wrong command line"); + job.setApplication("fake-app", "1.0"); + job.setCommandLine("%java -jar no_such_jar.jar"); + job.setStandardOutput("output.txt"); + job.addOutput("output.txt"); + + daemon.submitJob(job); + + while(! 
daemon.isFinished(job)) { + Thread.sleep(5 * 1000); + } + + assertTrue(daemon.isFailed(job)); + assertTrue(daemon.getResults(job).containsKey("output.txt")); + String output = IOUtils.toString(daemon.getResults(job).get("output.txt")); + assertTrue(output.contains("Unable to access jarfile no_such_jar.jar")); + } } Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/JobDescriptionTest.java =================================================================== --- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/JobDescriptionTest.java 2010-07-07 16:23:54 UTC (rev 100) +++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/JobDescriptionTest.java 2010-07-09 11:41:12 UTC (rev 101) @@ -5,7 +5,6 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import java.io.IOException; import java.net.URL; import org.apache.commons.collections.ListUtils; @@ -53,8 +52,13 @@ @Test public void testParseJSDL1() throws Exception { - try { - JobDescription jobCopy = JobDescription.parseJSDL(job.toJSDL()); + JobDescription jobCopy; + try { + jobCopy = JobDescription.parseJSDL(job.toJSDL()); + } catch (BadJobException e) { + fail(); + throw e; + } assertNotNull(jobCopy); assertEquals(job.getJobName(), jobCopy.getJobName()); assertEquals(job.getApplicationName(), @@ -64,35 +68,31 @@ assertEquals(job.getCommandLine(), jobCopy.getCommandLine()); assertEquals(job.getStandardInput(), jobCopy.getStandardInput()); assertEquals(job.getStandardOutput(), jobCopy.getStandardOutput()); - } catch (IOException e) { - fail(); - throw e; - } } @Test public void testParseJSDL2() throws Exception { + JobDescription job2Copy; try { - JobDescription job2Copy = JobDescription.parseJSDL(job2.toJSDL()); - assertEquals(job2.getCommandLine(), job2Copy.getCommandLine()); - assertEquals(job2.getStandardInput(), job2Copy.getStandardInput()); - assertEquals(job2.getStandardOutput(), - job2Copy.getStandardOutput()); - - 
assertTrue(ListUtils.isEqualList(job2.getInput(), - job2Copy.getInput())); - assertTrue(ListUtils.isEqualList(job2.getOutput(), - job2Copy.getOutput())); - assertTrue(ListUtils.isEqualList( - job2.getInputUrls().keySet(), - job2Copy.getInputUrls().keySet())); - assertTrue(ListUtils.isEqualList( - job2.getInputUrls().values(), - job2Copy.getInputUrls().values())); - } catch (IOException e) { + job2Copy = JobDescription.parseJSDL(job2.toJSDL()); + } catch (BadJobException e) { fail(); throw e; } + assertEquals(job2.getCommandLine(), job2Copy.getCommandLine()); + assertEquals(job2.getStandardInput(), job2Copy.getStandardInput()); + assertEquals(job2.getStandardOutput(), + job2Copy.getStandardOutput()); + + assertTrue(ListUtils.isEqualList(job2.getInput(), + job2Copy.getInput())); + assertTrue(ListUtils.isEqualList(job2.getOutput(), + job2Copy.getOutput())); + assertTrue(ListUtils.isEqualList( + job2.getInputUrls().keySet(), + job2Copy.getInputUrls().keySet())); + assertTrue(ListUtils.isEqualList( + job2.getInputUrls().values(), + job2Copy.getInputUrls().values())); } - } Modified: trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java 2010-07-07 16:23:54 UTC (rev 100) +++ trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java 2010-07-09 11:41:12 UTC (rev 101) @@ -63,7 +63,7 @@ private static final Log log = LogFactory.getLog(DisworkFileSystemConfig.class); - protected static Integer port = 19000; + protected static Integer port = 18999; /** * returns a new port, returned value change at each call.