r85 - in trunk: . diswork-daemon/src/main/java/org/nuiton/diswork/daemon diswork-daemon/src/test/java/org/nuiton/diswork/daemon diswork-fs/src/main/java/org/nuiton/diswork/fs diswork-fs/src/main/java/org/nuiton/diswork/fs/storage diswork-fs/src/test/java/org/nuiton/diswork/fs
Author: bleny Date: 2010-06-21 12:14:34 +0200 (Mon, 21 Jun 2010) New Revision: 85 Url: http://nuiton.org/repositories/revision/diswork/85 Log: documentation, client simple, lanceur de démon Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSimpleClient.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/Storage.java trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/AbstractDisworkFileSystemTest.java trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemKademliaTest.java trunk/pom.xml Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java 2010-06-19 18:55:14 UTC (rev 84) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java 2010-06-21 10:14:34 UTC (rev 85) @@ -30,6 +30,34 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +/** + * This interface introduce the concept of "activity strategy" in + * diswork, an activity strategy answer to "Can i run a job now ?". + * + * This provide a simple method that returns true if Diswork can run a job + * and take hardware resources. 
If false is returned, it means that resources + * are not available or we want diswork not to consume more resources. + * + * This interface come with 4 implementations. Two first are very simple, + * one is used when diswork is "shut down", it always returns false. The + * second is used when diswork is in "free-wheel" and take as much resources + * as jobs need (always return true). + * + * LimitedActivity implementation is user-activity aware and returns true + * only if it seems that the user incompletely use the resources of + * his computer. This implementation is based on the a load-average + * computation of the computer. If load-average is low, computer is + * idling : it returns true and diswork will use remaining resources. + * If load-average is high, false is returned and diswork don't make + * the computer over-loaded. + * + * ScheduledActivity implementation make diswork run jobs only at + * fixed period of time. An user can configure it to make + * diswork run job only at night are during the week-end, for example. 
+ * + * @author bleny + * + */ public interface ActivityStrategy { /** use this strategy to never run a job */ @@ -48,7 +76,7 @@ } } - /** use this strategy to run a job only if load average is low */ + /** use this strategy to run a job only if computer is idling */ public static class LimitedActivity implements ActivityStrategy { private static final Log log = LogFactory.getLog(LimitedActivity.class); Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-06-19 18:55:14 UTC (rev 84) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-06-21 10:14:34 UTC (rev 85) @@ -31,6 +31,7 @@ import java.lang.management.ManagementFactory; import java.lang.management.OperatingSystemMXBean; import java.net.UnknownHostException; +import java.util.ArrayList; import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.List; @@ -46,9 +47,88 @@ import org.nuiton.util.FileUtil; /** + * The diswork daemon is the gateway to the global diswork system. Instantiate + * this class creates a new node on the system. The new node can be used + * to submit jobs and monitor them. * + * For how-to use considerations see ... + * + * Here is a list of what the daemon do when booting : + * <ul> + * <li>join the network, given some information (IP+port) provided + * with diswork-config (a diswork-config is an instance of + * {@link DisworkConfig})</li> + * <li>define what is the ownedId for this node. It has to be + * recovered from the last run of the daemon. It's + * the link between two different sessions : if you submit a job, stop + * and restart the daemon with the same ownerId, you will find your + * job back. 
If ownerId can't be recovered or if it's the first + * run, an ownerId is generated and saved for future run.</li> + * <li>define the home-directory, it's deduced from the ownerId. Thus, if you + * change you ownerId, you'll change your home-dir and loose your jobs + * because they are all stored in your home-dir. If it's the first + * time the daemon is run by this ownerId, the homeDir is created</li> + * <li>If diswork is configured for, the daemon will start a workers-manager. + * This manager don't care about the jobs you submit but look + * for jobs that are proposed on the global system and try to + * help. For more details, see {@link WorkersManager}</li> + * <li>Write some stats about itself. Since all nodes do that, each nodes + * can performs statistics computation to estimate the powerfulness of + * the global diswork system. Each node write those info in a file + * in his home-dir at {@link #HARDINFO_PATH}</li> + * </ul> + * + * The main use of the daemon is to submit job to the global diswork + * system. + * + * A job is a task that need an application. An application is a simple file + * that should be provided to the daemon once for all + * (using {@link #submitApplication(String, String, InputStream)}. Then, all + * nodes that would like to perform this job can download the application. + * + * You can create your application ready for diswork, submit it to the daemon + * and then submit as much jobs as you want that depends of this application. + * Think an application as the common stuff (programs, scripts, files etc.) + * all the different job will need. An application is just a set of file. + * + * TODO 20100617 bleny explain it better + * + * Once the application has been made available, jobs can be created and + * submitted. 
To do so, a programmer must write a class that + * + * <ol> + * <li>may republish the application (it's always good to be sure an + * application is still present if the last time that it has + * been used is too long)</li> + * <li>Create a new {@link JobDescription} and set it right for his + * new job.</li> + * <li>Submit the job by calling {@link #submitJob(JobDescription)}</li> + * <li>Monitor the job with {@link #isFinished(JobDescription)}</li> + * <li>Check the job state with {@link #isFailed(JobDescription)} + * and {@link #isSuccessful(JobDescription)} and get the results with + * {@link #getResults(JobDescription)}</li> + * <li>Finally, remove the job</li> + * </ol> + * + * Jobs will be saved in home-dir, in a path generated with + * {@link #getPathForJob(JobDescription)}. In this directory will be placed + * <ul> + * <li>All the data provided with the job {@link JobDescription#inputData}</li> + * <li>The JSDL file that describe the job (placed according to the + * value of {@link #JSDL_PATH})</li> + * <li>A log file placed at {@link #LOG_PATH}, it will be empty at the + * submission but lines will be added each time a node try to do + * this job. If the job is finished, the log will contain "FINISHED", + * if it has been completed, it will contain "DONE", if it has been + * failed, it will contain "FAILED" one or few times.</li> + * </ul> + * + * The daemon should be closed properly by using the {@link #close()} method. + * It will release resources, update stats and stop the worker-manager. + * * As far as possible, the use of the file system follow the UNIX Filesystem - * Hierarchy Standard + * Hierarchy Standard. All usual paths used by Diswork are defined in + * constants. 
* * @see http://en.wikipedia.org/wiki/Filesystem_Hierarchy_Standard * @@ -87,6 +167,9 @@ /** a place where are all user-directories */ protected static final String HOME = "/home"; + + /** in home, where jobs should be placed */ + protected static final String JOBS_DIR = "jobs"; /** in a job directory, the place where the JSDL must be placed */ protected static final String JSDL_PATH = ".diswork/job.jsdl"; @@ -259,7 +342,7 @@ */ protected String getPathForJob(String jobId) { // all jobs are stored in home dir - return homeDir + "/" + jobId; + return homeDir + "/" + JOBS_DIR + "/" + jobId; } /** @@ -273,7 +356,7 @@ /** * Provide an application to all nodes. Once provided, all nodes will be - * able to perform a job with this application + * able to perform a job with this application. * * An application can be uploaded in different version. If a given * application in the given version is already on the File System, nothing @@ -285,7 +368,7 @@ * @throws DisworkException if an error occurs while uploading the file */ public void submitApplication(String applicationName, - String applicationVersion, InputStream applicationData) + String applicationVersion, InputStream applicationData) throws DisworkException { // the place where dependency will be stored @@ -310,6 +393,39 @@ } } + /** + * Returns a list with all jobs submitted before. 
+ * @return + * @throws DisworkException + */ + public List<JobDescription> getAllJobs() throws DisworkException { + List<JobDescription> result = new ArrayList<JobDescription>(); + try { + List<String> jobs = fileSystem.readDirectory(homeDir + "/" + JOBS_DIR); + for (String jobId : jobs) { + String jsdl = IOUtils.toString(fileSystem.read( + getPathForJob(jobId) + "/" + JSDL_PATH)); + JobDescription jobDescription = JobDescription.parseJSDL(jsdl); + jobDescription.setJobId(jobId); + result.add(jobDescription); + } + } catch (IOException e) { + log.info("error file reading home-dir", e); + throw new DisworkException("error file reading home-dir", e); + } + return result; + } + + /** + * Cancel the submission of a job + * @param jobDescription + * @throws DisworkException + */ + public void deleteJob(JobDescription jobDescription) { + // TODO 20100618 bleny stub + throw new UnsupportedOperationException(); + } + public void submitJob(JobDescription jobDescription) throws DisworkException { if (jobDescription.getInputData().size() + jobDescription.getStagingInputUrls().size() @@ -413,6 +529,14 @@ return isFinished(job) && !isSuccessful(job); } + /** + * Must not be called until {@link #isFinished(JobDescription)} returns + * true for this job. 
+ * @param job + * @return a map associating the file-name of a result to a stream to + * read the file + * @throws DisworkException if job is not finished + */ public Map<String, InputStream> getResults(JobDescription job) throws DisworkException { if (isFinished(job)) { Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java 2010-06-19 18:55:14 UTC (rev 84) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java 2010-06-21 10:14:34 UTC (rev 85) @@ -42,10 +42,9 @@ public static void main(String[] args) throws Exception { DisworkConfig config = new DisworkConfig(); - if (args.length == 0) { config.setFileSystemConfig( DisworkFileSystemConfig.newKademliaDisworkConfig()); - } else if (args.length == 2){ + if (args.length == 2) { config.setFileSystemConfig( DisworkFileSystemConfig.newKademliaDisworkConfig(args[0], Integer.parseInt(args[1]))); Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSimpleClient.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSimpleClient.java 2010-06-19 18:55:14 UTC (rev 84) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSimpleClient.java 2010-06-21 10:14:34 UTC (rev 85) @@ -25,6 +25,10 @@ package org.nuiton.diswork.daemon; import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; import java.io.InputStreamReader; import java.util.Map; @@ -32,36 +36,25 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.nuiton.diswork.fs.DisworkFileSystemConfig; +import org.nuiton.util.FileUtil; /** * * @author bleny */ public class 
DisworkSimpleClient { - + private static final Log log = LogFactory.getLog(DisworkSimpleClient.class); + + static DisworkDaemon daemon; - /** - * @param args - * @throws Exception - */ - public static void main(String[] args) throws Exception { + public static void userPrompt() throws Exception { - DisworkConfig config = new DisworkConfig(); - config.setFileSystemConfig( - DisworkFileSystemConfig.newKademliaDisworkConfig(args[0], - Integer.parseInt(args[1]))); - config.setActivityStrategy("none"); - config.setUsedPort(30000); - DisworkDaemon daemon = new DisworkDaemon(config); - - // prompt the user to enter their name - // open up standard input BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); String userEntry = ""; - do { + while (!userEntry.equals("quit")) { System.out.print("diswork: "); userEntry = br.readLine(); @@ -83,8 +76,57 @@ System.out.println(IOUtils.toString(daemon.getResults(job).get("output.txt"))); } - - } while (!userEntry.equals("quit")); + } + } + + public static void isisSubmit() throws Exception { + File isis = new File("/tmp/isis/isis-fish-3.3.0.3-bin.zip"); + daemon.submitApplication("isis-fish", "3.3.0.3", new FileInputStream(isis)); + + JobDescription jobDescription = new JobDescription(); + + jobDescription.setJobName("Mon Job Isis"); + + jobDescription.setApplication("isis-fish", "3.3.0.3"); + + File input = new File("/tmp/isis/sim_test-gdg-3.2-3.3.zip"); + jobDescription.addInput("sim_test-gdg-3.2-3.3.zip", new FileInputStream(input)); + jobDescription.addOutput("sim_test-gdg-3.2-3.3-result.zip"); + + jobDescription.setCommandLine("%java -Xmx2500M -jar isis-fish*.jar" + + " --option launch.ui false --option perform.vcsupdate false" + + " --option perform.migration false --option perform.cron false" + + " --simulateRemotelly my_isis_job sim_test-gdg-3.2-3.3.zip" + + " sim_test-gdg-3.2-3.3-result.zip"); + + daemon.submitJob(jobDescription); + + while (! 
daemon.isFinished(jobDescription)) { + Thread.sleep(5 * 1000); + System.out.print("."); + } + + File result = new File("/tmp/isis/sim_test-gdg-3.2-3.3-result.zip"); + + IOUtils.copy(daemon.getResults(jobDescription).get("sim_test-gdg-3.2-3.3-result.zip"), new FileOutputStream(result)); + + } + + /** + * @param args + * @throws Exception + */ + public static void main(String[] args) throws Exception { + DisworkConfig config = new DisworkConfig(); + Integer port = Integer.parseInt(args[1]); + System.out.println("port = " + port); + config.setFileSystemConfig( + DisworkFileSystemConfig.newKademliaDisworkConfig(args[0], port)); + config.setActivityStrategy("none"); + config.setUsedPort(30000); + daemon = new DisworkDaemon(config); + //userPrompt(); + isisSubmit(); } } Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java 2010-06-19 18:55:14 UTC (rev 84) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java 2010-06-21 10:14:34 UTC (rev 85) @@ -52,7 +52,7 @@ * * <dl> * <dt>%java</dt> - * </dd>will be replaced by the actual path of java executable + * <dd>will be replaced by the actual path of java executable * into the JRE</dd> * </dl> * @@ -117,8 +117,8 @@ /** * constructor is protected to prevent bad jobIds. To get a JobDescription - * instance, a client should use the {@link DisworkDaemon#newJob()} - * factory method. The given instance will have a valid jobId + * instance, a client should use the {@link #JobDescription()} constructor + * The given instance will have a valid jobId when needed. 
* @param jobId */ protected JobDescription(String jobId) { @@ -194,16 +194,7 @@ return "job : " + jobName + " (" + jobId + ")"; } - protected static Integer count = 0; - protected static Map<String, JobDescription> map = new HashMap<String, JobDescription>(); - - public String toJSDL() { - /* - count += 1; - map.put(count.toString(), this); - return count.toString(); - */ - + public String toJSDL() { String jsdl = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" + "<jsdl:JobDefinition xmlns=\"http://www.example.org/\"\n" @@ -288,7 +279,7 @@ Element jobName = jobIdentification.getChild("JobName", jsdlNamespace); result.setJobName(jobName.getText()); - + // application Element application = jobDescription.getChild("Application", jsdlNamespace); @@ -300,10 +291,10 @@ result.setApplication(applicationName.getText(), applicationVersion.getText()); } - + Element POSIXApplication = application.getChild("POSIXApplication", jsdlPosixNamespace); - + Element argument = POSIXApplication.getChild("Argument", jsdlPosixNamespace); result.setCommandLine(argument.getText()); @@ -318,22 +309,16 @@ if (output != null) { result.setStandardOutput(output.getText()); } - + // staging List<Element> dataStagings = jobDescription.getChildren ("DataStaging", jsdlNamespace); for (Element dataStaging : dataStagings) { Attribute type = dataStaging.getAttribute("type", disworkNamespace); - Element fileName = dataStaging.getChild("FileName", jsdlNamespace); - - if (type != null && "out".equals(type.getValue())) { - // type="out" - result.addOutput(fileName.getText()); - } else { - // type="in" + if ("in".equals(type.getValue())) { Element source = dataStaging.getChild("Source", jsdlNamespace); if (source != null) { @@ -342,14 +327,11 @@ } else { result.addInput(fileName.getText(), new NullInputStream(0)); } - - // type not set - if (type == null) { - result.addOutput(fileName.getText()); - } } + if ("out".equals(type.getValue())) { + result.addOutput(fileName.getText()); + } } - } catch 
(JDOMException e) { log.error("can't read malformed JSDL file", e); throw new IOException("can't read malformed JSDL file", e); @@ -359,7 +341,7 @@ } return result; } - + public void addInput(String fileName, URL source) { input.add(fileName); inputUrls.put(fileName, source); @@ -369,7 +351,7 @@ input.add(fileName); inputData.put(fileName, source); } - + public void addOutput(String fileName) { output.add(fileName); } Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-06-19 18:55:14 UTC (rev 84) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-06-21 10:14:34 UTC (rev 85) @@ -37,7 +37,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Random; import org.apache.commons.io.FilenameUtils; import org.apache.commons.io.IOUtils; @@ -48,23 +47,37 @@ import org.nuiton.util.ZipUtil; /** + * The workers-manager aims to run and manage the different workers. A worker + * is a thread that try to find a jobs and execute them. * + * The manager can manage multiple workers : one worker + * doing one job at a time. By running n workers, the daemon can run up + * to n job at the same time. The number of workers should be set carefully + * (according to the actual hardware resources available). The number of workers + * can be set through {@link DisworkConfig#setNumberOfWorkers(Integer)}. + * + * Workers are not always active, before trying to find and job, they check + * if they will have enough resources to execute the job without bugging + * the other processes of the host machine. To do so, the worker-manager + * allow to define an activity-strategy to be followed by each worker. 
The + * activity strategy tell the worker if they are allowed to take resources or + * not, different strategies are available (see {@link ActivityStrategy}), + * current strategy can be changed on the fly, while the daemon run. + * * @author bleny */ public class WorkersManager { + private static final Log log = LogFactory.getLog(WorkersManager.class); + + /** a job found in dir "key", should be move to "value" before running */ protected static Map<String, String> RUNNING_MOVE = new HashMap<String, String>(); + + /** a job found in dir "key" that fail should be moved to "value" */ protected static Map<String, String> FAILED_MOVE = new HashMap<String, String>(); - - // TODO 20100611 bleny make it configurable - /** time to wait beetween two look for a job */ - protected static final int JOB_WAIT = 10 * 1000; - // TODO 20100614 bleny make it configurable - /** after this time (ms), a job is considered as no longer running */ - protected static final long MAX_JOB_RUNNING_TIME = 24 * 60 * 60 * 1000; - static { + // initialize RUNNING_MOVE and FAILED_MOVE constants RUNNING_MOVE.put(DisworkDaemon.TODO, DisworkDaemon.TODO_RUNNING); RUNNING_MOVE.put(DisworkDaemon.FAILED_1, DisworkDaemon.FAILED_1_RUNNING); RUNNING_MOVE.put(DisworkDaemon.FAILED_2, DisworkDaemon.FAILED_2_RUNNING); @@ -76,23 +89,44 @@ FAILED_MOVE.put(DisworkDaemon.FAILED_1_RUNNING, DisworkDaemon.FAILED_2); FAILED_MOVE.put(DisworkDaemon.FAILED_2_RUNNING, DisworkDaemon.FAILED_3); } - - private static final Log log = LogFactory.getLog(WorkersManager.class); protected DisworkFileSystem fileSystem; protected DisworkConfig config; - // Pool of workers + /** a pool of workers */ protected List<Worker> workers = new ArrayList<Worker>(); + /** the current activity strategy followed by all the workers */ protected ActivityStrategy activityStrategy; + /** A worker search a job and execute it. + * + * Jobs are found on the file-system by browsing some special directories, + * like {@link DisworkDaemon#TODO}. 
They do not contains jobs but symbolic + * links to jobs. If a worker find a job, it must move it to another + * directory ({@link DisworkDaemon#TODO_RUNNING}) for no other worker to + * take it. Once finished, the job link, if successful, will be moved to + * {@link DisworkDaemon#DONE}. If failed, the link will be moved to + * {@link DisworkDaemon#FAILED_1} (read "failed once") for another worker + * to try it and move it the same may. If the job is failed three times, + * it's moved to {@link DisworkDaemon#FAILED_3} meaning it will not been + * tried again. + * + * @author bleny + */ protected class Worker extends Thread { - + + // TODO 20100611 bleny make it configurable + /** time to wait between two look for a job */ + protected static final int JOB_WAIT = 10 * 1000; + + // TODO 20100614 bleny make it configurable + /** after this time (ms), a job is considered as no longer running */ + protected static final long MAX_JOB_RUNNING_TIME = 24 * 60 * 60 * 1000; + + /** set this field to true will make run() return and thread stop */ protected boolean shouldStop = false; - - protected WorkersManager manager; - + /** * this method add a line to a job-specific log * @param jobPath the path to the job concerned @@ -135,10 +169,9 @@ log.info("will run job " + jobDescription); - // create temp dir - Random random = new Random(); - File jobDir = new File(config.getTempDirectory(), - String.valueOf(random.nextInt())); + // create temp dir + File jobDir = FileUtil.createTempDirectory("job", "", + new File(config.getTempDirectory())); jobDir.mkdirs(); // download application @@ -249,6 +282,10 @@ return success; } + /** + * In a directory, list the content, sort the content, and returns + * the first element. + */ protected String getFistJobName(String path) throws IOException { List<String> jobsNames = fileSystem.readDirectory(path); if (jobsNames.size() == 0) { @@ -369,19 +406,15 @@ } } + /** + * find + */ @Override public void run() { while (! 
shouldStop) { - if (manager.getActivityStrategy().canWork()) { + if (activityStrategy.canWork()) { findAJobAndRunIt(); } - try { - Thread.sleep(10*1000); - } catch (InterruptedException e) { - // TODO 20100615 bleny Auto-generated catch block - log.info("exception catch", e); - e.printStackTrace(); - } } } } @@ -389,7 +422,8 @@ public WorkersManager(DisworkFileSystem fileSystem, DisworkConfig config) { this.fileSystem = fileSystem; this.config = config; - + + // initialize activityStrategy according to config String initialStrategy = config.getActivityStrategy(); if ( "none".equals(initialStrategy)) { activeNoActivityStrategy(); @@ -405,14 +439,13 @@ activeNoActivityStrategy(); } + // start as many workers as needed log.info("will start " + config.getNumberOfWorkers() + " workers"); for (int i = 1 ; i <= config.getNumberOfWorkers() ; i++) { Worker worker = new Worker(); - worker.manager = this; worker.start(); workers.add(worker); } - } public void stop() { Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java =================================================================== --- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java 2010-06-19 18:55:14 UTC (rev 84) +++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java 2010-06-21 10:14:34 UTC (rev 85) @@ -5,6 +5,7 @@ import java.io.InputStream; import java.net.URL; +import java.util.List; import java.util.Map; import org.apache.commons.io.IOUtils; @@ -39,6 +40,7 @@ JobDescription job = new JobDescription(); job.setCommandLine("java -version"); daemon.submitJob(job); + } @Test(expected = DisworkException.class) @@ -60,6 +62,14 @@ } assertTrue(daemon.isSuccessful(job)); + + // check getAllJobs return + List<JobDescription> currentJobs = daemon.getAllJobs(); + assertEquals(1, currentJobs.size()); + + JobDescription currentJob = currentJobs.get(0); + assertEquals(job.getJobId(), currentJob.getJobId()); + 
assertEquals(job.getCommandLine(), currentJob.getCommandLine()); } Modified: trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java 2010-06-19 18:55:14 UTC (rev 84) +++ trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java 2010-06-21 10:14:34 UTC (rev 85) @@ -45,11 +45,11 @@ * </dd> * </dl> * - * * This class provides utilities to write tests easily. * * You can get multiples diswork configs ready to use. All the instances uses * a new port and the local IP. + * * <pre> * c = newDisworkConfig(); // create a config for a bootstrap node * c2 = newDisworkConfig(c.getUsedPort()) // creates a config for a node that @@ -57,7 +57,6 @@ * // first node * c3 = newDisworkConfig(c.getUsedPort()) * </pre> - * */ public class DisworkFileSystemConfig extends ApplicationConfig { @@ -103,7 +102,8 @@ } public DisworkFileSystemConfig() { - setDefaultOption("diswork.fs.blocks_size", "10485760"); // 10 MiB + // setDefaultOption("diswork.fs.blocks_size", "10485760"); // 10 MiB + setDefaultOption("diswork.fs.blocks_size", "50000"); setDefaultOption("diswork.fs.map_type", "inmemory"); setDefaultOption("diswork.fs.use_port", port.toString()); Modified: trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/Storage.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/Storage.java 2010-06-19 18:55:14 UTC (rev 84) +++ trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/Storage.java 2010-06-21 10:14:34 UTC (rev 85) @@ -152,16 +152,16 @@ protected DisworkMap map; - protected DisworkFileSystemConfig disworkConfig; + protected DisworkFileSystemConfig config; - public Storage(DisworkFileSystemConfig disworkConfig, DisworkMap map) + public Storage(DisworkFileSystemConfig config, DisworkMap map) 
throws IOException { - this.disworkConfig = disworkConfig; + this.config = config; if (map == null) { // instantiating a map according to config directives - String mapType = disworkConfig.getOption("diswork.fs.map_type"); + String mapType = config.getOption("diswork.fs.map_type"); if (mapType == null) { log.info("no map type specified"); @@ -172,10 +172,10 @@ this.map = new InMemoryDisworkMap(); } else if ("pastry".equals(mapType)) { log.info("using Pastry map"); - this.map = new PastryDisworkMap(disworkConfig); + this.map = new PastryDisworkMap(config); } else if ("kademlia".equals(mapType)) { log.info("using Kademlia map"); - this.map = new KademliaDisworkMap(disworkConfig); + this.map = new KademliaDisworkMap(config); } } } else { @@ -191,8 +191,8 @@ } } - public Storage(DisworkFileSystemConfig disworkConfig) throws IOException { - this(disworkConfig, null); + public Storage(DisworkFileSystemConfig config) throws IOException { + this(config, null); } /** @@ -298,7 +298,7 @@ map.put(newDataKey, EntryUtil.stringToBytes(metaBlock)); // creating a buffer of the size of a block - int bufferSize = disworkConfig.getBlockSize(); + int bufferSize = config.getBlockSize(); byte[] buffer = new byte[bufferSize]; while ((readResult = value.read(buffer)) != -1) { Modified: trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/AbstractDisworkFileSystemTest.java =================================================================== --- trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/AbstractDisworkFileSystemTest.java 2010-06-19 18:55:14 UTC (rev 84) +++ trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/AbstractDisworkFileSystemTest.java 2010-06-21 10:14:34 UTC (rev 85) @@ -16,12 +16,13 @@ import java.util.List; import java.util.Random; +import javax.xml.transform.Source; + import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; -import 
org.junit.experimental.categories.Categories.ExcludeCategory; import org.nuiton.diswork.fs.DisworkFileSystem; import org.nuiton.util.FileUtil; @@ -44,7 +45,7 @@ /** * The file will have this fixed size */ - static protected int randomFileSize = 25 * 1000; + static protected int randomFileSize = 11 * 1000 * 1000; static protected DisworkFileSystem fileSystem; @@ -84,7 +85,13 @@ */ @Test public void testWrite() throws Exception { - fileSystem.write("/my_file", new FileInputStream(randomFilePath)); + InputStream source = null; + try { + source = new FileInputStream(randomFilePath); + fileSystem.write("/my_file", source); + } finally { + IOUtils.closeQuietly(source); + } } /** @@ -93,9 +100,15 @@ */ @Test public void testExists() throws Exception { - fileSystem.write("/my_file", new FileInputStream(randomFilePath)); - assertTrue(fileSystem.exists("/my_file")); - assertFalse(fileSystem.exists("/my_other_file")); + InputStream source = null; + try { + source = new FileInputStream(randomFilePath); + fileSystem.write("/my_file", new FileInputStream(randomFilePath)); + assertTrue(fileSystem.exists("/my_file")); + assertFalse(fileSystem.exists("/my_other_file")); + } finally { + IOUtils.closeQuietly(source); + } } /** @@ -117,25 +130,28 @@ byte[] bytes = new byte[1]; bytes[0] = -0x1; - InputStream source; + InputStream source = null; + try { + source = new ByteArrayInputStream(bytes); + fileSystem.write("/my_file", source); + } finally { + IOUtils.closeQuietly(source); + } - source = new ByteArrayInputStream(bytes); - fileSystem.write("/my_file", source); + try { + source = new ByteArrayInputStream(bytes); + InputStream readResult = fileSystem.read("/my_file"); - source.close(); + int read = 0; + byte[] b = new byte[1]; - - source = new ByteArrayInputStream(bytes); - InputStream readResult = fileSystem.read("/my_file"); - - int read = 0; - byte[] b = new byte[1]; - - read = readResult.read(b); + read = readResult.read(b); - assertEquals(1, read); - assertArrayEquals(bytes, 
b); - + assertEquals(1, read); + assertArrayEquals(bytes, b); + } finally { + IOUtils.closeQuietly(source); + } } /** @@ -147,31 +163,35 @@ @Test public void testWriteRead() throws Exception { - InputStream source = new FileInputStream(randomFilePath); - - fileSystem.write("/my_file", source); + InputStream source = null; + try { + source = new FileInputStream(randomFilePath); + fileSystem.write("/my_file", source); + } finally { + IOUtils.closeQuietly(source); + } - source.close(); - - InputStream readResult; // now, the checks. We read the original file and the result of // a read() and then compare it byte-to-byte - - source = new FileInputStream(randomFilePath); - readResult = fileSystem.read("/my_file"); - assertEquals(randomFileSize, source.available()); - assertEquals(randomFileSize, readResult.available()); + InputStream readResult = null; + try { + source = new FileInputStream(randomFilePath); + readResult = fileSystem.read("/my_file"); + + assertEquals(randomFileSize, source.available()); + assertEquals(randomFileSize, readResult.available()); - byte[] sourceAsBytes = IOUtils.toByteArray(source); - byte[] readResultAsBytes = IOUtils.toByteArray(readResult); + byte[] sourceAsBytes = IOUtils.toByteArray(source); + byte[] readResultAsBytes = IOUtils.toByteArray(readResult); - assertArrayEquals(sourceAsBytes, readResultAsBytes); + assertArrayEquals(sourceAsBytes, readResultAsBytes); + } finally { + IOUtils.closeQuietly(source); + IOUtils.closeQuietly(readResult); + } - source.close(); - readResult.close(); - } /** @@ -180,22 +200,34 @@ */ @Test(expected = IOException.class) public void testFailAtWrite() throws Exception { - fileSystem.write("/my_folder/my_file", - new FileInputStream(randomFilePath)); + InputStream source = null; + try { + source = new FileInputStream(randomFilePath); + fileSystem.write("/my_folder/my_file", source); + } finally { + IOUtils.closeQuietly(source); + } } /** * this writing a file that already exists should not be a problem */ 
@Test - public void testFailAtDoubleWrite() throws Exception { - InputStream source = new FileInputStream(randomFilePath); - fileSystem.write("/my_file", source); - source.close(); + public void testDoubleWrite() throws Exception { + InputStream source = null; + try { + source = new FileInputStream(randomFilePath); + fileSystem.write("/my_file", source); + } finally { + IOUtils.closeQuietly(source); + } - source = new FileInputStream(randomFilePath); - fileSystem.write("/my_file", source); - source.close(); + try { + source = new FileInputStream(randomFilePath); + fileSystem.write("/my_file", source); + } finally { + IOUtils.closeQuietly(source); + } } /** @@ -217,12 +249,23 @@ @Test public void testWriteInFolder() throws Exception { fileSystem.createDirectory("/my_folder"); - fileSystem.write("/my_folder/my_file", - new FileInputStream(randomFilePath)); fileSystem.createDirectory("/my_folder/my_sub_folder"); - fileSystem.write("/my_folder/my_sub_folder/my_file", - new FileInputStream(randomFilePath)); - assertTrue(fileSystem.exists("/my_folder/my_sub_folder/my_file")); + + InputStream source = null; + try { + source = new FileInputStream(randomFilePath); + fileSystem.write("/my_folder/my_file", source); + } finally { + IOUtils.closeQuietly(source); + } + + try { + source = new FileInputStream(randomFilePath); + fileSystem.write("/my_folder/my_sub_folder/my_file", source); + assertTrue(fileSystem.exists("/my_folder/my_sub_folder/my_file")); + } finally { + IOUtils.closeQuietly(source); + } } /** @@ -233,19 +276,29 @@ @Test public void testLinking() throws Exception { fileSystem.createDirectory("/my_folder"); - fileSystem.write("/my_folder/my_file", - new FileInputStream(randomFilePath)); + + InputStream source = null; + try { + source = new FileInputStream(randomFilePath); + fileSystem.write("/my_folder/my_file", source); + } finally { + IOUtils.closeQuietly(source); + } + fileSystem.createSymbolicLink("/my_link", "/my_folder/my_file"); - InputStream source = new 
FileInputStream(randomFilePath); - InputStream readResult = fileSystem.read("/my_link"); + InputStream readResult = null; + try { + source = new FileInputStream(randomFilePath); + readResult = fileSystem.read("/my_link"); - boolean actualContentEquality = - IOUtils.contentEquals(source, readResult); - source.close(); - readResult.close(); - - assertTrue(actualContentEquality); + boolean actualContentEquality = + IOUtils.contentEquals(source, readResult); + assertTrue(actualContentEquality); + } finally { + IOUtils.closeQuietly(source); + IOUtils.closeQuietly(readResult); + } } /** @@ -264,8 +317,14 @@ @Test public void testRemove() throws Exception { fileSystem.createDirectory("/my_folder"); - fileSystem.write("/my_folder/my_file", - new FileInputStream(randomFilePath)); + + InputStream source = null; + try { + source = new FileInputStream(randomFilePath); + fileSystem.write("/my_folder/my_file", source); + } finally { + IOUtils.closeQuietly(source); + } fileSystem.delete("/my_folder/my_file"); assertTrue(fileSystem.exists("/my_folder")); assertFalse(fileSystem.exists("/my_folder/my_file")); @@ -281,9 +340,15 @@ @Test(expected = IOException.class) public void testFailAtRemove() throws Exception { fileSystem.createDirectory("/my_folder"); - fileSystem.write("/my_folder/my_file", - new FileInputStream(randomFilePath)); + InputStream source = null; + try { + source = new FileInputStream(randomFilePath); + fileSystem.write("/my_folder/my_file", source); + } finally { + IOUtils.closeQuietly(source); + } + // trying to remove a non-empty directory should raise an exception fileSystem.delete("/my_folder"); } @@ -295,8 +360,15 @@ public void testListDirectory() throws Exception { fileSystem.createDirectory("/my_folder"); fileSystem.createDirectory("/my_folder/my_sub_dir"); - fileSystem.write("/my_folder/my_file", - new FileInputStream(randomFilePath)); + + InputStream source = null; + try { + source = new FileInputStream(randomFilePath); + 
fileSystem.write("/my_folder/my_file", source); + } finally { + IOUtils.closeQuietly(source); + } + fileSystem.createSymbolicLink("/my_folder/my_link", "my_file"); List<String> lsResult = fileSystem.readDirectory("/my_folder"); @@ -362,7 +434,14 @@ fileSystem.createDirectory("/dir/subdir"); fileSystem.createDirectory("/dir/subdir/subsubdir"); fileSystem.createSymbolicLink("/dir/link", "subdir"); - fileSystem.write("/dir/file", new FileInputStream(randomFilePath)); + + InputStream source = null; + try { + source = new FileInputStream(randomFilePath); + fileSystem.write("/dir/file", source); + } finally { + IOUtils.closeQuietly(source); + } fileSystem.createDirectory("/otherdir"); fileSystem.move("/dir/subdir", "/otherdir/subdir"); Modified: trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemKademliaTest.java =================================================================== --- trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemKademliaTest.java 2010-06-19 18:55:14 UTC (rev 84) +++ trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemKademliaTest.java 2010-06-21 10:14:34 UTC (rev 85) @@ -6,6 +6,7 @@ import static org.junit.Assert.fail; import java.io.ByteArrayInputStream; +import java.io.FileInputStream; import java.util.ConcurrentModificationException; import org.apache.commons.io.IOUtils; @@ -38,8 +39,8 @@ DisworkFileSystemConfig.newKademliaDisworkConfig(bootstrapPort); DisworkFileSystem fileSystem2 = new DisworkFileSystem(disworkConfig); - assertTrue(fileSystem.exists("/")); - assertTrue(fileSystem2.exists("/")); + fileSystem.write("/file", new FileInputStream(randomFilePath)); + assertTrue(fileSystem2.exists("/file")); } @Test Modified: trunk/pom.xml =================================================================== --- trunk/pom.xml 2010-06-19 18:55:14 UTC (rev 84) +++ trunk/pom.xml 2010-06-21 10:14:34 UTC (rev 85) @@ -81,7 +81,7 @@ <dependency> <groupId>org.planx</groupId> 
<artifactId>koala-xmlstore</artifactId> - <version>0.4.12</version> + <version>0.4.12_cl20100612</version> </dependency> <dependency> <groupId>commons-digester</groupId>
participants (1)
-
bleny@users.nuiton.org