r82 - trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon
Author: bleny Date: 2010-06-17 15:33:48 +0200 (Thu, 17 Jun 2010) New Revision: 82 Url: http://nuiton.org/repositories/revision/diswork/82 Log: bugfix concurrence Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-06-17 09:00:26 UTC (rev 81) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-06-17 13:33:48 UTC (rev 82) @@ -72,9 +72,9 @@ RUNNING_MOVE.put(DisworkDaemon.FAILED_1_RUNNING, DisworkDaemon.FAILED_1_RUNNING); RUNNING_MOVE.put(DisworkDaemon.FAILED_2_RUNNING, DisworkDaemon.FAILED_2_RUNNING); - FAILED_MOVE.put(DisworkDaemon.TODO, DisworkDaemon.FAILED_1); - FAILED_MOVE.put(DisworkDaemon.FAILED_1, DisworkDaemon.FAILED_2); - FAILED_MOVE.put(DisworkDaemon.FAILED_2, DisworkDaemon.FAILED_3); + FAILED_MOVE.put(DisworkDaemon.TODO_RUNNING, DisworkDaemon.FAILED_1); + FAILED_MOVE.put(DisworkDaemon.FAILED_1_RUNNING, DisworkDaemon.FAILED_2); + FAILED_MOVE.put(DisworkDaemon.FAILED_2_RUNNING, DisworkDaemon.FAILED_3); } private static final Log log = LogFactory.getLog(WorkersManager.class); @@ -89,23 +89,34 @@ protected class Worker extends Thread { - public boolean shouldStop = false; + protected boolean shouldStop = false; - public WorkersManager manager; + protected WorkersManager manager; + /** + * this method add a line to a job-specific log + * @param jobPath the path to the job concerned + * @param message the line to add the the log + * @throws IOException if an error occurred while writing the log + */ protected void log(String jobPath, String message) throws IOException { String logPath = jobPath + "/" + DisworkDaemon.LOG_PATH; InputStream oldLogAsStream = fileSystem.read(logPath); String oldLog = IOUtils.toString(oldLogAsStream); - String logEntry = message + "\n"; - String newLog = oldLog + logEntry; - - fileSystem.delete(logPath); fileSystem.write(logPath, IOUtils.toInputStream(newLog)); } + /** + * Download all the files needed for a job in a temp directory, run + * the job, wait for it to end, write all the results. Mark the job + * as running at the beginning and move it to DONE or FAILED at + * the end, depending of the results + * @param jobPath + * @return + * @throws IOException + */ protected boolean runJob(String jobPath) throws IOException { log.info("running job at " + jobPath); @@ -168,13 +179,13 @@ IOUtils.copy(source, new FileOutputStream(localCopy)); } - // executing the job + // prepare and run it the job String commandLine = jobDescription.getCommandLine(); - log.info("calling " + commandLine); String[] commandLineElements = commandLine.split(" "); ProcessBuilder builder = new ProcessBuilder(commandLineElements); builder.directory(jobDir); builder.redirectErrorStream(true); + log.info("calling " + commandLine); Process job = builder.start(); // plugging a file on the standard input @@ -185,6 +196,7 @@ IOUtils.copy(input, job.getOutputStream()); } + // run the process and wait for it to return int exitValue = -1; try { exitValue = job.waitFor(); @@ -237,7 +249,7 @@ return success; } - public String getFistJobName(String path) throws IOException { + protected String getFistJobName(String path) throws IOException { List<String> jobsNames = fileSystem.readDirectory(path); if (jobsNames.size() == 0) { return null; @@ -250,71 +262,93 @@ protected void findAJobAndRunIt() { // try to find a new job try { + + // Once a job is found, those two var will be set String jobLinkDir = null; String jobLinkName = null; - String[] runningJobsDirs = { DisworkDaemon.FAILED_2_RUNNING, - DisworkDaemon.FAILED_1_RUNNING, - DisworkDaemon.TODO_RUNNING - }; - - // FIXME 20100617 bleny stop loops when found - for (String path : runningJobsDirs) { - String oldName = getFistJobName(path); - if (oldName != null) { - Long linkAge = System.currentTimeMillis() - - Long.parseLong(oldName); - if (linkAge > MAX_JOB_RUNNING_TIME) { - log.info("taking old job (age = " + linkAge + ")"); - jobLinkDir = path; - jobLinkName = oldName; + // use a synchronized block because multiple workers + // may try to take a same job + synchronized (fileSystem) { + + // fist, try to find a job declared has running since + // too long to re-run it + String[] runningJobsDirs = { DisworkDaemon.FAILED_2_RUNNING, + DisworkDaemon.FAILED_1_RUNNING, + DisworkDaemon.TODO_RUNNING + }; + // browsing all "running" dirs + for (String path : runningJobsDirs) { + String oldName = getFistJobName(path); + if (oldName != null) { + Long linkAge = System.currentTimeMillis() + - Long.parseLong(oldName); + // check is oldest job is too old and should be + // considered has to-be-rerun + if (linkAge > MAX_JOB_RUNNING_TIME) { + log.info("taking old job (age = " + linkAge + ")"); + jobLinkDir = path; + jobLinkName = oldName; + // FIXME 20100617 bleny break s*cks + break; + } } } - } - - if (jobLinkDir == null) { - String[] jobsDirs = { DisworkDaemon.FAILED_2, - DisworkDaemon.FAILED_1, - DisworkDaemon.TODO - }; - for (String path : jobsDirs) { - String oldName = getFistJobName(path); - if (oldName != null) { // take it - jobLinkDir = path; - jobLinkName = oldName; + + // if no job was found, search now in not running jobs + if (jobLinkDir == null) { + String[] jobsDirs = { DisworkDaemon.FAILED_2, + DisworkDaemon.FAILED_1, + DisworkDaemon.TODO + }; + for (String path : jobsDirs) { + String oldName = getFistJobName(path); + if (oldName != null) { // take it + jobLinkDir = path; + jobLinkName = oldName; + // FIXME 20100617 bleny break s*cks + break; + } } } + + if (jobLinkDir != null) { + // move the link before running the job + String oldPath = jobLinkDir + "/" + jobLinkName; + log.info("job found at " + oldPath); + + jobLinkDir = RUNNING_MOVE.get(jobLinkDir); + jobLinkName = DisworkDaemon.newJobLinkName(); + String newPath = jobLinkDir + "/" + jobLinkName; + + log.info("moving " + oldPath + " to " + newPath); + fileSystem.move(oldPath, newPath); + + } } + // now, if no job was found, do nothing if (jobLinkDir == null) { log.info("nothing to do"); Thread.sleep(JOB_WAIT); + // if a job was found, take it } else { - // move the link before running the job - String oldPath = jobLinkDir + "/" + jobLinkName; - log.info("job found at " + oldPath); + + String jobPath = jobLinkDir + "/" + jobLinkName; - String newPath = RUNNING_MOVE.get(jobLinkDir) + "/" + - DisworkDaemon.newJobLinkName(); + boolean jobSuccess = runJob(jobPath); - log.info("moving " + oldPath + " to " + newPath); - fileSystem.move(oldPath, newPath); - - // run the job - boolean jobSuccess = runJob(newPath); - // move the link after the job - oldPath = newPath; String newDir = null; if (jobSuccess) { newDir = DisworkDaemon.DONE; } else { newDir = FAILED_MOVE.get(jobLinkDir); } - newPath = newDir + "/" + DisworkDaemon.newJobLinkName(); + String newPath = newDir + "/" + DisworkDaemon.newJobLinkName(); - log.info("moving " + oldPath + " to " + newPath); - fileSystem.move(oldPath, newPath); + log.info("moving " + jobPath + " to " + newPath); + fileSystem.move(jobPath, newPath); // mark the job has finished if done or failed too many // times @@ -332,8 +366,7 @@ } catch (InterruptedException e) { log.info("exception catch", e); // TODO 20100611 bleny manage exception - } - + } } @Override @@ -367,7 +400,8 @@ } else if ("scheduled".equals(initialStrategy)) { activeScheduledActivityStrategy(); } else { - log.error("wrong config directive " + initialStrategy); + log.error("wrong config directive for initial strategy" + + initialStrategy); activeNoActivityStrategy(); }
participants (1)
-
bleny@users.nuiton.org