r26 - in trunk/diswork-fs/src: main/java/org/nuiton/disworkfs main/java/org/nuiton/disworkfs/services main/java/org/nuiton/disworkfs/split main/java/org/nuiton/disworkfs/transport main/java/org/nuiton/disworkfs/util test/java/org/nuiton/disworkfs test/java/org/nuiton/disworkfs/split
Author: bleny Date: 2010-05-06 18:00:05 +0200 (Thu, 06 May 2010) New Revision: 26 Url: http://nuiton.org/repositories/revision/diswork/26 Log: exceptions, menage Added: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DisworkServicesManager.java trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/DownloadObserver.java trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/LookUpObserver.java trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/SimpleDownload.java trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/SimpleLookUp.java trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/AbstractSplitFileTest.java Removed: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkServicesManager.java trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DownloadObserver.java trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/LookUpObserver.java trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/RunMe.java trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/SimpleDownload.java trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/AbstractSplitFileTest.java Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DistributedFileSystem.java trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/AbstractDisworkService.java trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DownloadService.java trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/LookUpService.java trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/UploadService.java trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromChunks.java trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Receiver.java trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/DistributedFileSystemTest.java trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromChunksTest.java trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromLocalFileTest.java Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DistributedFileSystem.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DistributedFileSystem.java 2010-05-06 13:42:54 UTC (rev 25) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DistributedFileSystem.java 2010-05-06 16:00:05 UTC (rev 26) @@ -1,15 +1,39 @@ package org.nuiton.disworkfs; import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; public interface DistributedFileSystem { - - public boolean exists(String path); - public void write(String path, File source); - - public File read(String path); + /** + * checks the existence of a file on the virtual FS + * @param path a path to a file on the virtual FS + * @return true if the file exists, false if not + * @throws InterruptedException + */ + public boolean exists(String path) throws InterruptedException; - public void remove(String path); + /** + * write a file + * @param path path of the virtual FS where the file should be written + * @param source the file to copy on the VFS + * @throws IOException if problems occurs while reading the source or writing on VFS + * @throws InterruptedException + */ + public void write(String path, File source) throws IOException, InterruptedException; + /** + * read a file on the VFS, the file is downloaded if needed (may take some times) + * @param path the path in the virtual FS to the file you want to read + * @return ?? + * @throws FileNotFoundException if no file have been written to this path (you may use {@link #exists(String) to check before read} + * @throws IOException if a problem occur while reading the local file + * @throws InterruptedException + */ + public File read(String path) throws FileNotFoundException, IOException, InterruptedException; + + public void remove(String path); + + // TODO list a directory content } Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java 2010-05-06 13:42:54 UTC (rev 25) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java 2010-05-06 16:00:05 UTC (rev 26) @@ -3,17 +3,22 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.OutputStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.nuiton.disworkfs.services.DisworkServicesManager; import org.nuiton.disworkfs.services.DownloadService; import org.nuiton.disworkfs.services.LookUpService; import org.nuiton.disworkfs.services.UploadService; +import org.nuiton.disworkfs.split.FileDescription; +import org.nuiton.disworkfs.util.SimpleDownload; +import org.nuiton.disworkfs.util.SimpleLookUp; import org.nuiton.util.FileUtil; -public class DisworkFileSystem { +import sun.reflect.generics.reflectiveObjects.NotImplementedException; +public class DisworkFileSystem implements DistributedFileSystem { + protected DownloadService downloadService; protected UploadService uploadService; protected LookUpService lookUpService; @@ -36,7 +41,8 @@ } - public OutputStream read(String path) throws InterruptedException, FileNotFoundException { + @Override + public File read(String path) throws InterruptedException, FileNotFoundException { log.info("trying to read " + path); @@ -56,21 +62,50 @@ } + // FIXME return null; } - public void write(String path, File source) throws IOException { + @Override + public void write(String path, File source) throws IOException, InterruptedException { File target = new File(disworkConfig.getStoragePath(), path); if (target.exists()) { throw new IOException(target.getAbsolutePath() + " already exists"); - } + } + if (this.exists(path)) { + throw new IOException(target.getAbsolutePath() + " already exists"); + } + FileUtil.copy(source, target); } + @Override + public boolean exists(String path) throws InterruptedException { + + File file = new File(disworkConfig.getStoragePath(), path); + boolean fileExists = false; + + if (file.exists()) { + fileExists = true; + } else { + SimpleLookUp simpleLookUp = new SimpleLookUp(path, lookUpService); + FileDescription lookUpResult = simpleLookUp.runLookUp(); + fileExists = (lookUpResult != null); + } + + return fileExists; + } + + @Override + public void remove(String path) { + throw new NotImplementedException(); + } + public void close() { disworkServicesManager.stop(); } + } Deleted: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkServicesManager.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkServicesManager.java 2010-05-06 13:42:54 UTC (rev 25) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkServicesManager.java 2010-05-06 16:00:05 UTC (rev 26) @@ -1,70 +0,0 @@ -package org.nuiton.disworkfs; - -import java.util.ArrayList; -import java.util.List; - -import org.nuiton.disworkfs.services.AbstractDisworkService; -import org.nuiton.disworkfs.transport.Message; -import org.nuiton.disworkfs.transport.Receiver; -import org.nuiton.disworkfs.transport.Transport; -import org.nuiton.disworkfs.transport.jgroups.JGroupsTransport; - -public class DisworkServicesManager implements Receiver { - - /** - * all the running services - */ - private List<AbstractDisworkService> services = new ArrayList<AbstractDisworkService>(); - - protected Transport transport; - - private DisworkConfig disworkConfig; - - public DisworkServicesManager(DisworkConfig disworkConfig) { - this.disworkConfig = disworkConfig; - - transport = new JGroupsTransport(disworkConfig); - - transport.setReceiver(this); - - } - - public void register(AbstractDisworkService service) { - - // dependency injection, the service need a channel to send a message - // service.setJChannel(jChannel); - service.setTransport(transport); - // ... and the disworkConfig - service.setDisworkConfig(disworkConfig); - - // each service is run in his own thread - Thread thread = new Thread(service); - thread.start(); - - // add this service to the list of running services - services.add(service); - } - - public void unRegister(AbstractDisworkService service) { - services.remove(service); - } - - - @Override - public void receiveMessage(Message message) { - for (AbstractDisworkService service : services) { - service.receiveMessage(message); - } - } - - /** - * this stop all the diswork services and close the - * ressources used by the transport layer - */ - public void stop() { - for (AbstractDisworkService service : services) { - service.stop(); - } - transport.close(); - } -} \ No newline at end of file Deleted: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DownloadObserver.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DownloadObserver.java 2010-05-06 13:42:54 UTC (rev 25) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DownloadObserver.java 2010-05-06 16:00:05 UTC (rev 26) @@ -1,9 +0,0 @@ -package org.nuiton.disworkfs; - -import org.nuiton.disworkfs.services.DownloadService; - -public interface DownloadObserver { - - public void updateDownloadStatus(DownloadService downloadService); - -} Deleted: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/LookUpObserver.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/LookUpObserver.java 2010-05-06 13:42:54 UTC (rev 25) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/LookUpObserver.java 2010-05-06 16:00:05 UTC (rev 26) @@ -1,9 +0,0 @@ -package org.nuiton.disworkfs; - -import org.nuiton.disworkfs.split.FileDescription; - -public interface LookUpObserver { - - public void receiveResult(FileDescription fileDescription); - -} Deleted: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/RunMe.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/RunMe.java 2010-05-06 13:42:54 UTC (rev 25) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/RunMe.java 2010-05-06 16:00:05 UTC (rev 26) @@ -1,12 +0,0 @@ -package org.nuiton.disworkfs; - - -public class RunMe { - - public static void main(String args[]) { - DisworkConfig disworkConfig = new DisworkConfig(); - - DisworkFileSystem disworkFileSystem = new DisworkFileSystem(disworkConfig); - } - -} Deleted: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/SimpleDownload.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/SimpleDownload.java 2010-05-06 13:42:54 UTC (rev 25) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/SimpleDownload.java 2010-05-06 16:00:05 UTC (rev 26) @@ -1,87 +0,0 @@ -package org.nuiton.disworkfs; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.nuiton.disworkfs.services.DownloadService; -import org.nuiton.disworkfs.services.LookUpService; -import org.nuiton.disworkfs.split.FileDescription; - - - -public class SimpleDownload implements DownloadObserver, LookUpObserver { - - /** - * only used for synchronisation purpose - */ - protected final Object lock = new Object(); - - private Boolean lookUpResponseReceived = false; - private Boolean downloadFinised = false; - - private FileDescription fileDescription = null; - - private DownloadService downloadService; - private LookUpService lookUpService; - private String filePath; - - private static final Log log = LogFactory.getLog(SimpleDownload.class); - - public SimpleDownload(String filePath, LookUpService lookUpService, DownloadService downloadService) { - this.filePath = filePath; - this.downloadService = downloadService; - this.lookUpService = lookUpService; - } - - public boolean initiateDownload() throws InterruptedException { - - lookUpService.lookForFileName(filePath, this); - - synchronized (lock) { - lock.wait(10 * 1000); // time out at 10 seconds - } - - // TODO throw file not found if timeout exceed - if (lookUpResponseReceived) { - log.info("look-up response received for " + filePath); - } else { - log.info("no look-up response received for " + filePath); - } - - return lookUpResponseReceived; - } - - - public void startDownload() throws InterruptedException { - if (log.isDebugEnabled()) - log.info("starting download for " + fileDescription.getFileName()); - - downloadService.startDownload(fileDescription, this); - - synchronized (lock) { - lock.wait(); - if (log.isDebugEnabled()) - log.info("download " + fileDescription.getFileName() + " is complete"); - } - } - - - @Override - public void updateDownloadStatus(DownloadService downloadService) { - downloadFinised = downloadService.isFinished(fileDescription); - if (downloadFinised) { - synchronized (lock) { - lock.notify(); - } - } - } - - @Override - public void receiveResult(FileDescription fileDescription) { - synchronized (lock) { - lookUpResponseReceived = true; - this.fileDescription = fileDescription; - lock.notify(); - } - } - -} Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/AbstractDisworkService.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/AbstractDisworkService.java 2010-05-06 13:42:54 UTC (rev 25) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/AbstractDisworkService.java 2010-05-06 16:00:05 UTC (rev 26) @@ -1,5 +1,6 @@ package org.nuiton.disworkfs.services; +import java.io.IOException; import java.io.Serializable; import org.apache.commons.logging.Log; @@ -31,25 +32,28 @@ @Override public void receiveMessage(Message message) { - - Serializable obj = message.getContent(); - if (obj instanceof LookUpMessage) { - receiveLookUpMessage(message); - } else if (obj instanceof LookUpResponseMessage) { - receiveLookUpResponseMessage(message); - } else if (obj instanceof FileRequestMessage) { - receiveFileRequestMessage(message); - } else if (obj instanceof FileTransferMessage) { - receiveFileTransferMessage(message); - } else { - log.error("unknow message received"); - } + try { + Serializable obj = message.getContent(); + if (obj instanceof LookUpMessage) { + receiveLookUpMessage(message); + } else if (obj instanceof LookUpResponseMessage) { + receiveLookUpResponseMessage(message); + } else if (obj instanceof FileRequestMessage) { + receiveFileRequestMessage(message); + } else if (obj instanceof FileTransferMessage) { + receiveFileTransferMessage(message); + } else { + log.error("unknow message received"); + } + } catch (IOException eee) { + log.error("IOException encoutered after receiving a message", eee); + } } - public void receiveLookUpMessage(Message msg) {} + public void receiveLookUpMessage(Message msg) throws IOException {} public void receiveLookUpResponseMessage(Message msg) {} - public void receiveFileRequestMessage(Message msg) {} - public void receiveFileTransferMessage(Message msg) {} + public void receiveFileRequestMessage(Message msg) throws IOException {} + public void receiveFileTransferMessage(Message msg) throws IOException {} public void setTransport(Transport transport) { this.transport = transport; Added: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DisworkServicesManager.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DisworkServicesManager.java (rev 0) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DisworkServicesManager.java 2010-05-06 16:00:05 UTC (rev 26) @@ -0,0 +1,70 @@ +package org.nuiton.disworkfs.services; + +import java.util.ArrayList; +import java.util.List; + +import org.nuiton.disworkfs.DisworkConfig; +import org.nuiton.disworkfs.transport.Message; +import org.nuiton.disworkfs.transport.Receiver; +import org.nuiton.disworkfs.transport.Transport; +import org.nuiton.disworkfs.transport.jgroups.JGroupsTransport; + +public class DisworkServicesManager implements Receiver { + + /** + * all the running services + */ + private List<AbstractDisworkService> services = new ArrayList<AbstractDisworkService>(); + + protected Transport transport; + + private DisworkConfig disworkConfig; + + public DisworkServicesManager(DisworkConfig disworkConfig) { + this.disworkConfig = disworkConfig; + + transport = new JGroupsTransport(disworkConfig); + + transport.setReceiver(this); + + } + + public void register(AbstractDisworkService service) { + + // dependency injection, the service need a channel to send a message + // service.setJChannel(jChannel); + service.setTransport(transport); + // ... and the disworkConfig + service.setDisworkConfig(disworkConfig); + + // each service is run in his own thread + Thread thread = new Thread(service); + thread.start(); + + // add this service to the list of running services + services.add(service); + } + + public void unRegister(AbstractDisworkService service) { + services.remove(service); + } + + + @Override + public void receiveMessage(Message message) { + for (AbstractDisworkService service : services) { + service.receiveMessage(message); + } + } + + /** + * this stop all the diswork services and close the + * ressources used by the transport layer + */ + public void stop() { + for (AbstractDisworkService service : services) { + service.stop(); + } + transport.close(); + } +} \ No newline at end of file Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DownloadService.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DownloadService.java 2010-05-06 13:42:54 UTC (rev 25) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DownloadService.java 2010-05-06 16:00:05 UTC (rev 26) @@ -1,6 +1,7 @@ package org.nuiton.disworkfs.services; import java.io.File; +import java.io.IOException; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -8,12 +9,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.nuiton.disworkfs.DownloadObserver; import org.nuiton.disworkfs.messages.FileRequestMessage; import org.nuiton.disworkfs.messages.FileTransferMessage; import org.nuiton.disworkfs.split.FileDescription; import org.nuiton.disworkfs.split.SplitFileFromChunks; import org.nuiton.disworkfs.transport.Message; +import org.nuiton.disworkfs.util.DownloadObserver; public class DownloadService extends AbstractDisworkService { @@ -26,7 +27,7 @@ private static final Log log = LogFactory.getLog(DownloadService.class); @Override - public void receiveFileTransferMessage(Message message) { + public void receiveFileTransferMessage(Message message) throws IOException { FileTransferMessage fileTransferMessage = (FileTransferMessage) message.getContent(); FileDescription fileDescription = fileTransferMessage.getFileDescrition(); @@ -87,6 +88,7 @@ // it's the first observer for this download ever, let's construct a list observersList = new LinkedList<DownloadObserver>(); } + observersList.add(downloadObserver); downloadObservers.put(fileDescription.getFileCheckSum(), observersList); } @@ -117,7 +119,8 @@ if (log.isDebugEnabled()) log.info("sending file request for " + fileDescription.getFileName()); + registerObserver(fileDescription, downloadObserver); - + } } Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/LookUpService.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/LookUpService.java 2010-05-06 13:42:54 UTC (rev 25) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/LookUpService.java 2010-05-06 16:00:05 UTC (rev 26) @@ -6,10 +6,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.nuiton.disworkfs.LookUpObserver; import org.nuiton.disworkfs.messages.LookUpMessage; import org.nuiton.disworkfs.messages.LookUpResponseMessage; import org.nuiton.disworkfs.transport.Message; +import org.nuiton.disworkfs.util.LookUpObserver; @@ -37,6 +37,7 @@ message.send(); log.info("look-up message sent for " + fileName); + // FIXME memory leak if multiple look-up on never-will-be-available files requestToRequester.put(fileName, lookUpObserver); } Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/UploadService.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/UploadService.java 2010-05-06 13:42:54 UTC (rev 25) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/UploadService.java 2010-05-06 16:00:05 UTC (rev 26) @@ -20,95 +20,85 @@ * When receiving a LookUp, sending a LookUpResponse if owning file on local FS */ public class UploadService extends AbstractDisworkService { - - private static final Log log = LogFactory.getLog(UploadService.class); - - @Override - public void receiveFileRequestMessage(Message message) { - FileRequestMessage fileRequestMessage = (FileRequestMessage) message.getContent(); - File file = new File(disworkConfig.getStoragePath(), fileRequestMessage.getfileDescription().getFileName()); + private static final Log log = LogFactory.getLog(UploadService.class); - if (log.isDebugEnabled()) - log.info("file request message received for " + fileRequestMessage.getfileDescription().getFileName()); - - if (file.exists()) { - - if (log.isDebugEnabled()) - log.info("file found in path " + file.getAbsolutePath()); + @Override + public void receiveFileRequestMessage(Message message) throws IOException { + FileRequestMessage fileRequestMessage = (FileRequestMessage) message.getContent(); - SplitFileFromLocalFile splitFileFromLocalFile = new SplitFileFromLocalFile(file); + File file = new File(disworkConfig.getStoragePath(), fileRequestMessage.getfileDescription().getFileName()); - try { - FileDescription fileDescription = splitFileFromLocalFile.getFileDescription(); - fileDescription.setFileName(fileRequestMessage.getfileDescription().getFileName()); - - List<FileChunk> chunks = splitFileFromLocalFile.getAllChunks(); - - for (FileChunk fileChunk : chunks) { - Message reply = message.newReply(); - - FileTransferMessage fileTransferMessage = new FileTransferMessage(fileChunk, fileDescription); + if (log.isDebugEnabled()) + log.info("file request message received for " + fileRequestMessage.getfileDescription().getFileName()); - reply.setContent(fileTransferMessage); + if (file.exists()) { - if (log.isDebugEnabled()) - log.info("sending chunk " + fileTransferMessage.getFileChunk().getChunkNumber()); + if (log.isDebugEnabled()) + log.info("file found in path " + file.getAbsolutePath()); - reply.send(); - - } + SplitFileFromLocalFile splitFileFromLocalFile = new SplitFileFromLocalFile(file); - if (log.isDebugEnabled()) - log.info("all chunks sent"); - - } catch (IOException e) { - log.error("can't read file " + file.getAbsolutePath()); - } - } else { - if (log.isDebugEnabled()) - log.info("file not found in path " + file.getAbsolutePath() + " don't send response"); - } + FileDescription fileDescription = splitFileFromLocalFile.getFileDescription(); + fileDescription.setFileName(fileRequestMessage.getfileDescription().getFileName()); - } + List<FileChunk> chunks = splitFileFromLocalFile.getAllChunks(); - @Override - public void receiveLookUpMessage(Message message) { - LookUpMessage lookUpMessage = (LookUpMessage) message.getContent(); + for (FileChunk fileChunk : chunks) { + Message reply = message.newReply(); - if (log.isDebugEnabled()) - log.info("lookup message received : looking for file " + lookUpMessage.getFileName()); - - File file = new File(disworkConfig.getStoragePath(), lookUpMessage.getFileName()); - + FileTransferMessage fileTransferMessage = new FileTransferMessage(fileChunk, fileDescription); - if (file.exists()) { - if (log.isDebugEnabled()) - log.info(lookUpMessage.getFileName() + " file found at " + file.getAbsolutePath()); + reply.setContent(fileTransferMessage); - SplitFileFromLocalFile splitFileFromLocalFile = new SplitFileFromLocalFile(file); + if (log.isDebugEnabled()) + log.info("sending chunk " + fileTransferMessage.getFileChunk().getChunkNumber()); - try { + reply.send(); - Message reply = message.newReply(); + } - FileDescription fileDescription = splitFileFromLocalFile.getFileDescription(); - - fileDescription.setFileName(lookUpMessage.getFileName()); - - reply.setContent(new LookUpResponseMessage(fileDescription)); + if (log.isDebugEnabled()) + log.info("all chunks sent"); + } else { + if (log.isDebugEnabled()) + log.info("file not found in path " + file.getAbsolutePath() + " don't send response"); + } - if (log.isDebugEnabled()) - log.info("sending lookUpResponse response for " + lookUpMessage.getFileName()); + } - reply.send(); - } catch (IOException e) { - log.error("can't read file " + file.getAbsolutePath() + " aborting file transfer"); - } - } else { - if (log.isDebugEnabled()) - log.info(lookUpMessage.getFileName() + " file not found at " + file.getAbsolutePath() + " no response sent"); - } + @Override + public void receiveLookUpMessage(Message message) throws IOException { + LookUpMessage lookUpMessage = (LookUpMessage) message.getContent(); - } + if (log.isDebugEnabled()) + log.info("lookup message received : looking for file " + lookUpMessage.getFileName()); + + File file = new File(disworkConfig.getStoragePath(), lookUpMessage.getFileName()); + + if (file.exists()) { + if (log.isDebugEnabled()) + log.info(lookUpMessage.getFileName() + " file found at " + file.getAbsolutePath()); + + SplitFileFromLocalFile splitFileFromLocalFile = new SplitFileFromLocalFile(file); + + + Message reply = message.newReply(); + + FileDescription fileDescription = splitFileFromLocalFile.getFileDescription(); + + fileDescription.setFileName(lookUpMessage.getFileName()); + + reply.setContent(new LookUpResponseMessage(fileDescription)); + + if (log.isDebugEnabled()) + log.info("sending lookUpResponse response for " + lookUpMessage.getFileName()); + + reply.send(); + } else { + if (log.isDebugEnabled()) + log.info(lookUpMessage.getFileName() + " file not found at " + file.getAbsolutePath() + " no response sent"); + } + + } } Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromChunks.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromChunks.java 2010-05-06 13:42:54 UTC (rev 25) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromChunks.java 2010-05-06 16:00:05 UTC (rev 26) @@ -12,25 +12,22 @@ import java.util.zip.CRC32; import org.apache.commons.io.FileUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; public class SplitFileFromChunks { - + protected FileDescription fileDescription; protected File destination; protected File chunkStatusFile; - public SplitFileFromChunks(FileDescription fileDescription, File destination) { + private Log log = LogFactory.getLog(SplitFileFromChunks.class); + + public SplitFileFromChunks(FileDescription fileDescription, File destination) throws IOException { this.fileDescription = fileDescription; this.destination = destination; this.chunkStatusFile = new File(destination.getParent(), "." + destination.getName() + ".index"); - - if (!chunkStatusFile.exists()) { - BitSet bitSet = new BitSet(fileDescription.getNumberOfChunks()); - bitSet.set(0, fileDescription.getNumberOfChunks()); - writeChunkStatusFile(bitSet); - bitSet = readChunkStatusFile(); - } - + if (!destination.exists()) { try { RandomAccessFile randomAccessFile = new RandomAccessFile(destination, "rw"); @@ -38,51 +35,50 @@ randomAccessFile.close(); } catch (FileNotFoundException e) { // we just checked !destination.exists() - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); } } + + if (!chunkStatusFile.exists()) { + // force the creation of all needed directories + log.debug("creating directory " + chunkStatusFile.getParentFile()); + chunkStatusFile.getParentFile().mkdirs(); + BitSet bitSet = new BitSet(fileDescription.getNumberOfChunks()); + bitSet.set(0, fileDescription.getNumberOfChunks()); + writeChunkStatusFile(bitSet); + bitSet = readChunkStatusFile(); + } + } - - public boolean isComplete() { + + public boolean isComplete() throws IOException { BitSet bitSet = readChunkStatusFile(); boolean fileIsComplete = bitSet.cardinality() == 0; return fileIsComplete; } - - - protected BitSet readChunkStatusFile() { + + + protected BitSet readChunkStatusFile() throws IOException { BitSet bitSet = null; try { FileInputStream is = new FileInputStream(chunkStatusFile); bitSet = (BitSet) new ObjectInputStream(is).readObject(); is.close(); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } return bitSet; } - - - protected void writeChunkStatusFile(BitSet bitSet) { - try { - FileOutputStream os = new FileOutputStream(chunkStatusFile); - new ObjectOutputStream(os).writeObject(bitSet); - os.close(); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } + + + protected void writeChunkStatusFile(BitSet bitSet) throws IOException { + FileOutputStream os = new FileOutputStream(chunkStatusFile); + new ObjectOutputStream(os).writeObject(bitSet); + os.close(); } - - - public void addChunk(FileChunk fileChunk) { - try { + + + public void addChunk(FileChunk fileChunk) throws IOException { RandomAccessFile randomAccessFile = new RandomAccessFile(destination, "rw"); byte[] data = fileChunk.getData(); int off = FileChunk.MAX_CHUNK_SIZE * fileChunk.getChunkNumber(); @@ -90,30 +86,22 @@ randomAccessFile.seek(off); randomAccessFile.write(data, 0, len); randomAccessFile.close(); - + // updating status - BitSet chunkStatus = readChunkStatusFile(); chunkStatus.clear(fileChunk.getChunkNumber()); writeChunkStatusFile(chunkStatus); - - + + if (isComplete()) { long expectedChecksum = fileDescription.getFileCheckSum(); long actualCheckSum = FileUtils.checksum(destination, new CRC32()).getValue(); - + if (actualCheckSum != expectedChecksum) { throw new IOException("checksum fail"); } } - } catch (FileNotFoundException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } } - + } Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Receiver.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Receiver.java 2010-05-06 13:42:54 UTC (rev 25) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Receiver.java 2010-05-06 16:00:05 UTC (rev 26) @@ -1,5 +1,6 @@ package org.nuiton.disworkfs.transport; + public interface Receiver { public void receiveMessage(Message message); Added: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/DownloadObserver.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/DownloadObserver.java (rev 0) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/DownloadObserver.java 2010-05-06 16:00:05 UTC (rev 26) @@ -0,0 +1,9 @@ +package org.nuiton.disworkfs.util; + +import org.nuiton.disworkfs.services.DownloadService; + +public interface DownloadObserver { + + public void updateDownloadStatus(DownloadService downloadService); + +} Added: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/LookUpObserver.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/LookUpObserver.java (rev 0) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/LookUpObserver.java 2010-05-06 16:00:05 UTC (rev 26) @@ -0,0 +1,9 @@ +package org.nuiton.disworkfs.util; + +import org.nuiton.disworkfs.split.FileDescription; + +public interface LookUpObserver { + + public void receiveResult(FileDescription fileDescription); + +} Added: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/SimpleDownload.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/SimpleDownload.java (rev 0) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/SimpleDownload.java 2010-05-06 16:00:05 UTC (rev 26) @@ -0,0 +1,71 @@ +package org.nuiton.disworkfs.util; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.nuiton.disworkfs.services.DownloadService; +import org.nuiton.disworkfs.services.LookUpService; +import org.nuiton.disworkfs.split.FileDescription; + + + +public class SimpleDownload implements DownloadObserver { + + /** + * only used for synchronisation purpose + */ + protected final Object lock = new Object(); + + protected Boolean downloadFinised = false; + + protected FileDescription fileDescription = null; + + protected DownloadService downloadService; + + protected SimpleLookUp simpleLookUp; + + private static final Log log = LogFactory.getLog(SimpleDownload.class); + + public SimpleDownload(String filePath, LookUpService lookUpService, DownloadService downloadService) { + this.downloadService = downloadService; + + simpleLookUp = new SimpleLookUp(filePath, lookUpService); + } + + public boolean initiateDownload() throws InterruptedException { + fileDescription = simpleLookUp.runLookUp(); + return fileDescription != null; + } + + /** + * initiateDownload() must be called <strong>before</strong> + * download start. + * @throws InterruptedException + */ + public void startDownload() throws InterruptedException { + if (fileDescription == null) { + log.error("download started without FileDescription"); + } else { + if (log.isDebugEnabled()) + log.info("starting download for " + fileDescription.getFileName()); + + downloadService.startDownload(fileDescription, this); + + synchronized (lock) { + lock.wait(); + if (log.isDebugEnabled()) + log.info("download " + fileDescription.getFileName() + " is complete"); + } + } + } + + + @Override + public void updateDownloadStatus(DownloadService downloadService) { + downloadFinised = downloadService.isFinished(fileDescription); + if (downloadFinised) { + synchronized (lock) { + lock.notify(); + } + } + } +} Added: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/SimpleLookUp.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/SimpleLookUp.java (rev 0) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/SimpleLookUp.java 2010-05-06 16:00:05 UTC (rev 26) @@ -0,0 +1,60 @@ +package org.nuiton.disworkfs.util; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.nuiton.disworkfs.services.LookUpService; +import org.nuiton.disworkfs.split.FileDescription; + +public class SimpleLookUp implements LookUpObserver { + + /** + * only used for synchronisation purpose + */ + protected final Object lock = new Object(); + + protected Boolean lookUpResponseReceived = false; + + protected FileDescription fileDescription = null; + + protected LookUpService lookUpService; + + protected String filePath; + + private static final Log log = LogFactory.getLog(SimpleDownload.class); + + public SimpleLookUp(String filePath, LookUpService lookUpService) { + this.filePath = filePath; + this.lookUpService = lookUpService; + } + + /** + * + * @return the FileDescription or null if file is not found + * @throws InterruptedException + */ + public FileDescription runLookUp() throws InterruptedException { + + lookUpService.lookForFileName(filePath, this); + + synchronized (lock) { + lock.wait(10 * 1000); // time out at 10 seconds + } + + if (lookUpResponseReceived) { + log.info("look-up response received for " + filePath); + } else { + log.info("no look-up response received for " + filePath); + } + + return this.fileDescription; + } + + @Override + public void receiveResult(FileDescription fileDescription) { + synchronized (lock) { + this.lookUpResponseReceived = true; + this.fileDescription = fileDescription; + lock.notify(); + } + } +} Deleted: trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/AbstractSplitFileTest.java =================================================================== --- trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/AbstractSplitFileTest.java 2010-05-06 13:42:54 UTC (rev 25) +++ trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/AbstractSplitFileTest.java 2010-05-06 16:00:05 UTC (rev 26) @@ -1,57 +0,0 @@ -package org.nuiton.disworkfs; - -import java.io.File; -import java.util.Random; - -import org.apache.commons.io.FileUtils; -import org.junit.After; -import org.junit.Before; - -/** - * This classes is for test purpose. It creates, before all tests - * a random file with a given size in a temp directory - */ -public abstract class AbstractSplitFileTest { - - static protected Random random = new Random(); - - /** - * a place to store files for the test - * it's a subdirectory of the OS temp dir - * e.g. under linux /tmp/disworkfs/tests/ - */ - static protected String tempDirectoryPath = System.getProperty("java.io.tmpdir", ".") + "/disworkfs/tests"; - - /** - * We will create a file at this path for test purpose - */ - static protected String randomFilePath = tempDirectoryPath + "/randomfile"; - - /** - * The file will have this fixed size - */ - static protected int randomFileSize = 3500; - - @Before - public void setUp() throws Exception { - File tempDirectory = new File(tempDirectoryPath); - tempDirectory.mkdir(); - FileUtils.forceDeleteOnExit(tempDirectory); - - // creating random data for the file - byte[] randomBytes = new byte[randomFileSize]; - random.nextBytes(randomBytes); - - // dumping random data into the file - File randomFile = new File(randomFilePath); - FileUtils.writeByteArrayToFile(randomFile, randomBytes); - - } - - @After - public void tearDown() throws Exception { - // cleaning - FileUtils.forceDelete(new File(tempDirectoryPath)); - } - -} Modified: trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/DistributedFileSystemTest.java =================================================================== --- trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/DistributedFileSystemTest.java 2010-05-06 13:42:54 UTC (rev 25) +++ trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/DistributedFileSystemTest.java 2010-05-06 16:00:05 UTC (rev 26) @@ -1,6 +1,7 @@ package org.nuiton.disworkfs; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; import java.io.File; @@ -95,12 +96,18 @@ DisworkFileSystem disworkFileSystem1 = new DisworkFileSystem(disworkConfig1); DisworkFileSystem disworkFileSystem2 = new DisworkFileSystem(disworkConfig2); - disworkFileSystem1.write("monfichier", new File(randomFilePath)); - disworkFileSystem2.read("monfichier"); + disworkFileSystem1.write("mon/chemin/vers/mon/fichier", new File(randomFilePath)); + boolean existsResult = disworkFileSystem2.exists("mon/chemin/vers/mon/fichier"); + assertTrue(existsResult); + existsResult = disworkFileSystem2.exists("unautrefichierquinexistepas"); + assertFalse(existsResult); + + disworkFileSystem2.read("mon/chemin/vers/mon/fichier"); + // TODO - File monfichierstorage1 = new File(tempDirectoryPath + "/storage1/monfichier"); - File monfichierstorage2 = new File(tempDirectoryPath + "/storage2/monfichier"); + File monfichierstorage1 = new File(tempDirectoryPath + "/storage1/mon/chemin/vers/mon/fichier"); + File monfichierstorage2 = new File(tempDirectoryPath + "/storage2/mon/chemin/vers/mon/fichier"); try { boolean actualContentEquality = IOUtils.contentEquals( Added: trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/AbstractSplitFileTest.java =================================================================== --- trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/AbstractSplitFileTest.java (rev 0) +++ trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/AbstractSplitFileTest.java 2010-05-06 16:00:05 UTC (rev 26) @@ -0,0 +1,57 @@ +package org.nuiton.disworkfs.split; + +import java.io.File; +import java.util.Random; + +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Before; + +/** + * This classes is for test purpose. It creates, before all tests + * a random file with a given size in a temp directory + */ +public abstract class AbstractSplitFileTest { + + static protected Random random = new Random(); + + /** + * a place to store files for the test + * it's a subdirectory of the OS temp dir + * e.g. under linux /tmp/disworkfs/tests/ + */ + static protected String tempDirectoryPath = System.getProperty("java.io.tmpdir", ".") + "/disworkfs/tests"; + + /** + * We will create a file at this path for test purpose + */ + static protected String randomFilePath = tempDirectoryPath + "/randomfile"; + + /** + * The file will have this fixed size + */ + static protected int randomFileSize = 3500; + + @Before + public void setUp() throws Exception { + File tempDirectory = new File(tempDirectoryPath); + tempDirectory.mkdir(); + FileUtils.forceDeleteOnExit(tempDirectory); + + // creating random data for the file + byte[] randomBytes = new byte[randomFileSize]; + random.nextBytes(randomBytes); + + // dumping random data into the file + File randomFile = new File(randomFilePath); + FileUtils.writeByteArrayToFile(randomFile, randomBytes); + + } + + @After + public void tearDown() throws Exception { + // cleaning + FileUtils.forceDelete(new File(tempDirectoryPath)); + } + +} Modified: trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromChunksTest.java =================================================================== --- trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromChunksTest.java 2010-05-06 13:42:54 UTC (rev 25) +++ trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromChunksTest.java 2010-05-06 16:00:05 UTC (rev 26) @@ -10,7 +10,6 @@ import org.apache.commons.io.IOUtils; import org.junit.Test; -import org.nuiton.disworkfs.AbstractSplitFileTest; public class SplitFileFromChunksTest extends AbstractSplitFileTest { Modified: trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromLocalFileTest.java =================================================================== --- trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromLocalFileTest.java 2010-05-06 13:42:54 UTC (rev 25) +++ trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromLocalFileTest.java 2010-05-06 16:00:05 UTC (rev 26) @@ -9,7 +9,6 @@ import java.util.List; import org.junit.Test; -import org.nuiton.disworkfs.AbstractSplitFileTest; public class SplitFileFromLocalFileTest extends AbstractSplitFileTest {
participants (1)
-
bleny@users.nuiton.org