Diswork-commits
Threads by month
- ----- 2026 -----
- June
- May
- April
- March
- February
- January
- ----- 2025 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2024 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2023 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2022 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2021 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2020 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2019 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2018 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2017 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2016 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2015 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2014 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2013 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2012 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2011 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2010 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
May 2010
- 3 participants
- 36 discussions
Author: bleny
Date: 2010-05-14 15:17:31 +0200 (Fri, 14 May 2010)
New Revision: 33
Url: http://nuiton.org/repositories/revision/diswork/33
Log:
[maven-release-plugin] prepare release diswork-0.0.1-jgroups
Modified:
trunk/diswork-fs/pom.xml
trunk/pom.xml
Modified: trunk/diswork-fs/pom.xml
===================================================================
--- trunk/diswork-fs/pom.xml 2010-05-14 13:15:30 UTC (rev 32)
+++ trunk/diswork-fs/pom.xml 2010-05-14 13:17:31 UTC (rev 33)
@@ -1,17 +1,16 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.nuiton</groupId>
<artifactId>diswork</artifactId>
- <version>0.0.1-SNAPSHOT</version>
+ <version>0.0.1-jgroups</version>
</parent>
<groupId>org.nuiton.diswork</groupId>
<artifactId>diswork-fs</artifactId>
<packaging>jar</packaging>
- <version>0.0.1-SNAPSHOT</version>
+ <version>0.0.1-jgroups</version>
<name>disworkfs</name>
<url>http://maven.apache.org</url>
<dependencies>
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2010-05-14 13:15:30 UTC (rev 32)
+++ trunk/pom.xml 2010-05-14 13:17:31 UTC (rev 33)
@@ -1,6 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
@@ -15,7 +14,7 @@
</parent>
<artifactId>diswork</artifactId>
- <version>0.0.1-SNAPSHOT</version>
+ <version>0.0.1-jgroups</version>
<modules>
<module>diswork-fs</module>
@@ -179,9 +178,9 @@
<!-- Source control management. -->
<scm>
- <connection>scm:svn:http://svn.nuiton.org/svn/diswork/diswork/trunk</connection>
- <developerConnection>scm:svn:http://svn.nuiton.org/svn/diswork/diswork/trunk</developerConnection>
- <url>http://www.nuiton.org/repositories/browse/diswork/diswork/trunk</url>
+ <connection>scm:svn:http://svn.nuiton.org/svn/diswork/diswork/tags/diswork-0.0.1-jgroups</connection>
+ <developerConnection>scm:svn:http://svn.nuiton.org/svn/diswork/diswork/tags/diswork-0.0.1-jgroups</developerConnection>
+ <url>http://www.nuiton.org/repositories/browse/diswork/diswork/tags/diswork-0.0.1-jgroups</url>
</scm>
</project>
1
0
Author: bleny
Date: 2010-05-14 15:15:30 +0200 (Fri, 14 May 2010)
New Revision: 32
Url: http://nuiton.org/repositories/revision/diswork/32
Log:
xml clean
Modified:
trunk/pom.xml
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2010-05-14 12:41:11 UTC (rev 31)
+++ trunk/pom.xml 2010-05-14 13:15:30 UTC (rev 32)
@@ -159,7 +159,6 @@
<pluginManagement>
<plugins>
-
<plugin>
<artifactId>maven-site-plugin</artifactId>
<dependencies>
@@ -170,10 +169,8 @@
</dependency>
</dependencies>
</plugin>
-
</plugins>
</pluginManagement>
-
</build>
<!-- ************************************************************* -->
1
0
Author: bleny
Date: 2010-05-14 14:41:11 +0200 (Fri, 14 May 2010)
New Revision: 31
Url: http://nuiton.org/repositories/revision/diswork/31
Log:
maj pom
Modified:
trunk/diswork-fs/pom.xml
Modified: trunk/diswork-fs/pom.xml
===================================================================
--- trunk/diswork-fs/pom.xml 2010-05-14 09:35:37 UTC (rev 30)
+++ trunk/diswork-fs/pom.xml 2010-05-14 12:41:11 UTC (rev 31)
@@ -32,10 +32,6 @@
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- </dependency>
- <dependency>
<groupId>org.nuiton</groupId>
<artifactId>nuiton-utils</artifactId>
</dependency>
1
0
r30 - in trunk: diswork-fs/src/main/java/org/nuiton/disworkfs src/site src/site/rst src/site/rst/diswork-fs
by bleny@users.nuiton.org 14 May '10
Author: bleny
Date: 2010-05-14 11:35:37 +0200 (Fri, 14 May 2010)
New Revision: 30
Url: http://nuiton.org/repositories/revision/diswork/30
Log:
menage avant changement de version
Added:
trunk/src/site/rst/diswork-fs/
trunk/src/site/rst/diswork-fs/history.rst
Modified:
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DistributedFileSystem.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java
trunk/src/site/site_en.xml
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DistributedFileSystem.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DistributedFileSystem.java 2010-05-10 09:30:32 UTC (rev 29)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DistributedFileSystem.java 2010-05-14 09:35:37 UTC (rev 30)
@@ -47,7 +47,5 @@
public InputStream read(String path) throws FileNotFoundException,
IOException, InterruptedException;
- public void remove(String path);
- // TODO bleny 2010-05-10 list a directory content
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java 2010-05-10 09:30:32 UTC (rev 29)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java 2010-05-14 09:35:37 UTC (rev 30)
@@ -20,8 +20,6 @@
import org.nuiton.disworkfs.util.SimpleDownload;
import org.nuiton.disworkfs.util.SimpleLookUp;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-
public class DisworkFileSystem implements DistributedFileSystem {
protected DownloadService downloadService;
@@ -97,9 +95,6 @@
BufferedInputStream bufferedSource = null;
BufferedOutputStream bufferedTarget = null;
- // FIXME bleny 2010-05-07 use buffered input stream ?
- // FIXME bleny 2010-05-07 close 2 times when new... new ???
-
try {
bufferedSource = new BufferedInputStream(source);
bufferedTarget = new BufferedOutputStream(new FileOutputStream(
@@ -133,11 +128,6 @@
return fileExists;
}
- @Override
- public void remove(String path) {
- throw new NotImplementedException();
- }
-
public void close() {
disworkServicesManager.stop();
}
Added: trunk/src/site/rst/diswork-fs/history.rst
===================================================================
--- trunk/src/site/rst/diswork-fs/history.rst (rev 0)
+++ trunk/src/site/rst/diswork-fs/history.rst 2010-05-14 09:35:37 UTC (rev 30)
@@ -0,0 +1,48 @@
+Premier prototype
+=================
+
+L'objectif était de réaliser un système opportuniste : à chaque fois que des
+données sont envoyées par un nœud parce qu'un autre nœud les a demandées,
+l'envoi en multicast permet à tous les autres nœuds de recevoir les données et
+ainsi faire la réplication.
+
+Le premier prototype de diswork-fs est basé sur l'échange de messages. Les
+nœuds connectés à un même réseau peuvent envoyer et recevoir en multicast des
+messages. Il existe quatre types de messages :
+
+* LookUp qui permet de demander à tous les nœuds, étant donné un chemin,
+ si un fichier existe ;
+* LookUpResponse qui est envoyé si un nœud, suite à la réception d'un LookUp,
+ constate qu'il est en possession du fichier demandé. Avant la réponse, on
+ calcule à partir du fichier un descripteur (FileDescription) comprenant
+ diverses informations nécessaires à l'échange du fichier (taille totale,
+ somme de contrôle, nombre de parties) ;
+* FileRequest est éventuellement envoyé après réception d'un LookUpResponse,
+ il vise à demander l'envoi des données d'un fichier ;
+* La réception d'un FileRequest génère l'envoi, en réponse, de messages de type
+ FileTransfer. Chacun d'eux contient une partie du fichier.
+
+Ainsi, ce protocole permet :
+
+* la recherche d'un fichier pour un chemin donné dans une arborescence ;
+* si besoin, son téléchargement ;
+* la réplication.
+
+Ce protocole ne permet pas de :
+
+* Lister le contenu d'un répertoire ;
+* Utiliser des liens symboliques.
+
+Cela n'est pas rendu possible par l'absence d'informations communes et
+synchronisées entre les nœuds. Pour remédier à ce problème, il semble qu'une
+voie possible serait de répliquer entre tous les nœuds l'intégralité de
+l'arbre décrivant l'arborescence[1]_ et de ne pratiquer l'échange des données
+des fichiers qu'au moment opportun[2]_.
+
+Ce premier prototype a été implémenté grâce à JGroups.
+Il est fonctionnel s'il s'agit d'avoir un système de
+fichiers partagé et si on lit et écrit dans l'arborescence à des chemins connus
+à l'avance. C'est trop limité pour un système de fichiers.
+
+.. [1] voir le building block `ReplicatedTree <http://www.jgroups.org/javadoc/org/jgroups/blocks/ReplicatedTree.html>`_.
+.. [2] voir `Replicated Filesystem based on ReplicatedTree <http://www.jgroups.org/open_projects.html>`_.
\ No newline at end of file
Modified: trunk/src/site/site_en.xml
===================================================================
--- trunk/src/site/site_en.xml 2010-05-10 09:30:32 UTC (rev 29)
+++ trunk/src/site/site_en.xml 2010-05-14 09:35:37 UTC (rev 30)
@@ -26,6 +26,10 @@
<menu name="Developer">
<item href="devel/draft.html" name="First draft"/>
</menu>
+
+ <menu name="diswork FS">
+ <item href="diswork-fs/history.html" name="Install"/>
+ </menu>
<menu ref="reports"/>
1
0
Author: bleny
Date: 2010-05-10 11:30:32 +0200 (Mon, 10 May 2010)
New Revision: 29
Url: http://nuiton.org/repositories/revision/diswork/29
Log:
menage, exceptions, conventions
Modified:
trunk/diswork-fs/pom.xml
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DistributedFileSystem.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkConfig.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/messages/FileRequestMessage.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/messages/FileTransferMessage.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/messages/LookUpMessage.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/messages/LookUpResponseMessage.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/AbstractDisworkService.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DisworkServicesManager.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DownloadService.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/LookUpService.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/UploadService.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/FileChunk.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/FileDescription.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromChunks.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromLocalFile.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Address.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Message.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Receiver.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Transport.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/jgroups/JGroupsAddress.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/jgroups/JGroupsMessage.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/jgroups/JGroupsTransport.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/DownloadObserver.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/LookUpObserver.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/SimpleDownload.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/SimpleLookUp.java
trunk/diswork-fs/src/main/resources/log4j.properties
trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/DistributedFileSystemTest.java
trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromChunksTest.java
trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromLocalFileTest.java
trunk/pom.xml
Modified: trunk/diswork-fs/pom.xml
===================================================================
--- trunk/diswork-fs/pom.xml 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/pom.xml 2010-05-10 09:30:32 UTC (rev 29)
@@ -18,42 +18,31 @@
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
- <version>1.1.1</version>
- <scope>compile</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
- <version>1.2.14</version>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>jgroups</groupId>
<artifactId>jgroups</artifactId>
- <version>2.9.0.GA</version>
- <scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
- <version>1.4</version>
- <type>jar</type>
- <scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
- <version>2.5</version>
- <type>jar</type>
- <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.nuiton</groupId>
<artifactId>nuiton-utils</artifactId>
- <version>1.2.2</version>
- <type>jar</type>
- <scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>fr.inria.peerunit</groupId>
+ <artifactId>PeerUnit</artifactId>
+ </dependency>
<!-- test -->
<dependency>
@@ -63,13 +52,6 @@
<type>jar</type>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <version>1.8.4</version>
- <type>jar</type>
- <scope>test</scope>
- </dependency>
</dependencies>
<build>
<plugins>
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DistributedFileSystem.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DistributedFileSystem.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DistributedFileSystem.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -1,39 +1,53 @@
package org.nuiton.disworkfs;
-import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
public interface DistributedFileSystem {
-
- /**
- * checks the existence of a file on the virtual FS
- * @param path a path to a file on the virtual FS
- * @return true if the file exists, false if not
- * @throws InterruptedException
- */
- public boolean exists(String path) throws InterruptedException;
-
- /**
- * write a file
- * @param path path of the virtual FS where the file should be written
- * @param source the file to copy on the VFS
- * @throws IOException if problems occurs while reading the source or writing on VFS
- * @throws InterruptedException
- */
- public void write(String path, File source) throws IOException, InterruptedException;
-
- /**
- * read a file on the VFS, the file is downloaded if needed (may take some times)
- * @param path the path in the virtual FS to the file you want to read
- * @return ??
- * @throws FileNotFoundException if no file have been written to this path (you may use {@link #exists(String) to check before read}
- * @throws IOException if a problem occur while reading the local file
- * @throws InterruptedException
- */
- public File read(String path) throws FileNotFoundException, IOException, InterruptedException;
-
- public void remove(String path);
- // TODO list a directory content
+ /**
+ * checks the existence of a file on the virtual FS
+ *
+ * @param path
+ * a path to a file on the virtual FS
+ * @return true if the file exists, false if not
+ * @throws InterruptedException
+ */
+ public boolean exists(String path) throws InterruptedException;
+
+ /**
+ * write a file
+ *
+ * @param path
+ * path of the virtual FS where the file should be written
+ * @param source
+ * the file to copy on the VFS
+ * @throws IOException
+ * if problems occurs while reading the source or writing on VFS
+ * @throws InterruptedException
+ */
+ public void write(String path, InputStream source) throws IOException,
+ InterruptedException;
+
+ /**
+ * read a file on the VFS, the file is downloaded if needed (may take some
+ * times)
+ *
+ * @param path
+ * the path in the virtual FS to the file you want to read
+ * @return ??
+ * @throws FileNotFoundException
+ * if no file have been written to this path (you may use
+ * {@link #exists(String) to check before read}
+ * @throws IOException
+ * if a problem occur while reading the local file
+ * @throws InterruptedException
+ */
+ public InputStream read(String path) throws FileNotFoundException,
+ IOException, InterruptedException;
+
+ public void remove(String path);
+
+ // TODO bleny 2010-05-10 list a directory content
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkConfig.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkConfig.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkConfig.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -7,19 +7,19 @@
public class DisworkConfig extends ApplicationConfig {
- public DisworkConfig() {
- Random random = new Random();
+ public DisworkConfig() {
+ Random random = new Random();
setDefaultOption("storage", "/tmp/disworkfs/storage" + random.nextInt());
setDefaultOption("jgroups.cluster_name", "diswork-fs");
-
+
// replication strategy...
- }
-
- public File getStoragePath() {
- return getOptionAsFile("storage");
- }
-
- public String getJGroupsClusterName() {
- return getOption("jgroups.cluster_name");
- }
+ }
+
+ public File getStoragePath() {
+ return getOptionAsFile("storage");
+ }
+
+ public String getJGroupsClusterName() {
+ return getOption("jgroups.cluster_name");
+ }
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -1,9 +1,15 @@
package org.nuiton.disworkfs;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuiton.disworkfs.services.DisworkServicesManager;
@@ -13,99 +19,127 @@
import org.nuiton.disworkfs.split.FileDescription;
import org.nuiton.disworkfs.util.SimpleDownload;
import org.nuiton.disworkfs.util.SimpleLookUp;
-import org.nuiton.util.FileUtil;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
public class DisworkFileSystem implements DistributedFileSystem {
- protected DownloadService downloadService;
- protected UploadService uploadService;
- protected LookUpService lookUpService;
- protected DisworkConfig disworkConfig;
- protected DisworkServicesManager disworkServicesManager;
-
- private static final Log log = LogFactory.getLog(DisworkFileSystem.class);
-
- public DisworkFileSystem(DisworkConfig disworkConfig) {
+ protected DownloadService downloadService;
+ protected UploadService uploadService;
+ protected LookUpService lookUpService;
+ protected DisworkConfig disworkConfig;
+ protected DisworkServicesManager disworkServicesManager;
- this.disworkConfig = disworkConfig;
+ private static final Log log = LogFactory.getLog(DisworkFileSystem.class);
- disworkServicesManager = new DisworkServicesManager(disworkConfig);
- uploadService = new UploadService();
- disworkServicesManager.register(uploadService);
- downloadService = new DownloadService();
- disworkServicesManager.register(downloadService);
- lookUpService = new LookUpService();
- disworkServicesManager.register(lookUpService);
+ public DisworkFileSystem(DisworkConfig disworkConfig) {
- }
+ this.disworkConfig = disworkConfig;
- @Override
- public File read(String path) throws InterruptedException, FileNotFoundException {
-
- log.info("trying to read " + path);
-
- File file = new File(disworkConfig.getStoragePath(), path);
- log.info("trying at local file system " + file.getAbsolutePath());
+ disworkServicesManager = new DisworkServicesManager(disworkConfig);
+ uploadService = new UploadService();
+ disworkServicesManager.register(uploadService);
+ downloadService = new DownloadService();
+ disworkServicesManager.register(downloadService);
+ lookUpService = new LookUpService();
+ disworkServicesManager.register(lookUpService);
- if (!file.exists()) {
- // the file is not available
- // let's download it
+ }
- SimpleDownload simpleDownload = new SimpleDownload(path, lookUpService, downloadService);
- boolean fileFound = simpleDownload.initiateDownload();
- if (fileFound)
- simpleDownload.startDownload();
- else
- throw new FileNotFoundException("no look-up response received");
-
- }
-
- // FIXME
- return null;
- }
+ @Override
+ public InputStream read(String path) throws InterruptedException,
+ FileNotFoundException {
- @Override
- public void write(String path, File source) throws IOException, InterruptedException {
-
- File target = new File(disworkConfig.getStoragePath(), path);
-
- if (target.exists()) {
- throw new IOException(target.getAbsolutePath() + " already exists");
- }
-
- if (this.exists(path)) {
- throw new IOException(target.getAbsolutePath() + " already exists");
- }
-
- FileUtil.copy(source, target);
- }
+ log.info("trying to read " + path);
- @Override
- public boolean exists(String path) throws InterruptedException {
+ File file = new File(disworkConfig.getStoragePath(), path);
+ log.info("trying at local file system " + file.getAbsolutePath());
- File file = new File(disworkConfig.getStoragePath(), path);
- boolean fileExists = false;
-
- if (file.exists()) {
- fileExists = true;
- } else {
- SimpleLookUp simpleLookUp = new SimpleLookUp(path, lookUpService);
- FileDescription lookUpResult = simpleLookUp.runLookUp();
- fileExists = (lookUpResult != null);
- }
-
- return fileExists;
- }
+ if (!file.exists()) {
+ // the file is not available
+ // let's download it
- @Override
- public void remove(String path) {
- throw new NotImplementedException();
- }
+ SimpleDownload simpleDownload = new SimpleDownload(path,
+ lookUpService, downloadService);
+ boolean fileFound = simpleDownload.initiateDownload();
+ if (fileFound)
+ simpleDownload.startDownload();
+ else
+ throw new FileNotFoundException("no look-up response received");
- public void close() {
- disworkServicesManager.stop();
- }
-
+ }
+
+ return new BufferedInputStream(new FileInputStream(file));
+ }
+
+ @Override
+ public void write(String path, InputStream source) throws IOException,
+ InterruptedException {
+
+ File target = new File(disworkConfig.getStoragePath(), path);
+
+ // first, check if the already exists locally...
+ if (target.exists()) {
+ throw new IOException(target.getAbsolutePath() + " already exists");
+ }
+
+ // ... or on another node
+ if (this.exists(path)) {
+ throw new IOException(target.getAbsolutePath() + " already exists");
+ }
+
+ // the file does not exists, we can write it
+ // first, prepare all directories and file
+ target.getParentFile().mkdirs();
+ target.createNewFile();
+
+ // copying data from given source on the virtual FS
+ BufferedInputStream bufferedSource = null;
+ BufferedOutputStream bufferedTarget = null;
+
+ // FIXME bleny 2010-05-07 use buffered input stream ?
+ // FIXME bleny 2010-05-07 close 2 times when new... new ???
+
+ try {
+ bufferedSource = new BufferedInputStream(source);
+ bufferedTarget = new BufferedOutputStream(new FileOutputStream(
+ target));
+ IOUtils.copy(bufferedSource, bufferedTarget);
+ } finally {
+ IOUtils.closeQuietly(bufferedSource);
+ IOUtils.closeQuietly(bufferedTarget);
+ }
+
+ }
+
+ @Override
+ public boolean exists(String path) throws InterruptedException {
+
+ File file = new File(disworkConfig.getStoragePath(), path);
+ boolean fileExists = false;
+
+ // first, check file exists locally
+ if (file.exists()) {
+ // the file is stored on the local FS
+ fileExists = true;
+ } else {
+ // the file is not on the local FS,
+ // let's check on other nodes
+ SimpleLookUp simpleLookUp = new SimpleLookUp(path, lookUpService);
+ FileDescription lookUpResult = simpleLookUp.runLookUp();
+ fileExists = (lookUpResult != null);
+ }
+
+ return fileExists;
+ }
+
+ @Override
+ public void remove(String path) {
+ throw new NotImplementedException();
+ }
+
+ public void close() {
+ disworkServicesManager.stop();
+ }
+
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/messages/FileRequestMessage.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/messages/FileRequestMessage.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/messages/FileRequestMessage.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -5,17 +5,17 @@
import org.nuiton.disworkfs.split.FileDescription;
public class FileRequestMessage implements Serializable {
-
+
/**
*
*/
private static final long serialVersionUID = 7208155655482510721L;
private FileDescription fileDescription;
-
+
public FileRequestMessage(FileDescription fileDescription) {
this.fileDescription = fileDescription;
}
-
+
public FileDescription getfileDescription() {
return this.fileDescription;
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/messages/FileTransferMessage.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/messages/FileTransferMessage.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/messages/FileTransferMessage.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -6,7 +6,7 @@
import org.nuiton.disworkfs.split.FileDescription;
public class FileTransferMessage implements Serializable {
-
+
/**
*
*/
@@ -14,18 +14,18 @@
private FileChunk fileChunk;
private FileDescription fileDescription;
- public FileTransferMessage(FileChunk fileChunk, FileDescription fileDescription) {
- this.fileChunk = fileChunk;
- this.fileDescription = fileDescription;
- }
-
+ public FileTransferMessage(FileChunk fileChunk,
+ FileDescription fileDescription) {
+ this.fileChunk = fileChunk;
+ this.fileDescription = fileDescription;
+ }
+
public FileChunk getFileChunk() {
- return this.fileChunk;
- }
+ return this.fileChunk;
+ }
public FileDescription getFileDescrition() {
return this.fileDescription;
}
-
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/messages/LookUpMessage.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/messages/LookUpMessage.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/messages/LookUpMessage.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -3,17 +3,17 @@
import java.io.Serializable;
public class LookUpMessage implements Serializable {
-
+
/**
*
*/
private static final long serialVersionUID = -1538199939808485195L;
private String fileName;
-
+
public LookUpMessage(String fileName) {
this.fileName = fileName;
}
-
+
public String getFileName() {
return this.fileName;
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/messages/LookUpResponseMessage.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/messages/LookUpResponseMessage.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/messages/LookUpResponseMessage.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -5,15 +5,15 @@
import org.nuiton.disworkfs.split.FileDescription;
public class LookUpResponseMessage implements Serializable {
-
+
private static final long serialVersionUID = -8880348876968678210L;
-
+
private FileDescription fileDescription;
-
+
public LookUpResponseMessage(FileDescription fileDescription) {
this.fileDescription = fileDescription;
}
-
+
public FileDescription getFileDescription() {
return this.fileDescription;
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/AbstractDisworkService.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/AbstractDisworkService.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/AbstractDisworkService.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -17,56 +17,66 @@
public abstract class AbstractDisworkService implements Receiver, Runnable {
// protected JChannel jChannel;
-
- protected Transport transport;
-
+
+ protected Transport transport;
+
protected DisworkConfig disworkConfig;
-
- private static final Log log = LogFactory.getLog(AbstractDisworkService.class);
- public AbstractDisworkService() {}
-
- public AbstractDisworkService(Transport transport) {
- this.transport = transport;
- }
-
- @Override
- public void receiveMessage(Message message) {
+ private static final Log log = LogFactory
+ .getLog(AbstractDisworkService.class);
+
+ public AbstractDisworkService() {
+ }
+
+ public AbstractDisworkService(Transport transport) {
+ this.transport = transport;
+ }
+
+ @Override
+ public void receiveMessage(Message message) {
try {
- Serializable obj = message.getContent();
- if (obj instanceof LookUpMessage) {
- receiveLookUpMessage(message);
- } else if (obj instanceof LookUpResponseMessage) {
- receiveLookUpResponseMessage(message);
- } else if (obj instanceof FileRequestMessage) {
- receiveFileRequestMessage(message);
- } else if (obj instanceof FileTransferMessage) {
- receiveFileTransferMessage(message);
- } else {
- log.error("unknow message received");
- }
- } catch (IOException eee) {
- log.error("IOException encoutered after receiving a message", eee);
- }
- }
-
- public void receiveLookUpMessage(Message msg) throws IOException {}
- public void receiveLookUpResponseMessage(Message msg) {}
- public void receiveFileRequestMessage(Message msg) throws IOException {}
- public void receiveFileTransferMessage(Message msg) throws IOException {}
-
+ Serializable obj = message.getContent();
+ if (obj instanceof LookUpMessage) {
+ receiveLookUpMessage(message);
+ } else if (obj instanceof LookUpResponseMessage) {
+ receiveLookUpResponseMessage(message);
+ } else if (obj instanceof FileRequestMessage) {
+ receiveFileRequestMessage(message);
+ } else if (obj instanceof FileTransferMessage) {
+ receiveFileTransferMessage(message);
+ } else {
+ log.error("unknow message received");
+ }
+ } catch (IOException eee) {
+ log.error("IOException encoutered after receiving a message", eee);
+ }
+ }
+
+ public void receiveLookUpMessage(Message msg) throws IOException {
+ }
+
+ public void receiveLookUpResponseMessage(Message msg) {
+ }
+
+ public void receiveFileRequestMessage(Message msg) throws IOException {
+ }
+
+ public void receiveFileTransferMessage(Message msg) throws IOException {
+ }
+
public void setTransport(Transport transport) {
- this.transport = transport;
+ this.transport = transport;
}
- public void setDisworkConfig(DisworkConfig disworkConfig) {
- this.disworkConfig = disworkConfig;
- }
+ public void setDisworkConfig(DisworkConfig disworkConfig) {
+ this.disworkConfig = disworkConfig;
+ }
- @Override
- public void run() {}
-
- public void stop() {}
-
-
+ @Override
+ public void run() {
+ }
+
+ public void stop() {
+ }
+
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DisworkServicesManager.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DisworkServicesManager.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DisworkServicesManager.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -10,61 +10,59 @@
import org.nuiton.disworkfs.transport.jgroups.JGroupsTransport;
public class DisworkServicesManager implements Receiver {
-
+
/**
* all the running services
*/
private List<AbstractDisworkService> services = new ArrayList<AbstractDisworkService>();
-
+
protected Transport transport;
-
+
private DisworkConfig disworkConfig;
-
+
public DisworkServicesManager(DisworkConfig disworkConfig) {
- this.disworkConfig = disworkConfig;
-
- transport = new JGroupsTransport(disworkConfig);
-
- transport.setReceiver(this);
-
+ this.disworkConfig = disworkConfig;
+
+ transport = new JGroupsTransport(disworkConfig);
+
+ transport.setReceiver(this);
+
}
-
+
public void register(AbstractDisworkService service) {
-
- // dependency injection, the service need a channel to send a message
- // service.setJChannel(jChannel);
- service.setTransport(transport);
- // ... and the disworkConfig
- service.setDisworkConfig(disworkConfig);
-
- // each service is run in his own thread
- Thread thread = new Thread(service);
- thread.start();
-
- // add this service to the list of running services
+
+ // dependency injection, the service need to send messages
+ service.setTransport(transport);
+ // ... and the disworkConfig
+ service.setDisworkConfig(disworkConfig);
+
+ // each service is run in his own thread
+ Thread thread = new Thread(service);
+ thread.start();
+
+ // add this service to the list of running services
services.add(service);
}
-
+
public void unRegister(AbstractDisworkService service) {
services.remove(service);
}
-
- @Override
- public void receiveMessage(Message message) {
+ @Override
+ public void receiveMessage(Message message) {
for (AbstractDisworkService service : services) {
service.receiveMessage(message);
}
- }
+ }
- /**
- * this stop all the diswork services and close the
- * ressources used by the transport layer
- */
- public void stop() {
+ /**
+ * this stop all the diswork services and close the ressources used by the
+ * transport layer
+ */
+ public void stop() {
for (AbstractDisworkService service : services) {
service.stop();
}
transport.close();
- }
+ }
}
\ No newline at end of file
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DownloadService.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DownloadService.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DownloadService.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -18,109 +18,118 @@
public class DownloadService extends AbstractDisworkService {
- private Map<Long, SplitFileFromChunks> downloadsInProgress = new HashMap<Long, SplitFileFromChunks>();
+ private Map<Long, SplitFileFromChunks> downloadsInProgress = new HashMap<Long, SplitFileFromChunks>();
- private List<Long> finishedDownloads = new LinkedList<Long>();
+ private List<Long> finishedDownloads = new LinkedList<Long>();
- private Map<Long, List<DownloadObserver>> downloadObservers = new HashMap<Long, List<DownloadObserver>>();
+ private Map<Long, List<DownloadObserver>> downloadObservers = new HashMap<Long, List<DownloadObserver>>();
- private static final Log log = LogFactory.getLog(DownloadService.class);
+ private static final Log log = LogFactory.getLog(DownloadService.class);
- @Override
- public void receiveFileTransferMessage(Message message) throws IOException {
+ @Override
+ public void receiveFileTransferMessage(Message message) throws IOException {
- FileTransferMessage fileTransferMessage = (FileTransferMessage) message.getContent();
- FileDescription fileDescription = fileTransferMessage.getFileDescrition();
-
- if (log.isDebugEnabled()) {
- log.info("received file chunk "
- + fileTransferMessage.getFileDescrition().getFileName()
- + " chunk number "
- + fileTransferMessage.getFileChunk().getChunkNumber()
- + ")");
- }
+ FileTransferMessage fileTransferMessage = (FileTransferMessage) message
+ .getContent();
+ FileDescription fileDescription = fileTransferMessage
+ .getFileDescrition();
- // Before do anything, check if we already have the complete file
+ if (log.isDebugEnabled()) {
+ log
+ .info("received file chunk "
+ + fileTransferMessage.getFileDescrition()
+ .getFileName()
+ + " chunk number "
+ + fileTransferMessage.getFileChunk()
+ .getChunkNumber() + ")");
+ }
- File newFile = new File(disworkConfig.getStoragePath(), fileDescription.getFileName());
- File newFileStatus = new File(disworkConfig.getStoragePath(), "." + fileDescription.getFileName() + ".index");
-
- // file must exists, have the expected size and index should not exist
- // TODO : use checksum ?
- boolean alreadyHaveFile = newFile.exists()
- && (newFile.length() == fileDescription.getTotalSize())
- && !newFileStatus.exists();
-
- if (!alreadyHaveFile) {
- Long checkSum = fileDescription.getFileCheckSum();
+ // ignore message if already have finished the download
+ if (!finishedDownloads.contains(fileDescription.getFileCheckSum())) {
+ Long checkSum = fileDescription.getFileCheckSum();
- if (! downloadsInProgress.containsKey(checkSum)) {
- // Start download
+ // this is the actual file we will write
+ File newFile = new File(disworkConfig.getStoragePath(),
+ fileDescription.getFileName());
- log.info("first chunk received, initiate download");
+ if (!downloadsInProgress.containsKey(checkSum)) {
+ // Start download
- SplitFileFromChunks newSplitFile = new SplitFileFromChunks(fileDescription, newFile);
+ log.info("first chunk received, initiate download");
- downloadsInProgress.put(checkSum, newSplitFile);
- }
+ SplitFileFromChunks newSplitFile = new SplitFileFromChunks(
+ fileDescription, newFile);
- // we have received a file chunk, let's add it
- SplitFileFromChunks downloadingFile = downloadsInProgress.get(checkSum);
+ downloadsInProgress.put(checkSum, newSplitFile);
+ }
- downloadingFile.addChunk(fileTransferMessage.getFileChunk());
-
- // maybe the download is complete
- if (downloadingFile.isComplete()) {
- log.info("file " + newFile.getAbsolutePath() + " written");
+ // we have received a file chunk, let's add it
+ SplitFileFromChunks downloadingFile = downloadsInProgress
+ .get(checkSum);
- finishedDownloads.add(checkSum);
- downloadsInProgress.remove(checkSum);
+ log.info("adding chunk "
+ + fileTransferMessage.getFileChunk().getChunkNumber());
- this.notifyAllDownloadObserversForFile(checkSum);
- }
- }
- }
+ downloadingFile.addChunk(fileTransferMessage.getFileChunk());
- public void registerObserver(FileDescription fileDescription, DownloadObserver downloadObserver) {
- List<DownloadObserver> observersList = downloadObservers.get(fileDescription.getFileCheckSum());
+ // maybe the download is complete
+ if (downloadingFile.isComplete()) {
+ log.info("file " + newFile.getAbsolutePath() + " written");
- if (observersList == null) {
- // it's the first observer for this download ever, let's construct a list
- observersList = new LinkedList<DownloadObserver>();
- }
-
- observersList.add(downloadObserver);
- downloadObservers.put(fileDescription.getFileCheckSum(), observersList);
- }
+ finishedDownloads.add(checkSum);
+ downloadsInProgress.remove(checkSum);
- private void notifyAllDownloadObserversForFile(Long checksum) {
- List<DownloadObserver> downloadObserversForThisFile = downloadObservers.get(checksum);
+ this.notifyAllDownloadObserversForFile(checkSum);
+ }
+ }
+ }
- // important check : maybe no observer for this file download
- // so the list is null.
- if (downloadObserversForThisFile != null) {
+ public void registerObserver(FileDescription fileDescription,
+ DownloadObserver downloadObserver) {
+ List<DownloadObserver> observersList = downloadObservers
+ .get(fileDescription.getFileCheckSum());
- for (DownloadObserver downloadObserver : downloadObserversForThisFile) {
- downloadObserver.updateDownloadStatus(this);
- }
- }
- }
+ if (observersList == null) {
+ // it's the first observer for this download ever, let's construct a
+ // list
+ observersList = new LinkedList<DownloadObserver>();
+ }
- public boolean isFinished(FileDescription fileDescription) {
- boolean finished = finishedDownloads.contains(fileDescription.getFileCheckSum());
- return finished;
- }
+ observersList.add(downloadObserver);
+ downloadObservers.put(fileDescription.getFileCheckSum(), observersList);
+ }
+ private void notifyAllDownloadObserversForFile(Long checksum) {
+ List<DownloadObserver> downloadObserversForThisFile = downloadObservers
+ .get(checksum);
- public void startDownload(FileDescription fileDescription, DownloadObserver downloadObserver) {
- Message message = transport.newMulticastMessage();
- message.setContent(new FileRequestMessage(fileDescription));
- message.send();
-
- if (log.isDebugEnabled())
- log.info("sending file request for " + fileDescription.getFileName());
-
- registerObserver(fileDescription, downloadObserver);
-
- }
+ // important check : maybe no observer for this file download
+ // so the list is null.
+ if (downloadObserversForThisFile != null) {
+
+ for (DownloadObserver downloadObserver : downloadObserversForThisFile) {
+ downloadObserver.updateDownloadStatus(this);
+ }
+ }
+ }
+
+ public boolean isFinished(FileDescription fileDescription) {
+ boolean finished = finishedDownloads.contains(fileDescription
+ .getFileCheckSum());
+ return finished;
+ }
+
+ public void startDownload(FileDescription fileDescription,
+ DownloadObserver downloadObserver) {
+ Message message = transport.newMulticastMessage();
+ message.setContent(new FileRequestMessage(fileDescription));
+ message.send();
+
+ if (log.isDebugEnabled())
+ log.info("sending file request for "
+ + fileDescription.getFileName());
+
+ registerObserver(fileDescription, downloadObserver);
+
+ }
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/LookUpService.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/LookUpService.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/LookUpService.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -1,6 +1,5 @@
package org.nuiton.disworkfs.services;
-
import java.util.HashMap;
import java.util.Map;
@@ -11,49 +10,53 @@
import org.nuiton.disworkfs.transport.Message;
import org.nuiton.disworkfs.util.LookUpObserver;
+public class LookUpService extends AbstractDisworkService {
+ /**
+ * this map store, for each request, the LookUpObserver to notify when the
+ * LookUpResponse is received
+ */
+ Map<String, LookUpObserver> requestToRequester = new HashMap<String, LookUpObserver>();
-public class LookUpService extends AbstractDisworkService {
-
- /**
- * this map store, for each request, the LookUpObserver to notify when
- * the LookUpResponse is received
- */
- Map<String, LookUpObserver> requestToRequester = new HashMap<String, LookUpObserver>();
-
private static final Log log = LogFactory.getLog(LookUpService.class);
/**
*
- * @param fileName the name of the file to search for
- * @param lookUpObserver the object to notify when LookUpResponse have been sent back
+ * @param fileName
+ * the name of the file to search for
+ * @param lookUpObserver
+ * the object to notify when LookUpResponse have been sent back
*/
- public void lookForFileName(String fileName, LookUpObserver lookUpObserver) {
-
- LookUpMessage lookUpMessage = new LookUpMessage(fileName);
-
- Message message = transport.newMulticastMessage();
- message.setContent(lookUpMessage);
- message.send();
- log.info("look-up message sent for " + fileName);
-
- // FIXME memory leak if multiple look-up on never-will-be-available files
- requestToRequester.put(fileName, lookUpObserver);
- }
+ public void lookForFileName(String fileName, LookUpObserver lookUpObserver) {
- @Override
- public void receiveLookUpResponseMessage(Message message) {
+ LookUpMessage lookUpMessage = new LookUpMessage(fileName);
- LookUpResponseMessage lookUpResponse = (LookUpResponseMessage) message.getContent();
+ Message message = transport.newMulticastMessage();
+ message.setContent(lookUpMessage);
+ message.send();
+ log.info("look-up message sent for " + fileName);
- String requestedFileName = lookUpResponse.getFileDescription().getFileName();
+ // FIXME bleny 2010-05-07 memory leak if multiple look-up on
+ // never-will-be-available files
+ requestToRequester.put(fileName, lookUpObserver);
+ }
- if (requestToRequester.containsKey(requestedFileName)) {
- LookUpObserver requester = requestToRequester.get(requestedFileName);
- requester.receiveResult(lookUpResponse.getFileDescription());
- requestToRequester.remove(requestedFileName);
- }
+ @Override
+ public void receiveLookUpResponseMessage(Message message) {
- }
+ LookUpResponseMessage lookUpResponse = (LookUpResponseMessage) message
+ .getContent();
+ String requestedFileName = lookUpResponse.getFileDescription()
+ .getFileName();
+
+ if (requestToRequester.containsKey(requestedFileName)) {
+ LookUpObserver requester = requestToRequester
+ .get(requestedFileName);
+ requester.receiveResult(lookUpResponse.getFileDescription());
+ requestToRequester.remove(requestedFileName);
+ }
+
+ }
+
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/UploadService.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/UploadService.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/UploadService.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -16,89 +16,106 @@
import org.nuiton.disworkfs.transport.Message;
/**
- * When receiving a FileRequest, sending the file
- * When receiving a LookUp, sending a LookUpResponse if owning file on local FS
+ * When receiving a FileRequest, sending the file When receiving a LookUp,
+ * sending a LookUpResponse if owning file on local FS
*/
public class UploadService extends AbstractDisworkService {
- private static final Log log = LogFactory.getLog(UploadService.class);
+ private static final Log log = LogFactory.getLog(UploadService.class);
- @Override
- public void receiveFileRequestMessage(Message message) throws IOException {
- FileRequestMessage fileRequestMessage = (FileRequestMessage) message.getContent();
+ @Override
+ public void receiveFileRequestMessage(Message message) throws IOException {
+ FileRequestMessage fileRequestMessage = (FileRequestMessage) message
+ .getContent();
- File file = new File(disworkConfig.getStoragePath(), fileRequestMessage.getfileDescription().getFileName());
+ File file = new File(disworkConfig.getStoragePath(), fileRequestMessage
+ .getfileDescription().getFileName());
- if (log.isDebugEnabled())
- log.info("file request message received for " + fileRequestMessage.getfileDescription().getFileName());
+ if (log.isDebugEnabled())
+ log.info("file request message received for "
+ + fileRequestMessage.getfileDescription().getFileName());
- if (file.exists()) {
+ if (file.exists()) {
- if (log.isDebugEnabled())
- log.info("file found in path " + file.getAbsolutePath());
+ if (log.isDebugEnabled())
+ log.info("file found in path " + file.getAbsolutePath());
- SplitFileFromLocalFile splitFileFromLocalFile = new SplitFileFromLocalFile(file);
+ SplitFileFromLocalFile splitFileFromLocalFile = new SplitFileFromLocalFile(
+ file);
- FileDescription fileDescription = splitFileFromLocalFile.getFileDescription();
- fileDescription.setFileName(fileRequestMessage.getfileDescription().getFileName());
+ FileDescription fileDescription = splitFileFromLocalFile
+ .getFileDescription();
+ fileDescription.setFileName(fileRequestMessage.getfileDescription()
+ .getFileName());
- List<FileChunk> chunks = splitFileFromLocalFile.getAllChunks();
+ List<FileChunk> chunks = splitFileFromLocalFile.getAllChunks();
- for (FileChunk fileChunk : chunks) {
- Message reply = message.newReply();
+ log.info("will send " + chunks.size() + " chunks");
- FileTransferMessage fileTransferMessage = new FileTransferMessage(fileChunk, fileDescription);
+ for (FileChunk fileChunk : chunks) {
+ Message reply = message.newReply();
- reply.setContent(fileTransferMessage);
+ FileTransferMessage fileTransferMessage = new FileTransferMessage(
+ fileChunk, fileDescription);
- if (log.isDebugEnabled())
- log.info("sending chunk " + fileTransferMessage.getFileChunk().getChunkNumber());
+ reply.setContent(fileTransferMessage);
- reply.send();
+ // if (true || log.isDebugEnabled())
+ log.info("sending chunk "
+ + fileTransferMessage.getFileChunk().getChunkNumber());
- }
+ reply.send();
- if (log.isDebugEnabled())
- log.info("all chunks sent");
- } else {
- if (log.isDebugEnabled())
- log.info("file not found in path " + file.getAbsolutePath() + " don't send response");
- }
+ }
- }
+ if (log.isDebugEnabled())
+ log.info("all chunks sent");
+ } else {
+ if (log.isDebugEnabled())
+ log.info("file not found in path " + file.getAbsolutePath()
+ + " don't send response");
+ }
- @Override
- public void receiveLookUpMessage(Message message) throws IOException {
- LookUpMessage lookUpMessage = (LookUpMessage) message.getContent();
+ }
- if (log.isDebugEnabled())
- log.info("lookup message received : looking for file " + lookUpMessage.getFileName());
+ @Override
+ public void receiveLookUpMessage(Message message) throws IOException {
+ LookUpMessage lookUpMessage = (LookUpMessage) message.getContent();
- File file = new File(disworkConfig.getStoragePath(), lookUpMessage.getFileName());
+ if (log.isDebugEnabled())
+ log.info("lookup message received : looking for file "
+ + lookUpMessage.getFileName());
- if (file.exists()) {
- if (log.isDebugEnabled())
- log.info(lookUpMessage.getFileName() + " file found at " + file.getAbsolutePath());
+ File file = new File(disworkConfig.getStoragePath(), lookUpMessage
+ .getFileName());
- SplitFileFromLocalFile splitFileFromLocalFile = new SplitFileFromLocalFile(file);
+ if (file.exists()) {
+ if (log.isDebugEnabled())
+ log.info(lookUpMessage.getFileName() + " file found at "
+ + file.getAbsolutePath());
+ SplitFileFromLocalFile splitFileFromLocalFile = new SplitFileFromLocalFile(
+ file);
- Message reply = message.newReply();
+ Message reply = message.newReply();
- FileDescription fileDescription = splitFileFromLocalFile.getFileDescription();
+ FileDescription fileDescription = splitFileFromLocalFile
+ .getFileDescription();
- fileDescription.setFileName(lookUpMessage.getFileName());
+ fileDescription.setFileName(lookUpMessage.getFileName());
- reply.setContent(new LookUpResponseMessage(fileDescription));
+ reply.setContent(new LookUpResponseMessage(fileDescription));
- if (log.isDebugEnabled())
- log.info("sending lookUpResponse response for " + lookUpMessage.getFileName());
+ if (log.isDebugEnabled())
+ log.info("sending lookUpResponse response for "
+ + lookUpMessage.getFileName());
- reply.send();
- } else {
- if (log.isDebugEnabled())
- log.info(lookUpMessage.getFileName() + " file not found at " + file.getAbsolutePath() + " no response sent");
- }
+ reply.send();
+ } else {
+ if (log.isDebugEnabled())
+ log.info(lookUpMessage.getFileName() + " file not found at "
+ + file.getAbsolutePath() + " no response sent");
+ }
- }
+ }
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/FileChunk.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/FileChunk.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/FileChunk.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -4,54 +4,54 @@
public class FileChunk implements Serializable {
- private static final long serialVersionUID = 9081850894548724600L;
+ private static final long serialVersionUID = 9081850894548724600L;
- // FIXME dangerous : not final !
- public static int MAX_CHUNK_SIZE = 1024;
+ // FIXME bleny 2010-05-10 dangerous : not final !
+ public static int MAX_CHUNK_SIZE = 1024;
- private byte[] data = new byte[MAX_CHUNK_SIZE];
+ private byte[] data = new byte[MAX_CHUNK_SIZE];
- private int chunkNumber;
+ private int chunkNumber;
- private int chunkSize;
+ private int chunkSize;
- public FileChunk(byte[] data) {
- this.setData(data);
- }
+ public FileChunk(byte[] data) {
+ this.setData(data);
+ }
- public byte[] getData() {
- return this.data;
- }
+ public byte[] getData() {
+ return this.data;
+ }
- public void setData(byte[] data) {
- this.data = data;
- }
+ public void setData(byte[] data) {
+ this.data = data;
+ }
- public int getChunkNumber() {
- return this.chunkNumber;
- }
+ public int getChunkNumber() {
+ return this.chunkNumber;
+ }
- public void setChunkNumber(int chunkNumber) {
- this.chunkNumber = chunkNumber;
- }
+ public void setChunkNumber(int chunkNumber) {
+ this.chunkNumber = chunkNumber;
+ }
- public int getChunkSize() {
- return this.chunkSize;
- }
+ public int getChunkSize() {
+ return this.chunkSize;
+ }
- public void setChunkSize(int chunkSize) {
- this.chunkSize = chunkSize;
- }
+ public void setChunkSize(int chunkSize) {
+ this.chunkSize = chunkSize;
+ }
- public static int numberOfChunksNeededtoStore(long numberOfBytes) {
- long numberOfChunks = numberOfBytes / MAX_CHUNK_SIZE;
+ public static int numberOfChunksNeededtoStore(long numberOfBytes) {
+ long numberOfChunks = numberOfBytes / MAX_CHUNK_SIZE;
- if (numberOfBytes > numberOfChunks * MAX_CHUNK_SIZE) {
- numberOfChunks += 1;
- }
+ if (numberOfBytes > numberOfChunks * MAX_CHUNK_SIZE) {
+ numberOfChunks += 1;
+ }
- // FIXME unsafe cast
- return (int) numberOfChunks;
- }
+ // FIXME bleny 2010-05-10 unsafe cast
+ return (int) numberOfChunks;
+ }
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/FileDescription.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/FileDescription.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/FileDescription.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -3,43 +3,37 @@
import java.io.Serializable;
public class FileDescription implements Serializable {
-
+
private static final long serialVersionUID = 1809101246917954092L;
private String fileName;
- private long totalSize;
+ private long totalSize;
private long fileCheckSum;
+ public FileDescription(String fileName, long totalSize, long fileCheckSum) {
+ super();
+ this.fileName = fileName;
+ this.totalSize = totalSize;
+ this.fileCheckSum = fileCheckSum;
+ }
- public FileDescription(String fileName, long totalSize, long fileCheckSum) {
- super();
- this.fileName = fileName;
- this.totalSize = totalSize;
- this.fileCheckSum = fileCheckSum;
- }
+ public String getFileName() {
+ return fileName;
+ }
+ public long getTotalSize() {
+ return totalSize;
+ }
- public String getFileName() {
- return fileName;
- }
+ public long getFileCheckSum() {
+ return fileCheckSum;
+ }
+ public int getNumberOfChunks() {
+ return FileChunk.numberOfChunksNeededtoStore(this.totalSize);
+ }
- public long getTotalSize() {
- return totalSize;
- }
-
-
- public long getFileCheckSum() {
- return fileCheckSum;
- }
-
-
- public int getNumberOfChunks() {
- return FileChunk.numberOfChunksNeededtoStore(this.totalSize);
- }
-
public void setFileName(String fileName) {
- this.fileName = fileName;
- }
-
-
+ this.fileName = fileName;
+ }
+
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromChunks.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromChunks.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromChunks.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -17,90 +17,108 @@
public class SplitFileFromChunks {
- protected FileDescription fileDescription;
- protected File destination;
- protected File chunkStatusFile;
-
- private Log log = LogFactory.getLog(SplitFileFromChunks.class);
+ protected FileDescription fileDescription;
+ protected File destination;
+ protected File chunkStatusFile;
- public SplitFileFromChunks(FileDescription fileDescription, File destination) throws IOException {
- this.fileDescription = fileDescription;
- this.destination = destination;
- this.chunkStatusFile = new File(destination.getParent(), "." + destination.getName() + ".index");
+ private Log log = LogFactory.getLog(SplitFileFromChunks.class);
- if (!destination.exists()) {
- try {
- RandomAccessFile randomAccessFile = new RandomAccessFile(destination, "rw");
- randomAccessFile.setLength(fileDescription.getTotalSize());
- randomAccessFile.close();
- } catch (FileNotFoundException e) {
- // we just checked !destination.exists()
- }
- }
-
- if (!chunkStatusFile.exists()) {
- // force the creation of all needed directories
- log.debug("creating directory " + chunkStatusFile.getParentFile());
- chunkStatusFile.getParentFile().mkdirs();
- BitSet bitSet = new BitSet(fileDescription.getNumberOfChunks());
- bitSet.set(0, fileDescription.getNumberOfChunks());
- writeChunkStatusFile(bitSet);
- bitSet = readChunkStatusFile();
- }
+ public SplitFileFromChunks(FileDescription fileDescription, File destination)
+ throws IOException {
+ this.fileDescription = fileDescription;
+ this.destination = destination;
+ this.chunkStatusFile = new File(destination.getParent(), "."
+ + destination.getName() + ".index");
- }
+ // force the creation of all needed directories
- public boolean isComplete() throws IOException {
- BitSet bitSet = readChunkStatusFile();
- boolean fileIsComplete = bitSet.cardinality() == 0;
- return fileIsComplete;
- }
+ File directory = destination.getParentFile();
+ if (!directory.exists()) {
+ log.debug("creating directory " + directory.getAbsolutePath());
+ directory.mkdirs();
+ }
+ if (!destination.exists()) {
+ try {
+ RandomAccessFile randomAccessFile = new RandomAccessFile(
+ destination, "rw");
+ randomAccessFile.setLength(fileDescription.getTotalSize());
+ randomAccessFile.close();
+ } catch (FileNotFoundException e) {
+ // we just checked !destination.exists()
+ }
+ }
- protected BitSet readChunkStatusFile() throws IOException {
- BitSet bitSet = null;
- try {
- FileInputStream is = new FileInputStream(chunkStatusFile);
- bitSet = (BitSet) new ObjectInputStream(is).readObject();
- is.close();
- } catch (ClassNotFoundException e) {
- throw new IOException("Unknown Metadata", e);
- }
- return bitSet;
- }
+ if (!chunkStatusFile.exists()) {
+ BitSet bitSet = new BitSet(fileDescription.getNumberOfChunks());
+ bitSet.set(0, fileDescription.getNumberOfChunks());
+ writeChunkStatusFile(bitSet);
+ // bitSet = readChunkStatusFile();
+ }
+ }
- protected void writeChunkStatusFile(BitSet bitSet) throws IOException {
- FileOutputStream os = new FileOutputStream(chunkStatusFile);
- new ObjectOutputStream(os).writeObject(bitSet);
- os.close();
- }
+ public boolean isComplete() throws IOException {
+ boolean fileIsComplete = false;
+ if (chunkStatusFile.exists()) {
+ BitSet bitSet = readChunkStatusFile();
+ fileIsComplete = bitSet.cardinality() == 0;
+ } else {
+ fileIsComplete = true;
+ }
+ return fileIsComplete;
+ }
+ protected BitSet readChunkStatusFile() throws IOException {
+ BitSet bitSet = null;
+ try {
+ FileInputStream is = new FileInputStream(chunkStatusFile);
+ bitSet = (BitSet) new ObjectInputStream(is).readObject();
+ is.close();
+ } catch (ClassNotFoundException e) {
+ log.error("Metadata file corrupted, BitSet expected", e);
+ throw new IOException("Unknown Metadata", e);
+ }
+ return bitSet;
+ }
- public void addChunk(FileChunk fileChunk) throws IOException {
- RandomAccessFile randomAccessFile = new RandomAccessFile(destination, "rw");
- byte[] data = fileChunk.getData();
- int off = FileChunk.MAX_CHUNK_SIZE * fileChunk.getChunkNumber();
- int len = fileChunk.getChunkSize();
- randomAccessFile.seek(off);
- randomAccessFile.write(data, 0, len);
- randomAccessFile.close();
+ protected void writeChunkStatusFile(BitSet bitSet) throws IOException {
+ FileOutputStream os = new FileOutputStream(chunkStatusFile);
+ new ObjectOutputStream(os).writeObject(bitSet);
+ os.close();
+ }
- // updating status
- BitSet chunkStatus = readChunkStatusFile();
- chunkStatus.clear(fileChunk.getChunkNumber());
- writeChunkStatusFile(chunkStatus);
+ public void addChunk(FileChunk fileChunk) throws IOException {
+ RandomAccessFile randomAccessFile = new RandomAccessFile(destination,
+ "rw");
+ byte[] data = fileChunk.getData();
+ int off = FileChunk.MAX_CHUNK_SIZE * fileChunk.getChunkNumber();
+ int len = fileChunk.getChunkSize();
+ randomAccessFile.seek(off);
+ randomAccessFile.write(data, 0, len);
+ randomAccessFile.close();
+ // updating status
+ BitSet chunkStatus = readChunkStatusFile();
+ chunkStatus.clear(fileChunk.getChunkNumber());
+ writeChunkStatusFile(chunkStatus);
- if (isComplete()) {
+ if (isComplete()) {
- long expectedChecksum = fileDescription.getFileCheckSum();
- long actualCheckSum = FileUtils.checksum(destination, new CRC32()).getValue();
+ long expectedChecksum = fileDescription.getFileCheckSum();
+ long actualCheckSum = FileUtils.checksum(destination, new CRC32())
+ .getValue();
- if (actualCheckSum != expectedChecksum) {
- throw new IOException("checksum fail");
- }
- }
- }
+ if (actualCheckSum != expectedChecksum) {
+ throw new IOException("checksum fail");
+ }
+ chunkStatusFile.delete();
+ }
+ }
+ /*
+ * public boolean isFinished() { boolean finished = !
+ * chunkStatusFile.exists(); return finished; }
+ */
+
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromLocalFile.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromLocalFile.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromLocalFile.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -12,55 +12,57 @@
public class SplitFileFromLocalFile {
- protected File source;
-
- public SplitFileFromLocalFile(File source) {
- this.source = source;
- }
+ protected File source;
- public FileDescription getFileDescription() throws IOException {
- String localFileName = source.getPath();
- long localFileTotalSize = source.length();
- long localFileCheckSum = FileUtils.checksum(source, new CRC32()).getValue();
- return new FileDescription(localFileName, localFileTotalSize, localFileCheckSum);
- }
+ public SplitFileFromLocalFile(File source) {
+ this.source = source;
+ }
- public List<FileChunk> getAllChunks() throws IOException, FileNotFoundException {
+ public FileDescription getFileDescription() throws IOException {
+ String localFileName = source.getPath();
+ long localFileTotalSize = source.length();
+ long localFileCheckSum = FileUtils.checksum(source, new CRC32())
+ .getValue();
+ return new FileDescription(localFileName, localFileTotalSize,
+ localFileCheckSum);
+ }
- RandomAccessFile randomAccessFile = new RandomAccessFile(source, "r");
+ public List<FileChunk> getAllChunks() throws IOException,
+ FileNotFoundException {
- // preparing an empty list to store the result
- List<FileChunk> result = new ArrayList<FileChunk>();
-
- // this array will contains some bytes read from the file
- byte[] read = new byte[FileChunk.MAX_CHUNK_SIZE];
+ RandomAccessFile randomAccessFile = new RandomAccessFile(source, "r");
- // chunks have to be numbered
- int chunkNumber = 0;
-
- // the last chunk will not be complete, so we have to store the
- // chunkSize for each chunk
- int chunkSize;
+ // preparing an empty list to store the result
+ List<FileChunk> result = new ArrayList<FileChunk>();
- // reading the file until the end
- while ((chunkSize = randomAccessFile.read(read)) != -1) {
+ // this array will contains some bytes read from the file
+ byte[] read = new byte[FileChunk.MAX_CHUNK_SIZE];
- // creating a FileChunk from the data read
- FileChunk fileChunk = new FileChunk(read);
- fileChunk.setChunkNumber(chunkNumber);
- fileChunk.setChunkSize(chunkSize);
-
- result.add(fileChunk);
-
- // preparing data for next iteration
- read = new byte[FileChunk.MAX_CHUNK_SIZE];
- chunkNumber += 1;
- }
+ // chunks have to be numbered
+ int chunkNumber = 0;
- randomAccessFile.close();
-
- return result;
- }
+ // the last chunk will not be complete, so we have to store the
+ // chunkSize for each chunk
+ int chunkSize;
+ // reading the file until the end
+ while ((chunkSize = randomAccessFile.read(read)) != -1) {
+ // creating a FileChunk from the data read
+ FileChunk fileChunk = new FileChunk(read);
+ fileChunk.setChunkNumber(chunkNumber);
+ fileChunk.setChunkSize(chunkSize);
+
+ result.add(fileChunk);
+
+ // preparing data for next iteration
+ read = new byte[FileChunk.MAX_CHUNK_SIZE];
+ chunkNumber += 1;
+ }
+
+ randomAccessFile.close();
+
+ return result;
+ }
+
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Address.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Address.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Address.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -4,9 +4,9 @@
public abstract class Address implements Serializable {
- /**
+ /**
*
*/
- private static final long serialVersionUID = 4178387970911345672L;
-
+ private static final long serialVersionUID = 4178387970911345672L;
+
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Message.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Message.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Message.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -2,38 +2,43 @@
import java.io.Serializable;
-
/**
* Use Transport as a factory to create new instances of this class
+ *
* @author bleny
- *
+ *
*/
public abstract class Message {
-
- protected Transport transport;
-
- /**
- * contructor is not available as public. Use {@link Transport#newEmptyMessage()}
- * if you want to create a new message
- * @param transport
- */
- protected Message(Transport transport) {
- // dependency injection done by the newEmptyMessage factory
- this.transport = transport;
- }
- public abstract void setSource(Address source);
- public abstract void setDestination(Address destination);
- public abstract void setContent(Serializable content);
+ protected Transport transport;
- public abstract Address getSource();
- public abstract Address getDestination();
- public abstract Serializable getContent();
-
- public void send() {
- this.transport.send(this);
- }
-
- public abstract Message newReply();
-
+ /**
+ * contructor is not available as public. Use
+ * {@link Transport#newEmptyMessage()} if you want to create a new message
+ *
+ * @param transport
+ */
+ protected Message(Transport transport) {
+ // dependency injection done by the newEmptyMessage factory
+ this.transport = transport;
+ }
+
+ public abstract void setSource(Address source);
+
+ public abstract void setDestination(Address destination);
+
+ public abstract void setContent(Serializable content);
+
+ public abstract Address getSource();
+
+ public abstract Address getDestination();
+
+ public abstract Serializable getContent();
+
+ public void send() {
+ this.transport.send(this);
+ }
+
+ public abstract Message newReply();
+
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Receiver.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Receiver.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Receiver.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -1,8 +1,7 @@
package org.nuiton.disworkfs.transport;
-
public interface Receiver {
- public void receiveMessage(Message message);
-
+ public void receiveMessage(Message message);
+
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Transport.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Transport.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Transport.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -1,49 +1,51 @@
package org.nuiton.disworkfs.transport;
public interface Transport {
-
- /**
- * @return true if the message have been sent without error
- */
- public boolean send(Message message);
- /**
- * Factory method
- * @return a new empty message, ready to be sent
- */
- public Message newEmptyMessage();
+ /**
+ * @return true if the message have been sent without error
+ */
+ public boolean send(Message message);
- /**
- * Factory method
- * @return a new empty message, ready to be sent to all nodes
- */
- public Message newMulticastMessage();
+ /**
+ * Factory method
+ *
+ * @return a new empty message, ready to be sent
+ */
+ public Message newEmptyMessage();
- /**
- * Use this method to set the receiver for this transport
- * every message received by this transport will be sent
- * to this object
- * @param receiver
- */
- public void setReceiver(Receiver receiver);
-
- /**
- * @return an address you should use in {@link Message#setSource(Address)}
- */
- public Address getLocalAddress();
-
- /**
- * if you want to send a message to all nodes, use this method
- * to get the address you will use as destination in your
- * message
- * @return an address you can use in Message.setDestination()
- */
- public Address getMulticastAddress();
+ /**
+ * Factory method
+ *
+ * @return a new empty message, ready to be sent to all nodes
+ */
+ public Message newMulticastMessage();
- /**
- * this call should release all the ressources used by the transport layer
- * i.e. close sockets, connections etc.
- */
- public void close();
-
+ /**
+ * Use this method to set the receiver for this transport every message
+ * received by this transport will be sent to this object
+ *
+ * @param receiver
+ */
+ public void setReceiver(Receiver receiver);
+
+ /**
+ * @return an address you should use in {@link Message#setSource(Address)}
+ */
+ public Address getLocalAddress();
+
+ /**
+ * if you want to send a message to all nodes, use this method to get the
+ * address you will use as destination in your message
+ *
+ * @return an address you can use in Message.setDestination()
+ */
+ public Address getMulticastAddress();
+
+ /**
+ * this call should release all the ressources used by the transport layer
+ * i.e. close sockets, connections etc.
+ */
+ public void close();
+
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/jgroups/JGroupsAddress.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/jgroups/JGroupsAddress.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/jgroups/JGroupsAddress.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -4,16 +4,16 @@
public class JGroupsAddress extends Address {
- private static final long serialVersionUID = 8851527317522260037L;
-
- org.jgroups.Address address;
+ private static final long serialVersionUID = 8851527317522260037L;
- public JGroupsAddress(org.jgroups.Address address) {
- this.address = address;
- }
-
- public org.jgroups.Address getAddress() {
- return address;
- }
-
+ org.jgroups.Address address;
+
+ public JGroupsAddress(org.jgroups.Address address) {
+ this.address = address;
+ }
+
+ public org.jgroups.Address getAddress() {
+ return address;
+ }
+
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/jgroups/JGroupsMessage.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/jgroups/JGroupsMessage.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/jgroups/JGroupsMessage.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -8,74 +8,72 @@
public class JGroupsMessage extends Message implements Serializable {
- /**
+ /**
*
*/
- private static final long serialVersionUID = 6239305387929159827L;
- protected org.jgroups.Message message;
-
- protected JGroupsMessage(Transport transport) {
- super(transport);
- this.message = new org.jgroups.Message();
- }
-
- protected JGroupsMessage(Transport transport, org.jgroups.Message message) {
- this(transport);
- this.message = message;
- }
-
- protected JGroupsMessage(Transport transport,
- org.jgroups.Address source,
- org.jgroups.Address destination,
- Serializable content) {
- this(transport);
- this.message = new org.jgroups.Message(source, destination, content);
- }
-
- @Override
- public Serializable getContent() {
- return (Serializable) this.message.getObject();
- }
+ private static final long serialVersionUID = 6239305387929159827L;
+ protected org.jgroups.Message message;
- @Override
- public Address getDestination() {
- JGroupsAddress destination = new JGroupsAddress(message.getDest());
- return destination;
- }
+ protected JGroupsMessage(Transport transport) {
+ super(transport);
+ this.message = new org.jgroups.Message();
+ }
- @Override
- public Address getSource() {
- JGroupsAddress destination = new JGroupsAddress(message.getSrc());
- return destination;
- }
+ protected JGroupsMessage(Transport transport, org.jgroups.Message message) {
+ this(transport);
+ this.message = message;
+ }
- @Override
- public void setContent(Serializable content) {
- this.message.setObject(content);
- }
+ protected JGroupsMessage(Transport transport, org.jgroups.Address source,
+ org.jgroups.Address destination, Serializable content) {
+ this(transport);
+ this.message = new org.jgroups.Message(source, destination, content);
+ }
- @Override
- public void setDestination(Address destination) {
- org.jgroups.Address dest = ((JGroupsAddress) destination).getAddress();
- this.message.setDest(dest);
- }
+ @Override
+ public Serializable getContent() {
+ return (Serializable) this.message.getObject();
+ }
- @Override
- public void setSource(Address source) {
- org.jgroups.Address src = ((JGroupsAddress) source).getAddress();
- this.message.setDest(src);
- }
+ @Override
+ public Address getDestination() {
+ JGroupsAddress destination = new JGroupsAddress(message.getDest());
+ return destination;
+ }
- public org.jgroups.Message getMessage() {
- return this.message;
- }
+ @Override
+ public Address getSource() {
+ JGroupsAddress destination = new JGroupsAddress(message.getSrc());
+ return destination;
+ }
- @Override
- public Message newReply() {
- Message reply = new JGroupsMessage(this.transport);
- reply.setDestination(this.getSource());
- reply.setSource(this.getDestination());
- return reply;
- }
-
+ @Override
+ public void setContent(Serializable content) {
+ this.message.setObject(content);
+ }
+
+ @Override
+ public void setDestination(Address destination) {
+ org.jgroups.Address dest = ((JGroupsAddress) destination).getAddress();
+ this.message.setDest(dest);
+ }
+
+ @Override
+ public void setSource(Address source) {
+ org.jgroups.Address src = ((JGroupsAddress) source).getAddress();
+ this.message.setDest(src);
+ }
+
+ public org.jgroups.Message getMessage() {
+ return this.message;
+ }
+
+ @Override
+ public Message newReply() {
+ Message reply = new JGroupsMessage(this.transport);
+ reply.setDestination(this.getSource());
+ reply.setSource(this.getDestination());
+ return reply;
+ }
+
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/jgroups/JGroupsTransport.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/jgroups/JGroupsTransport.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/jgroups/JGroupsTransport.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -18,146 +18,147 @@
public class JGroupsTransport implements Transport {
- protected DisworkConfig disworkConfig;
- protected JChannel jChannel;
- protected Receiver receiver;
-
- private static final Log log = LogFactory.getLog(JGroupsTransport.class);
-
- /**
- * This class is just a delegator. Every message received on this.jChannel,
- * will be sent to this.receiver who subscribed to transport
- * @author bleny
- */
- private class MyReceiver extends org.jgroups.ExtendedReceiverAdapter {
-
- private Transport tranport;
-
- public MyReceiver(Transport tranport) {
- this.tranport = tranport;
- }
+ protected DisworkConfig disworkConfig;
+ protected JChannel jChannel;
+ protected Receiver receiver;
- /**
- * receive a new message from the JChannel, send it to the
- * transport layer
- */
- public void receive(org.jgroups.Message msg) {
- // this method is call be the jChannel when a message is received
- if (log.isDebugEnabled())
- log.info("message received from jChannel " + msg);
-
- // let's create a new message for the transport layer
- Message message = tranport.newEmptyMessage();
+ private static final Log log = LogFactory.getLog(JGroupsTransport.class);
- // setting the message attributes, according to the JGroups message
- message.setSource(new JGroupsAddress(msg.getSrc()));
- message.setSource(new JGroupsAddress(msg.getDest()));
- message.setContent((Serializable) msg.getObject());
-
- // sending our new message to the transport layer
- if (log.isDebugEnabled())
- log.info("message received from jChannel sent to transport layer as " + message);
- receiver.receiveMessage(message);
- }
+ /**
+ * This class is just a delegator. Every message received on this.jChannel,
+ * will be sent to this.receiver who subscribed to transport
+ *
+ * @author bleny
+ */
+ private class MyReceiver extends org.jgroups.ExtendedReceiverAdapter {
- public void viewAccepted(View new_view) {
- log.info("now seeing " + new_view.size() + " nodes");
- }
-
- }
-
-
- public JGroupsTransport(DisworkConfig disworkConfig) {
- this.disworkConfig = disworkConfig;
+ private Transport tranport;
+
+ public MyReceiver(Transport tranport) {
+ this.tranport = tranport;
+ }
+
+ /**
+ * receive a new message from the JChannel, send it to the transport
+ * layer
+ */
+ public void receive(org.jgroups.Message msg) {
+ // this method is call be the jChannel when a message is received
+ if (log.isDebugEnabled())
+ log.info("message received from jChannel " + msg);
+
+ // let's create a new message for the transport layer
+ Message message = tranport.newEmptyMessage();
+
+ // setting the message attributes, according to the JGroups message
+ message.setSource(new JGroupsAddress(msg.getSrc()));
+ message.setSource(new JGroupsAddress(msg.getDest()));
+ message.setContent((Serializable) msg.getObject());
+
+ // sending our new message to the transport layer
+ if (log.isDebugEnabled())
+ log
+ .info("message received from jChannel sent to transport layer as "
+ + message);
+ receiver.receiveMessage(message);
+ }
+
+ public void viewAccepted(View new_view) {
+ log.info("now seeing " + new_view.size() + " nodes");
+ }
+
+ }
+
+ public JGroupsTransport(DisworkConfig disworkConfig) {
+ this.disworkConfig = disworkConfig;
try {
jChannel = new JChannel("udp.xml");
-
+
String clusterName = disworkConfig.getJGroupsClusterName();
-
- /*
- String localIp;
- try {
- localIp = InetAddress.getLocalHost().getHostAddress();
- log.info("local IP is " + localIp);
- } catch (UnknownHostException e) {
- log.error("can't get local IP");
- }
- */
- // System.setProperty("jgroups.bind_addr", localIp);
- // System.setProperty("jgroups.tcpping.initial_hosts", localIp);
- // System.setProperty("jgroups.udp.mcast_addr", "224.0.0.150");
- // System.setProperty("java.net.preferIPv4Stack", "true");
-
-
+
+ /*
+ * String localIp; try { localIp =
+ * InetAddress.getLocalHost().getHostAddress();
+ * log.info("local IP is " + localIp); } catch (UnknownHostException
+ * e) { log.error("can't get local IP"); }
+ */
+ // System.setProperty("jgroups.bind_addr", localIp);
+ // System.setProperty("jgroups.tcpping.initial_hosts", localIp);
+ // System.setProperty("jgroups.udp.mcast_addr", "224.0.0.150");
+ // System.setProperty("java.net.preferIPv4Stack", "true");
+
log.info("connecting to JGroup " + clusterName);
jChannel.connect(clusterName);
-
+
// don't receive messages sent by myself
jChannel.setOpt(Channel.LOCAL, new Boolean(false));
-
+
// setting the receiver so every message received
// on jChannel will be received by the rest of
// the application
MyReceiver myReceiver = new MyReceiver(this);
jChannel.setReceiver(myReceiver);
-
+
} catch (ChannelException e) {
- log.error("error while creating and connecting to the JGroups channel");
+ log
+ .error("error while creating and connecting to the JGroups channel");
}
- }
-
- @Override
- public void setReceiver(Receiver receiver) {
+ }
+
+ @Override
+ public void setReceiver(Receiver receiver) {
this.receiver = receiver;
- }
-
- @Override
- public Address getMulticastAddress() {
- // null is the multicast destination in JGroups
- return new JGroupsAddress(null);
- }
-
- @Override
- public boolean send(Message message) {
- org.jgroups.Message msg = ((JGroupsMessage) message).getMessage();
- boolean success = true;
- try {
- jChannel.send(msg);
- } catch (ChannelNotConnectedException e) {
- log.error("JGroups channel not connected while trying to send a message");
+ }
+
+ @Override
+ public Address getMulticastAddress() {
+ // null is the multicast destination in JGroups
+ return new JGroupsAddress(null);
+ }
+
+ @Override
+ public boolean send(Message message) {
+ org.jgroups.Message msg = ((JGroupsMessage) message).getMessage();
+ boolean success = true;
+ try {
+ jChannel.send(msg);
+ } catch (ChannelNotConnectedException e) {
+ log
+ .error("JGroups channel not connected while trying to send a message");
success = false;
- } catch (ChannelClosedException e) {
- log.error("JGroups channel was closed while trying to send a message");
+ } catch (ChannelClosedException e) {
+ log
+ .error("JGroups channel was closed while trying to send a message");
success = false;
- }
- return success;
- }
-
- @Override
- public Message newEmptyMessage() {
- Message message = new JGroupsMessage(this, new org.jgroups.Message());
- message.setSource(this.getLocalAddress());
- return message;
- }
-
- @Override
- public Message newMulticastMessage() {
- Message message = this.newEmptyMessage();
- message.setDestination(this.getMulticastAddress());
- return message;
- }
+ }
+ return success;
+ }
- @Override
- public Address getLocalAddress() {
- return new JGroupsAddress(jChannel.getAddress());
- }
+ @Override
+ public Message newEmptyMessage() {
+ Message message = new JGroupsMessage(this, new org.jgroups.Message());
+ message.setSource(this.getLocalAddress());
+ return message;
+ }
- @Override
- public void close() {
- // leaving group
- jChannel.disconnect();
- // closing the channel
- jChannel.close();
- }
-
+ @Override
+ public Message newMulticastMessage() {
+ Message message = this.newEmptyMessage();
+ message.setDestination(this.getMulticastAddress());
+ return message;
+ }
+
+ @Override
+ public Address getLocalAddress() {
+ return new JGroupsAddress(jChannel.getAddress());
+ }
+
+ @Override
+ public void close() {
+ // leaving group
+ jChannel.disconnect();
+ // closing the channel
+ jChannel.close();
+ }
+
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/DownloadObserver.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/DownloadObserver.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/DownloadObserver.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -4,6 +4,6 @@
public interface DownloadObserver {
- public void updateDownloadStatus(DownloadService downloadService);
-
+ public void updateDownloadStatus(DownloadService downloadService);
+
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/LookUpObserver.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/LookUpObserver.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/LookUpObserver.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -4,6 +4,6 @@
public interface LookUpObserver {
- public void receiveResult(FileDescription fileDescription);
-
+ public void receiveResult(FileDescription fileDescription);
+
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/SimpleDownload.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/SimpleDownload.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/SimpleDownload.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -6,66 +6,66 @@
import org.nuiton.disworkfs.services.LookUpService;
import org.nuiton.disworkfs.split.FileDescription;
+public class SimpleDownload implements DownloadObserver {
+ /**
+ * only used for synchronisation purpose
+ */
+ protected final Object lock = new Object();
-public class SimpleDownload implements DownloadObserver {
+ protected Boolean downloadFinised = false;
- /**
- * only used for synchronisation purpose
- */
- protected final Object lock = new Object();
-
- protected Boolean downloadFinised = false;
-
protected FileDescription fileDescription = null;
-
+
protected DownloadService downloadService;
-
+
protected SimpleLookUp simpleLookUp;
-
+
private static final Log log = LogFactory.getLog(SimpleDownload.class);
-
- public SimpleDownload(String filePath, LookUpService lookUpService, DownloadService downloadService) {
- this.downloadService = downloadService;
-
- simpleLookUp = new SimpleLookUp(filePath, lookUpService);
+
+ public SimpleDownload(String filePath, LookUpService lookUpService,
+ DownloadService downloadService) {
+ this.downloadService = downloadService;
+
+ simpleLookUp = new SimpleLookUp(filePath, lookUpService);
}
-
+
public boolean initiateDownload() throws InterruptedException {
- fileDescription = simpleLookUp.runLookUp();
- return fileDescription != null;
+ fileDescription = simpleLookUp.runLookUp();
+ return fileDescription != null;
}
-
+
/**
- * initiateDownload() must be called <strong>before</strong>
- * download start.
+ * initiateDownload() must be called <strong>before</strong> download start.
+ *
* @throws InterruptedException
*/
public void startDownload() throws InterruptedException {
- if (fileDescription == null) {
- log.error("download started without FileDescription");
- } else {
- if (log.isDebugEnabled())
- log.info("starting download for " + fileDescription.getFileName());
+ if (fileDescription == null) {
+ log.error("download started without FileDescription");
+ } else {
+ if (log.isDebugEnabled())
+ log.info("starting download for "
+ + fileDescription.getFileName());
- downloadService.startDownload(fileDescription, this);
+ downloadService.startDownload(fileDescription, this);
- synchronized (lock) {
- lock.wait();
- if (log.isDebugEnabled())
- log.info("download " + fileDescription.getFileName() + " is complete");
- }
- }
+ synchronized (lock) {
+ lock.wait();
+ if (log.isDebugEnabled())
+ log.info("download " + fileDescription.getFileName()
+ + " is complete");
+ }
+ }
}
-
-
+
@Override
public void updateDownloadStatus(DownloadService downloadService) {
- downloadFinised = downloadService.isFinished(fileDescription);
- if (downloadFinised) {
- synchronized (lock) {
- lock.notify();
- }
- }
- }
+ downloadFinised = downloadService.isFinished(fileDescription);
+ if (downloadFinised) {
+ synchronized (lock) {
+ lock.notify();
+ }
+ }
+ }
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/SimpleLookUp.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/SimpleLookUp.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/SimpleLookUp.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -7,54 +7,55 @@
public class SimpleLookUp implements LookUpObserver {
- /**
- * only used for synchronisation purpose
- */
- protected final Object lock = new Object();
+ /**
+ * only used for synchronisation purpose
+ */
+ protected final Object lock = new Object();
- protected Boolean lookUpResponseReceived = false;
+ protected Boolean lookUpResponseReceived = false;
- protected FileDescription fileDescription = null;
+ protected FileDescription fileDescription = null;
- protected LookUpService lookUpService;
-
- protected String filePath;
+ protected LookUpService lookUpService;
- private static final Log log = LogFactory.getLog(SimpleDownload.class);
+ protected String filePath;
- public SimpleLookUp(String filePath, LookUpService lookUpService) {
- this.filePath = filePath;
- this.lookUpService = lookUpService;
- }
+ private static final Log log = LogFactory.getLog(SimpleLookUp.class);
- /**
- *
- * @return the FileDescription or null if file is not found
- * @throws InterruptedException
- */
- public FileDescription runLookUp() throws InterruptedException {
+ public SimpleLookUp(String filePath, LookUpService lookUpService) {
+ this.filePath = filePath;
+ this.lookUpService = lookUpService;
+ }
- lookUpService.lookForFileName(filePath, this);
+ /**
+ *
+ * @return the FileDescription or null if file is not found
+ * @throws InterruptedException
+ */
+ public FileDescription runLookUp() throws InterruptedException {
- synchronized (lock) {
- lock.wait(10 * 1000); // time out at 10 seconds
- }
+ lookUpService.lookForFileName(filePath, this);
- if (lookUpResponseReceived) {
- log.info("look-up response received for " + filePath);
- } else {
- log.info("no look-up response received for " + filePath);
- }
+ synchronized (lock) {
+ lock.wait(10 * 1000); // time out at 10 seconds
+ }
- return this.fileDescription;
- }
+ if (lookUpResponseReceived) {
+ log.info("look-up response received for " + filePath);
+ } else {
+ log.info("no look-up response received for " + filePath);
+ }
- @Override
- public void receiveResult(FileDescription fileDescription) {
- synchronized (lock) {
- this.lookUpResponseReceived = true;
- this.fileDescription = fileDescription;
- lock.notify();
- }
- }
+ // may be null if no response received
+ return this.fileDescription;
+ }
+
+ @Override
+ public void receiveResult(FileDescription fileDescription) {
+ synchronized (lock) {
+ this.lookUpResponseReceived = true;
+ this.fileDescription = fileDescription;
+ lock.notify();
+ }
+ }
}
Modified: trunk/diswork-fs/src/main/resources/log4j.properties
===================================================================
--- trunk/diswork-fs/src/main/resources/log4j.properties 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/main/resources/log4j.properties 2010-05-10 09:30:32 UTC (rev 29)
@@ -5,7 +5,7 @@
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %5p [%t] (%F:%L) %M - %m%n
# package level
-log4j.logger.org.nuiton.disworkfs=DEBUG
+log4j.logger.org.nuiton.disworkfs=TRACE
#log4j.logger.org.nuiton.disworkfs.transport=WARN
#log4j.logger.org.nuiton.disworkfs.services.DownloadService=WARN
#log4j.logger.org.nuiton.disworkfs.services.UploadService=WARN
Modified: trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/DistributedFileSystemTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/DistributedFileSystemTest.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/DistributedFileSystemTest.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -1,12 +1,10 @@
package org.nuiton.disworkfs;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileInputStream;
-import java.io.IOException;
import java.util.Random;
import org.apache.commons.io.FileUtils;
@@ -19,11 +17,12 @@
public class DistributedFileSystemTest {
/**
- * a place to store files for the test
- * it's a subdirectory of the OS temp dir
- * e.g. under linux /tmp/disworkfs/tests/
+ * a place to store files for the test it's a subdirectory of the OS temp
+ * dir e.g. under linux /tmp/disworkfs/tests/
*/
- static protected String tempDirectoryPath = System.getProperty("java.io.tmpdir", ".") + "/disworkfs/tests";
+ static protected String tempDirectoryPath = System.getProperty(
+ "java.io.tmpdir", ".")
+ + "/disworkfs/tests";
/**
* We will create a file at this path for test purpose
@@ -40,12 +39,11 @@
static protected DisworkConfig disworkConfig1;
static protected DisworkConfig disworkConfig2;
-
+
@Before
public void setUp() throws Exception {
File tempDirectory = new File(tempDirectoryPath);
tempDirectory.mkdir();
-
File storage1 = new File(tempDirectory, "storage1");
storage1.mkdir();
@@ -63,10 +61,9 @@
File randomFile = new File(randomFilePath);
FileUtils.writeByteArrayToFile(randomFile, randomBytes);
-
disworkConfig1 = new DisworkConfig();
disworkConfig1.setOption("storage", storagePath1);
-
+
disworkConfig2 = new DisworkConfig();
disworkConfig2.setOption("storage", storagePath2);
}
@@ -80,48 +77,63 @@
@Test
public void testWrite() throws Exception {
- DisworkFileSystem disworkFileSystem1 = new DisworkFileSystem(disworkConfig1);
- disworkFileSystem1.write("monfichier", new File(randomFilePath));
- disworkFileSystem1.close();
+ DisworkFileSystem disworkFileSystem1 = new DisworkFileSystem(
+ disworkConfig1);
+ disworkFileSystem1.write("monfichier", new FileInputStream(randomFilePath));
+ disworkFileSystem1.close();
+
+
+ File monfichier = new File(randomFilePath);
+ File monfichierstorage = new File(tempDirectoryPath
+ + "/storage1/monfichier");
+
+ boolean actualContentEquality = IOUtils.contentEquals(
+ new FileInputStream(monfichier), new FileInputStream(
+ monfichierstorage));
+
+ assertTrue("file and copy content should be the same",
+ actualContentEquality);
}
/**
- * this test run two DistributedFS. A file is written on the first
- * when. We try to read from the other FS : since it doesn't own the file,
- * it will try a lookup and download the file.
+ * this test run two DistributedFS. A file is written on the first when. We
+ * try to read from the other FS : since it doesn't own the file, it will
+ * try a lookup and download the file.
*/
@Test
public void testRead() throws Exception {
- DisworkFileSystem disworkFileSystem1 = new DisworkFileSystem(disworkConfig1);
- DisworkFileSystem disworkFileSystem2 = new DisworkFileSystem(disworkConfig2);
+ DisworkFileSystem disworkFileSystem1 = new DisworkFileSystem(
+ disworkConfig1);
+ DisworkFileSystem disworkFileSystem2 = new DisworkFileSystem(
+ disworkConfig2);
+
+ disworkFileSystem1.write("mon/chemin/vers/mon/fichier", new FileInputStream(
+ randomFilePath));
+
+ boolean existsResult = disworkFileSystem2
+ .exists("mon/chemin/vers/mon/fichier");
+ assertTrue(existsResult);
- disworkFileSystem1.write("mon/chemin/vers/mon/fichier", new File(randomFilePath));
-
- boolean existsResult = disworkFileSystem2.exists("mon/chemin/vers/mon/fichier");
- assertTrue(existsResult);
- existsResult = disworkFileSystem2.exists("unautrefichierquinexistepas");
- assertFalse(existsResult);
-
- disworkFileSystem2.read("mon/chemin/vers/mon/fichier");
-
- // TODO
- File monfichierstorage1 = new File(tempDirectoryPath + "/storage1/mon/chemin/vers/mon/fichier");
- File monfichierstorage2 = new File(tempDirectoryPath + "/storage2/mon/chemin/vers/mon/fichier");
+ existsResult = disworkFileSystem2.exists("unautrefichierquinexistepas");
+ assertFalse(existsResult);
- try {
- boolean actualContentEquality = IOUtils.contentEquals(
- new FileInputStream(monfichierstorage1),
- new FileInputStream(monfichierstorage2)
- );
- assertTrue("file and copy content should be the same", actualContentEquality);
- } catch (IOException e) {
- fail("one or both files are not readable");
- e.printStackTrace();
- }
+ disworkFileSystem2.read("mon/chemin/vers/mon/fichier");
- disworkFileSystem1.close();
- disworkFileSystem2.close();
+ File monfichierstorage1 = new File(tempDirectoryPath
+ + "/storage1/mon/chemin/vers/mon/fichier");
+ File monfichierstorage2 = new File(tempDirectoryPath
+ + "/storage2/mon/chemin/vers/mon/fichier");
+
+ boolean actualContentEquality = IOUtils.contentEquals(
+ new FileInputStream(monfichierstorage1), new FileInputStream(
+ monfichierstorage2));
+
+ assertTrue("file and copy content should be the same",
+ actualContentEquality);
+
+ disworkFileSystem1.close();
+ disworkFileSystem2.close();
}
}
Modified: trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromChunksTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromChunksTest.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromChunksTest.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -2,11 +2,9 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileInputStream;
-import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.junit.Test;
@@ -14,52 +12,39 @@
public class SplitFileFromChunksTest extends AbstractSplitFileTest {
@Test
- public void simpleCopyRandomAccess() {
+ public void simpleCopyRandomAccess() throws Exception {
+ File randomFile = new File(randomFilePath);
- try {
- File randomFile = new File(randomFilePath);
+ // let's take a file
+ SplitFileFromLocalFile splitFile = new SplitFileFromLocalFile(randomFile);
- // let's take a file
- SplitFileFromLocalFile splitFile = new SplitFileFromLocalFile(randomFile);
-
- // let's create a second file
- FileDescription fileDescription = splitFile.getFileDescription();
- String splitedFileCopyPath = fileDescription.getFileName() + "_copy";
- fileDescription.setFileName(splitedFileCopyPath);
-
- SplitFileFromChunks splitFileCopy = new SplitFileFromChunks(fileDescription, new File(splitedFileCopyPath));
-
- // here is the simple copy from fist file to second file
- for (FileChunk fileChunk : splitFile.getAllChunks()) {
- assertFalse(splitFileCopy.isComplete());
- splitFileCopy.addChunk(fileChunk);
- }
+ // let's create a second file
+ FileDescription fileDescription = splitFile.getFileDescription();
+ String splitedFileCopyPath = fileDescription.getFileName() + "_copy";
+ fileDescription.setFileName(splitedFileCopyPath);
- assertTrue(splitFileCopy.isComplete());
- // write the copy to the FS
+ SplitFileFromChunks splitFileCopy = new SplitFileFromChunks(fileDescription, new File(splitedFileCopyPath));
- // compare the original and the copy byte by byte
- try {
- boolean actualContentEquality = IOUtils.contentEquals(
- new FileInputStream(new File(randomFilePath)),
- new FileInputStream(new File(splitedFileCopyPath))
- );
- assertTrue("file and copy content should be the same", actualContentEquality);
- } catch (IOException e) {
- fail("one or both files are not readable");
- e.printStackTrace();
- }
-
- // delete the copy
- new File(splitedFileCopyPath).delete();
-
-
- } catch (IOException e) {
- e.printStackTrace();
- } catch (Exception e) {
- e.printStackTrace();
+ // here is the simple copy from fist file to second file
+ for (FileChunk fileChunk : splitFile.getAllChunks()) {
+ assertFalse(splitFileCopy.isComplete());
+ splitFileCopy.addChunk(fileChunk);
}
-
+
+ assertTrue(splitFileCopy.isComplete());
+ // write the copy to the FS
+
+ // compare the original and the copy byte by byte
+
+ boolean actualContentEquality = IOUtils.contentEquals(
+ new FileInputStream(new File(randomFilePath)),
+ new FileInputStream(new File(splitedFileCopyPath))
+ );
+ assertTrue("file and copy content should be the same", actualContentEquality);
+
+ // delete the copy
+ new File(splitedFileCopyPath).delete();
+
}
}
Modified: trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromLocalFileTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromLocalFileTest.java 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromLocalFileTest.java 2010-05-10 09:30:32 UTC (rev 29)
@@ -2,10 +2,8 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import java.io.File;
-import java.io.IOException;
import java.util.List;
import org.junit.Test;
@@ -13,32 +11,27 @@
public class SplitFileFromLocalFileTest extends AbstractSplitFileTest {
@Test
- public void testReadFromLocalFileSytem() {
- try {
- File randomFile = new File(randomFilePath);
-
- SplitFileFromLocalFile splitFileFromLocalFile = new SplitFileFromLocalFile(randomFile);
-
- FileDescription fileDescription = splitFileFromLocalFile.getFileDescription();
- assertTrue(fileDescription.getTotalSize() == randomFileSize);
- assertTrue(fileDescription.getFileCheckSum() != 0);
- assertEquals(FileChunk.numberOfChunksNeededtoStore(randomFileSize), fileDescription.getNumberOfChunks());
- assertEquals(fileDescription.getFileName(), randomFilePath);
-
- List<FileChunk> allChunks = splitFileFromLocalFile.getAllChunks();
-
- for (FileChunk fileChunk : allChunks) {
- int chunkSize = fileChunk.getChunkSize();
-
- assertTrue(
- chunkSize == FileChunk.MAX_CHUNK_SIZE
- || chunkSize == randomFileSize % FileChunk.MAX_CHUNK_SIZE);
- }
-
- } catch (IOException e) {
- fail();
- e.printStackTrace();
+ public void testReadFromLocalFileSytem() throws Exception {
+ File randomFile = new File(randomFilePath);
+
+ SplitFileFromLocalFile splitFileFromLocalFile = new SplitFileFromLocalFile(randomFile);
+
+ FileDescription fileDescription = splitFileFromLocalFile.getFileDescription();
+ assertTrue(fileDescription.getTotalSize() == randomFileSize);
+ assertTrue(fileDescription.getFileCheckSum() != 0);
+ assertEquals(FileChunk.numberOfChunksNeededtoStore(randomFileSize), fileDescription.getNumberOfChunks());
+ assertEquals(fileDescription.getFileName(), randomFilePath);
+
+ List<FileChunk> allChunks = splitFileFromLocalFile.getAllChunks();
+
+ for (FileChunk fileChunk : allChunks) {
+ int chunkSize = fileChunk.getChunkSize();
+
+ assertTrue(
+ chunkSize == FileChunk.MAX_CHUNK_SIZE
+ || chunkSize == randomFileSize % FileChunk.MAX_CHUNK_SIZE);
}
+
}
}
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2010-05-09 12:33:47 UTC (rev 28)
+++ trunk/pom.xml 2010-05-10 09:30:32 UTC (rev 29)
@@ -1,11 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
+ <modelVersion>4.0.0</modelVersion>
- <!-- ************************************************************* -->
- <!-- *** POM Relationships *************************************** -->
- <!-- ************************************************************* -->
+ <!-- ************************************************************* -->
+ <!-- *** POM Relationships *************************************** -->
+ <!-- ************************************************************* -->
<parent>
<groupId>org.nuiton</groupId>
@@ -13,145 +14,177 @@
<version>2.1.4</version>
</parent>
- <artifactId>diswork</artifactId>
- <version>0.0.1-SNAPSHOT</version>
+ <artifactId>diswork</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
- <modules>
- <module>diswork-fs</module>
- </modules>
+ <modules>
+ <module>diswork-fs</module>
+ </modules>
- <dependencies>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ <version>1.1.1</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.14</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>jgroups</groupId>
+ <artifactId>jgroups</artifactId>
+ <version>2.9.0.GA</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>1.4</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <version>2.5</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.nuiton</groupId>
+ <artifactId>nuiton-utils</artifactId>
+ <version>1.2.2</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>fr.inria.peerunit</groupId>
+ <artifactId>PeerUnit</artifactId>
+ <version>1.0</version>
+ <scope>compile</scope>
+ </dependency>
- <dependency>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- <version>1.1.1</version>
- <scope>compile</scope>
- </dependency>
-
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>1.2.14</version>
- <scope>test</scope>
- </dependency>
+ <!-- test -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.1</version>
+ <type>jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+ <!-- ************************************************************* -->
+ <!-- *** Project Information ************************************* -->
+ <!-- ************************************************************* -->
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.8.1</version>
- <scope>test</scope>
- </dependency>
+ <name>Diswork</name>
+ <description>Distributed computing application</description>
+ <inceptionYear>2010</inceptionYear>
+ <url>http://maven-site.nuiton.org/diswork</url>
+ <developers>
+ <developer>
+ <name>Brendan Le Ny</name>
+ <id>bleny</id>
+ <email>leny(a)codelutin.com</email>
+ <organization>CodeLutin</organization>
+ <timezone>+2</timezone>
+ <roles>
+ <role>Développeur</role>
+ </roles>
+ </developer>
+ <developer>
+ <name>Benjamin Poussin</name>
+ <id>bpoussin</id>
+ <email>poussin(a)codelutin.com</email>
+ <organization>CodeLutin</organization>
+ <timezone>+2</timezone>
+ <roles>
+ <role>Développeur</role>
+ <role>Debian packager</role>
+ </roles>
+ </developer>
+ <developer>
+ <name>Eric Chatellier</name>
+ <id>echatellier</id>
+ <email>chatellier(a)codelutin.com</email>
+ <organization>CodeLutin</organization>
+ <timezone>+2</timezone>
+ <roles>
+ <role>Développeur</role>
+ </roles>
+ </developer>
+ <developer>
+ <name>Jean Couteau</name>
+ <id>jcouteau</id>
+ <email>couteau(a)codelutin.com</email>
+ <organization>CodeLutin</organization>
+ <timezone>+2</timezone>
+ <roles>
+ <role>Développeur</role>
+ </roles>
+ </developer>
+ <developer>
+ <name>Tony Chemit</name>
+ <id>tchemit</id>
+ <email>chemit(a)codelutin.com</email>
+ <organization>CodeLutin</organization>
+ <timezone>+2</timezone>
+ <roles>
+ <role>Développeur</role>
+ </roles>
+ </developer>
+ </developers>
- </dependencies>
+ <!-- ************************************************************* -->
+ <!-- *** Build Settings ****************************************** -->
+ <!-- ************************************************************* -->
- <!-- ************************************************************* -->
- <!-- *** Project Information ************************************* -->
- <!-- ************************************************************* -->
+ <packaging>pom</packaging>
- <name>Diswork</name>
- <description>Distributed computing application</description>
- <inceptionYear>2010</inceptionYear>
- <url>http://maven-site.nuiton.org/diswork</url>
+ <properties>
+ <siteLocales>en</siteLocales>
+ </properties>
- <developers>
- <developer>
- <name>Brendan Le Ny</name>
- <id>bleny</id>
- <email>leny(a)codelutin.com</email>
- <organization>CodeLutin</organization>
- <timezone>+2</timezone>
- <roles>
- <role>Développeur</role>
- </roles>
- </developer>
- <developer>
- <name>Benjamin Poussin</name>
- <id>bpoussin</id>
- <email>poussin(a)codelutin.com</email>
- <organization>CodeLutin</organization>
- <timezone>+2</timezone>
- <roles>
- <role>Développeur</role>
- <role>Debian packager</role>
- </roles>
- </developer>
- <developer>
- <name>Eric Chatellier</name>
- <id>echatellier</id>
- <email>chatellier(a)codelutin.com</email>
- <organization>CodeLutin</organization>
- <timezone>+2</timezone>
- <roles>
- <role>Développeur</role>
- </roles>
- </developer>
- <developer>
- <name>Jean Couteau</name>
- <id>jcouteau</id>
- <email>couteau(a)codelutin.com</email>
- <organization>CodeLutin</organization>
- <timezone>+2</timezone>
- <roles>
- <role>Développeur</role>
- </roles>
- </developer>
- <developer>
- <name>Tony Chemit</name>
- <id>tchemit</id>
- <email>chemit(a)codelutin.com</email>
- <organization>CodeLutin</organization>
- <timezone>+2</timezone>
- <roles>
- <role>Développeur</role>
- </roles>
- </developer>
- </developers>
+ <build>
+ <plugins>
- <!-- ************************************************************* -->
- <!-- *** Build Settings ****************************************** -->
- <!-- ************************************************************* -->
+ </plugins>
- <packaging>pom</packaging>
+ <pluginManagement>
+ <plugins>
- <properties>
- <siteLocales>en</siteLocales>
- </properties>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.nuiton.jrst</groupId>
+ <artifactId>doxia-module-jrst</artifactId>
+ <version>${jrst.version}</version>
+ </dependency>
+ </dependencies>
+ </plugin>
- <build>
- <plugins>
+ </plugins>
+ </pluginManagement>
- </plugins>
+ </build>
- <pluginManagement>
- <plugins>
+ <!-- ************************************************************* -->
+ <!-- *** Build Environment ************************************** -->
+ <!-- ************************************************************* -->
- <plugin>
- <artifactId>maven-site-plugin</artifactId>
- <dependencies>
- <dependency>
- <groupId>org.nuiton.jrst</groupId>
- <artifactId>doxia-module-jrst</artifactId>
- <version>${jrst.version}</version>
- </dependency>
- </dependencies>
- </plugin>
+ <!-- Source control management. -->
+ <scm>
+ <connection>scm:svn:http://svn.nuiton.org/svn/diswork/diswork/trunk</connection>
+ <developerConnection>scm:svn:http://svn.nuiton.org/svn/diswork/diswork/trunk</developerConnection>
+ <url>http://www.nuiton.org/repositories/browse/diswork/diswork/trunk</url>
+ </scm>
- </plugins>
- </pluginManagement>
-
- </build>
-
- <!-- ************************************************************* -->
- <!-- *** Build Environment ************************************** -->
- <!-- ************************************************************* -->
-
- <!-- Source control management. -->
- <scm>
- <connection>scm:svn:http://svn.nuiton.org/svn/diswork/diswork/trunk</connection>
- <developerConnection>scm:svn:http://svn.nuiton.org/svn/diswork/diswork/trunk</developerConnection>
- <url>http://www.nuiton.org/repositories/browse/diswork/diswork/trunk</url>
- </scm>
-
</project>
1
0
Author: tchemit
Date: 2010-05-09 14:33:47 +0200 (Sun, 09 May 2010)
New Revision: 28
Url: http://nuiton.org/repositories/revision/diswork/28
Log:
Utilisation de mavenpom4redmine 2.1.4
Modified:
trunk/pom.xml
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2010-05-07 15:01:28 UTC (rev 27)
+++ trunk/pom.xml 2010-05-09 12:33:47 UTC (rev 28)
@@ -10,7 +10,7 @@
<parent>
<groupId>org.nuiton</groupId>
<artifactId>mavenpom4redmine</artifactId>
- <version>2.1.3</version>
+ <version>2.1.4</version>
</parent>
<artifactId>diswork</artifactId>
1
0
r27 - trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split
by bpoussin@users.nuiton.org 07 May '10
by bpoussin@users.nuiton.org 07 May '10
07 May '10
Author: bpoussin
Date: 2010-05-07 17:01:28 +0200 (Fri, 07 May 2010)
New Revision: 27
Url: http://nuiton.org/repositories/revision/diswork/27
Log:
change bad catch exception
Modified:
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromChunks.java
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromChunks.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromChunks.java 2010-05-06 16:00:05 UTC (rev 26)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromChunks.java 2010-05-07 15:01:28 UTC (rev 27)
@@ -64,8 +64,7 @@
bitSet = (BitSet) new ObjectInputStream(is).readObject();
is.close();
} catch (ClassNotFoundException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ throw new IOException("Unknown Metadata", e);
}
return bitSet;
}
1
0
r26 - in trunk/diswork-fs/src: main/java/org/nuiton/disworkfs main/java/org/nuiton/disworkfs/services main/java/org/nuiton/disworkfs/split main/java/org/nuiton/disworkfs/transport main/java/org/nuiton/disworkfs/util test/java/org/nuiton/disworkfs test/java/org/nuiton/disworkfs/split
by bleny@users.nuiton.org 06 May '10
by bleny@users.nuiton.org 06 May '10
06 May '10
Author: bleny
Date: 2010-05-06 18:00:05 +0200 (Thu, 06 May 2010)
New Revision: 26
Url: http://nuiton.org/repositories/revision/diswork/26
Log:
exceptions, menage
Added:
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DisworkServicesManager.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/DownloadObserver.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/LookUpObserver.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/SimpleDownload.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/SimpleLookUp.java
trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/AbstractSplitFileTest.java
Removed:
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkServicesManager.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DownloadObserver.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/LookUpObserver.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/RunMe.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/SimpleDownload.java
trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/AbstractSplitFileTest.java
Modified:
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DistributedFileSystem.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/AbstractDisworkService.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DownloadService.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/LookUpService.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/UploadService.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromChunks.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Receiver.java
trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/DistributedFileSystemTest.java
trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromChunksTest.java
trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromLocalFileTest.java
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DistributedFileSystem.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DistributedFileSystem.java 2010-05-06 13:42:54 UTC (rev 25)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DistributedFileSystem.java 2010-05-06 16:00:05 UTC (rev 26)
@@ -1,15 +1,39 @@
package org.nuiton.disworkfs;
import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
public interface DistributedFileSystem {
-
- public boolean exists(String path);
- public void write(String path, File source);
-
- public File read(String path);
+ /**
+ * checks the existence of a file on the virtual FS
+ * @param path a path to a file on the virtual FS
+ * @return true if the file exists, false if not
+ * @throws InterruptedException
+ */
+ public boolean exists(String path) throws InterruptedException;
- public void remove(String path);
+ /**
+ * write a file
+ * @param path path of the virtual FS where the file should be written
+ * @param source the file to copy on the VFS
+ * @throws IOException if problems occurs while reading the source or writing on VFS
+ * @throws InterruptedException
+ */
+ public void write(String path, File source) throws IOException, InterruptedException;
+ /**
+ * read a file on the VFS, the file is downloaded if needed (may take some times)
+ * @param path the path in the virtual FS to the file you want to read
+ * @return ??
+ * @throws FileNotFoundException if no file have been written to this path (you may use {@link #exists(String) to check before read}
+ * @throws IOException if a problem occur while reading the local file
+ * @throws InterruptedException
+ */
+ public File read(String path) throws FileNotFoundException, IOException, InterruptedException;
+
+ public void remove(String path);
+
+ // TODO list a directory content
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java 2010-05-06 13:42:54 UTC (rev 25)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java 2010-05-06 16:00:05 UTC (rev 26)
@@ -3,17 +3,22 @@
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.OutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.nuiton.disworkfs.services.DisworkServicesManager;
import org.nuiton.disworkfs.services.DownloadService;
import org.nuiton.disworkfs.services.LookUpService;
import org.nuiton.disworkfs.services.UploadService;
+import org.nuiton.disworkfs.split.FileDescription;
+import org.nuiton.disworkfs.util.SimpleDownload;
+import org.nuiton.disworkfs.util.SimpleLookUp;
import org.nuiton.util.FileUtil;
-public class DisworkFileSystem {
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+public class DisworkFileSystem implements DistributedFileSystem {
+
protected DownloadService downloadService;
protected UploadService uploadService;
protected LookUpService lookUpService;
@@ -36,7 +41,8 @@
}
- public OutputStream read(String path) throws InterruptedException, FileNotFoundException {
+ @Override
+ public File read(String path) throws InterruptedException, FileNotFoundException {
log.info("trying to read " + path);
@@ -56,21 +62,50 @@
}
+ // FIXME
return null;
}
- public void write(String path, File source) throws IOException {
+ @Override
+ public void write(String path, File source) throws IOException, InterruptedException {
File target = new File(disworkConfig.getStoragePath(), path);
if (target.exists()) {
throw new IOException(target.getAbsolutePath() + " already exists");
- }
+ }
+ if (this.exists(path)) {
+ throw new IOException(target.getAbsolutePath() + " already exists");
+ }
+
FileUtil.copy(source, target);
}
+ @Override
+ public boolean exists(String path) throws InterruptedException {
+
+ File file = new File(disworkConfig.getStoragePath(), path);
+ boolean fileExists = false;
+
+ if (file.exists()) {
+ fileExists = true;
+ } else {
+ SimpleLookUp simpleLookUp = new SimpleLookUp(path, lookUpService);
+ FileDescription lookUpResult = simpleLookUp.runLookUp();
+ fileExists = (lookUpResult != null);
+ }
+
+ return fileExists;
+ }
+
+ @Override
+ public void remove(String path) {
+ throw new NotImplementedException();
+ }
+
public void close() {
disworkServicesManager.stop();
}
+
}
Deleted: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkServicesManager.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkServicesManager.java 2010-05-06 13:42:54 UTC (rev 25)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkServicesManager.java 2010-05-06 16:00:05 UTC (rev 26)
@@ -1,70 +0,0 @@
-package org.nuiton.disworkfs;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.nuiton.disworkfs.services.AbstractDisworkService;
-import org.nuiton.disworkfs.transport.Message;
-import org.nuiton.disworkfs.transport.Receiver;
-import org.nuiton.disworkfs.transport.Transport;
-import org.nuiton.disworkfs.transport.jgroups.JGroupsTransport;
-
-public class DisworkServicesManager implements Receiver {
-
- /**
- * all the running services
- */
- private List<AbstractDisworkService> services = new ArrayList<AbstractDisworkService>();
-
- protected Transport transport;
-
- private DisworkConfig disworkConfig;
-
- public DisworkServicesManager(DisworkConfig disworkConfig) {
- this.disworkConfig = disworkConfig;
-
- transport = new JGroupsTransport(disworkConfig);
-
- transport.setReceiver(this);
-
- }
-
- public void register(AbstractDisworkService service) {
-
- // dependency injection, the service need a channel to send a message
- // service.setJChannel(jChannel);
- service.setTransport(transport);
- // ... and the disworkConfig
- service.setDisworkConfig(disworkConfig);
-
- // each service is run in his own thread
- Thread thread = new Thread(service);
- thread.start();
-
- // add this service to the list of running services
- services.add(service);
- }
-
- public void unRegister(AbstractDisworkService service) {
- services.remove(service);
- }
-
-
- @Override
- public void receiveMessage(Message message) {
- for (AbstractDisworkService service : services) {
- service.receiveMessage(message);
- }
- }
-
- /**
- * this stop all the diswork services and close the
- * ressources used by the transport layer
- */
- public void stop() {
- for (AbstractDisworkService service : services) {
- service.stop();
- }
- transport.close();
- }
-}
\ No newline at end of file
Deleted: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DownloadObserver.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DownloadObserver.java 2010-05-06 13:42:54 UTC (rev 25)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DownloadObserver.java 2010-05-06 16:00:05 UTC (rev 26)
@@ -1,9 +0,0 @@
-package org.nuiton.disworkfs;
-
-import org.nuiton.disworkfs.services.DownloadService;
-
-public interface DownloadObserver {
-
- public void updateDownloadStatus(DownloadService downloadService);
-
-}
Deleted: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/LookUpObserver.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/LookUpObserver.java 2010-05-06 13:42:54 UTC (rev 25)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/LookUpObserver.java 2010-05-06 16:00:05 UTC (rev 26)
@@ -1,9 +0,0 @@
-package org.nuiton.disworkfs;
-
-import org.nuiton.disworkfs.split.FileDescription;
-
-public interface LookUpObserver {
-
- public void receiveResult(FileDescription fileDescription);
-
-}
Deleted: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/RunMe.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/RunMe.java 2010-05-06 13:42:54 UTC (rev 25)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/RunMe.java 2010-05-06 16:00:05 UTC (rev 26)
@@ -1,12 +0,0 @@
-package org.nuiton.disworkfs;
-
-
-public class RunMe {
-
- public static void main(String args[]) {
- DisworkConfig disworkConfig = new DisworkConfig();
-
- DisworkFileSystem disworkFileSystem = new DisworkFileSystem(disworkConfig);
- }
-
-}
Deleted: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/SimpleDownload.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/SimpleDownload.java 2010-05-06 13:42:54 UTC (rev 25)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/SimpleDownload.java 2010-05-06 16:00:05 UTC (rev 26)
@@ -1,87 +0,0 @@
-package org.nuiton.disworkfs;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.nuiton.disworkfs.services.DownloadService;
-import org.nuiton.disworkfs.services.LookUpService;
-import org.nuiton.disworkfs.split.FileDescription;
-
-
-
-public class SimpleDownload implements DownloadObserver, LookUpObserver {
-
- /**
- * only used for synchronisation purpose
- */
- protected final Object lock = new Object();
-
- private Boolean lookUpResponseReceived = false;
- private Boolean downloadFinised = false;
-
- private FileDescription fileDescription = null;
-
- private DownloadService downloadService;
- private LookUpService lookUpService;
- private String filePath;
-
- private static final Log log = LogFactory.getLog(SimpleDownload.class);
-
- public SimpleDownload(String filePath, LookUpService lookUpService, DownloadService downloadService) {
- this.filePath = filePath;
- this.downloadService = downloadService;
- this.lookUpService = lookUpService;
- }
-
- public boolean initiateDownload() throws InterruptedException {
-
- lookUpService.lookForFileName(filePath, this);
-
- synchronized (lock) {
- lock.wait(10 * 1000); // time out at 10 seconds
- }
-
- // TODO throw file not found if timeout exceed
- if (lookUpResponseReceived) {
- log.info("look-up response received for " + filePath);
- } else {
- log.info("no look-up response received for " + filePath);
- }
-
- return lookUpResponseReceived;
- }
-
-
- public void startDownload() throws InterruptedException {
- if (log.isDebugEnabled())
- log.info("starting download for " + fileDescription.getFileName());
-
- downloadService.startDownload(fileDescription, this);
-
- synchronized (lock) {
- lock.wait();
- if (log.isDebugEnabled())
- log.info("download " + fileDescription.getFileName() + " is complete");
- }
- }
-
-
- @Override
- public void updateDownloadStatus(DownloadService downloadService) {
- downloadFinised = downloadService.isFinished(fileDescription);
- if (downloadFinised) {
- synchronized (lock) {
- lock.notify();
- }
- }
- }
-
- @Override
- public void receiveResult(FileDescription fileDescription) {
- synchronized (lock) {
- lookUpResponseReceived = true;
- this.fileDescription = fileDescription;
- lock.notify();
- }
- }
-
-}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/AbstractDisworkService.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/AbstractDisworkService.java 2010-05-06 13:42:54 UTC (rev 25)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/AbstractDisworkService.java 2010-05-06 16:00:05 UTC (rev 26)
@@ -1,5 +1,6 @@
package org.nuiton.disworkfs.services;
+import java.io.IOException;
import java.io.Serializable;
import org.apache.commons.logging.Log;
@@ -31,25 +32,28 @@
@Override
public void receiveMessage(Message message) {
-
- Serializable obj = message.getContent();
- if (obj instanceof LookUpMessage) {
- receiveLookUpMessage(message);
- } else if (obj instanceof LookUpResponseMessage) {
- receiveLookUpResponseMessage(message);
- } else if (obj instanceof FileRequestMessage) {
- receiveFileRequestMessage(message);
- } else if (obj instanceof FileTransferMessage) {
- receiveFileTransferMessage(message);
- } else {
- log.error("unknow message received");
- }
+ try {
+ Serializable obj = message.getContent();
+ if (obj instanceof LookUpMessage) {
+ receiveLookUpMessage(message);
+ } else if (obj instanceof LookUpResponseMessage) {
+ receiveLookUpResponseMessage(message);
+ } else if (obj instanceof FileRequestMessage) {
+ receiveFileRequestMessage(message);
+ } else if (obj instanceof FileTransferMessage) {
+ receiveFileTransferMessage(message);
+ } else {
+ log.error("unknow message received");
+ }
+ } catch (IOException eee) {
+ log.error("IOException encoutered after receiving a message", eee);
+ }
}
- public void receiveLookUpMessage(Message msg) {}
+ public void receiveLookUpMessage(Message msg) throws IOException {}
public void receiveLookUpResponseMessage(Message msg) {}
- public void receiveFileRequestMessage(Message msg) {}
- public void receiveFileTransferMessage(Message msg) {}
+ public void receiveFileRequestMessage(Message msg) throws IOException {}
+ public void receiveFileTransferMessage(Message msg) throws IOException {}
public void setTransport(Transport transport) {
this.transport = transport;
Added: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DisworkServicesManager.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DisworkServicesManager.java (rev 0)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DisworkServicesManager.java 2010-05-06 16:00:05 UTC (rev 26)
@@ -0,0 +1,70 @@
+package org.nuiton.disworkfs.services;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.nuiton.disworkfs.DisworkConfig;
+import org.nuiton.disworkfs.transport.Message;
+import org.nuiton.disworkfs.transport.Receiver;
+import org.nuiton.disworkfs.transport.Transport;
+import org.nuiton.disworkfs.transport.jgroups.JGroupsTransport;
+
+public class DisworkServicesManager implements Receiver {
+
+ /**
+ * all the running services
+ */
+ private List<AbstractDisworkService> services = new ArrayList<AbstractDisworkService>();
+
+ protected Transport transport;
+
+ private DisworkConfig disworkConfig;
+
+ public DisworkServicesManager(DisworkConfig disworkConfig) {
+ this.disworkConfig = disworkConfig;
+
+ transport = new JGroupsTransport(disworkConfig);
+
+ transport.setReceiver(this);
+
+ }
+
+ public void register(AbstractDisworkService service) {
+
+ // dependency injection, the service need a channel to send a message
+ // service.setJChannel(jChannel);
+ service.setTransport(transport);
+ // ... and the disworkConfig
+ service.setDisworkConfig(disworkConfig);
+
+ // each service is run in his own thread
+ Thread thread = new Thread(service);
+ thread.start();
+
+ // add this service to the list of running services
+ services.add(service);
+ }
+
+ public void unRegister(AbstractDisworkService service) {
+ services.remove(service);
+ }
+
+
+ @Override
+ public void receiveMessage(Message message) {
+ for (AbstractDisworkService service : services) {
+ service.receiveMessage(message);
+ }
+ }
+
+ /**
+ * this stop all the diswork services and close the
+ * ressources used by the transport layer
+ */
+ public void stop() {
+ for (AbstractDisworkService service : services) {
+ service.stop();
+ }
+ transport.close();
+ }
+}
\ No newline at end of file
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DownloadService.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DownloadService.java 2010-05-06 13:42:54 UTC (rev 25)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DownloadService.java 2010-05-06 16:00:05 UTC (rev 26)
@@ -1,6 +1,7 @@
package org.nuiton.disworkfs.services;
import java.io.File;
+import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -8,12 +9,12 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.nuiton.disworkfs.DownloadObserver;
import org.nuiton.disworkfs.messages.FileRequestMessage;
import org.nuiton.disworkfs.messages.FileTransferMessage;
import org.nuiton.disworkfs.split.FileDescription;
import org.nuiton.disworkfs.split.SplitFileFromChunks;
import org.nuiton.disworkfs.transport.Message;
+import org.nuiton.disworkfs.util.DownloadObserver;
public class DownloadService extends AbstractDisworkService {
@@ -26,7 +27,7 @@
private static final Log log = LogFactory.getLog(DownloadService.class);
@Override
- public void receiveFileTransferMessage(Message message) {
+ public void receiveFileTransferMessage(Message message) throws IOException {
FileTransferMessage fileTransferMessage = (FileTransferMessage) message.getContent();
FileDescription fileDescription = fileTransferMessage.getFileDescrition();
@@ -87,6 +88,7 @@
// it's the first observer for this download ever, let's construct a list
observersList = new LinkedList<DownloadObserver>();
}
+
observersList.add(downloadObserver);
downloadObservers.put(fileDescription.getFileCheckSum(), observersList);
}
@@ -117,7 +119,8 @@
if (log.isDebugEnabled())
log.info("sending file request for " + fileDescription.getFileName());
+
registerObserver(fileDescription, downloadObserver);
-
+
}
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/LookUpService.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/LookUpService.java 2010-05-06 13:42:54 UTC (rev 25)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/LookUpService.java 2010-05-06 16:00:05 UTC (rev 26)
@@ -6,10 +6,10 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.nuiton.disworkfs.LookUpObserver;
import org.nuiton.disworkfs.messages.LookUpMessage;
import org.nuiton.disworkfs.messages.LookUpResponseMessage;
import org.nuiton.disworkfs.transport.Message;
+import org.nuiton.disworkfs.util.LookUpObserver;
@@ -37,6 +37,7 @@
message.send();
log.info("look-up message sent for " + fileName);
+ // FIXME memory leak if multiple look-up on never-will-be-available files
requestToRequester.put(fileName, lookUpObserver);
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/UploadService.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/UploadService.java 2010-05-06 13:42:54 UTC (rev 25)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/UploadService.java 2010-05-06 16:00:05 UTC (rev 26)
@@ -20,95 +20,85 @@
* When receiving a LookUp, sending a LookUpResponse if owning file on local FS
*/
public class UploadService extends AbstractDisworkService {
-
- private static final Log log = LogFactory.getLog(UploadService.class);
-
- @Override
- public void receiveFileRequestMessage(Message message) {
- FileRequestMessage fileRequestMessage = (FileRequestMessage) message.getContent();
- File file = new File(disworkConfig.getStoragePath(), fileRequestMessage.getfileDescription().getFileName());
+ private static final Log log = LogFactory.getLog(UploadService.class);
- if (log.isDebugEnabled())
- log.info("file request message received for " + fileRequestMessage.getfileDescription().getFileName());
-
- if (file.exists()) {
-
- if (log.isDebugEnabled())
- log.info("file found in path " + file.getAbsolutePath());
+ @Override
+ public void receiveFileRequestMessage(Message message) throws IOException {
+ FileRequestMessage fileRequestMessage = (FileRequestMessage) message.getContent();
- SplitFileFromLocalFile splitFileFromLocalFile = new SplitFileFromLocalFile(file);
+ File file = new File(disworkConfig.getStoragePath(), fileRequestMessage.getfileDescription().getFileName());
- try {
- FileDescription fileDescription = splitFileFromLocalFile.getFileDescription();
- fileDescription.setFileName(fileRequestMessage.getfileDescription().getFileName());
-
- List<FileChunk> chunks = splitFileFromLocalFile.getAllChunks();
-
- for (FileChunk fileChunk : chunks) {
- Message reply = message.newReply();
-
- FileTransferMessage fileTransferMessage = new FileTransferMessage(fileChunk, fileDescription);
+ if (log.isDebugEnabled())
+ log.info("file request message received for " + fileRequestMessage.getfileDescription().getFileName());
- reply.setContent(fileTransferMessage);
+ if (file.exists()) {
- if (log.isDebugEnabled())
- log.info("sending chunk " + fileTransferMessage.getFileChunk().getChunkNumber());
+ if (log.isDebugEnabled())
+ log.info("file found in path " + file.getAbsolutePath());
- reply.send();
-
- }
+ SplitFileFromLocalFile splitFileFromLocalFile = new SplitFileFromLocalFile(file);
- if (log.isDebugEnabled())
- log.info("all chunks sent");
-
- } catch (IOException e) {
- log.error("can't read file " + file.getAbsolutePath());
- }
- } else {
- if (log.isDebugEnabled())
- log.info("file not found in path " + file.getAbsolutePath() + " don't send response");
- }
+ FileDescription fileDescription = splitFileFromLocalFile.getFileDescription();
+ fileDescription.setFileName(fileRequestMessage.getfileDescription().getFileName());
- }
+ List<FileChunk> chunks = splitFileFromLocalFile.getAllChunks();
- @Override
- public void receiveLookUpMessage(Message message) {
- LookUpMessage lookUpMessage = (LookUpMessage) message.getContent();
+ for (FileChunk fileChunk : chunks) {
+ Message reply = message.newReply();
- if (log.isDebugEnabled())
- log.info("lookup message received : looking for file " + lookUpMessage.getFileName());
-
- File file = new File(disworkConfig.getStoragePath(), lookUpMessage.getFileName());
-
+ FileTransferMessage fileTransferMessage = new FileTransferMessage(fileChunk, fileDescription);
- if (file.exists()) {
- if (log.isDebugEnabled())
- log.info(lookUpMessage.getFileName() + " file found at " + file.getAbsolutePath());
+ reply.setContent(fileTransferMessage);
- SplitFileFromLocalFile splitFileFromLocalFile = new SplitFileFromLocalFile(file);
+ if (log.isDebugEnabled())
+ log.info("sending chunk " + fileTransferMessage.getFileChunk().getChunkNumber());
- try {
+ reply.send();
- Message reply = message.newReply();
+ }
- FileDescription fileDescription = splitFileFromLocalFile.getFileDescription();
-
- fileDescription.setFileName(lookUpMessage.getFileName());
-
- reply.setContent(new LookUpResponseMessage(fileDescription));
+ if (log.isDebugEnabled())
+ log.info("all chunks sent");
+ } else {
+ if (log.isDebugEnabled())
+ log.info("file not found in path " + file.getAbsolutePath() + " don't send response");
+ }
- if (log.isDebugEnabled())
- log.info("sending lookUpResponse response for " + lookUpMessage.getFileName());
+ }
- reply.send();
- } catch (IOException e) {
- log.error("can't read file " + file.getAbsolutePath() + " aborting file transfer");
- }
- } else {
- if (log.isDebugEnabled())
- log.info(lookUpMessage.getFileName() + " file not found at " + file.getAbsolutePath() + " no response sent");
- }
+ @Override
+ public void receiveLookUpMessage(Message message) throws IOException {
+ LookUpMessage lookUpMessage = (LookUpMessage) message.getContent();
- }
+ if (log.isDebugEnabled())
+ log.info("lookup message received : looking for file " + lookUpMessage.getFileName());
+
+ File file = new File(disworkConfig.getStoragePath(), lookUpMessage.getFileName());
+
+ if (file.exists()) {
+ if (log.isDebugEnabled())
+ log.info(lookUpMessage.getFileName() + " file found at " + file.getAbsolutePath());
+
+ SplitFileFromLocalFile splitFileFromLocalFile = new SplitFileFromLocalFile(file);
+
+
+ Message reply = message.newReply();
+
+ FileDescription fileDescription = splitFileFromLocalFile.getFileDescription();
+
+ fileDescription.setFileName(lookUpMessage.getFileName());
+
+ reply.setContent(new LookUpResponseMessage(fileDescription));
+
+ if (log.isDebugEnabled())
+ log.info("sending lookUpResponse response for " + lookUpMessage.getFileName());
+
+ reply.send();
+ } else {
+ if (log.isDebugEnabled())
+ log.info(lookUpMessage.getFileName() + " file not found at " + file.getAbsolutePath() + " no response sent");
+ }
+
+ }
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromChunks.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromChunks.java 2010-05-06 13:42:54 UTC (rev 25)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromChunks.java 2010-05-06 16:00:05 UTC (rev 26)
@@ -12,25 +12,22 @@
import java.util.zip.CRC32;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
public class SplitFileFromChunks {
-
+
protected FileDescription fileDescription;
protected File destination;
protected File chunkStatusFile;
- public SplitFileFromChunks(FileDescription fileDescription, File destination) {
+ private Log log = LogFactory.getLog(SplitFileFromChunks.class);
+
+ public SplitFileFromChunks(FileDescription fileDescription, File destination) throws IOException {
this.fileDescription = fileDescription;
this.destination = destination;
this.chunkStatusFile = new File(destination.getParent(), "." + destination.getName() + ".index");
-
- if (!chunkStatusFile.exists()) {
- BitSet bitSet = new BitSet(fileDescription.getNumberOfChunks());
- bitSet.set(0, fileDescription.getNumberOfChunks());
- writeChunkStatusFile(bitSet);
- bitSet = readChunkStatusFile();
- }
-
+
if (!destination.exists()) {
try {
RandomAccessFile randomAccessFile = new RandomAccessFile(destination, "rw");
@@ -38,51 +35,50 @@
randomAccessFile.close();
} catch (FileNotFoundException e) {
// we just checked !destination.exists()
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
}
}
+
+ if (!chunkStatusFile.exists()) {
+ // force the creation of all needed directories
+ log.debug("creating directory " + chunkStatusFile.getParentFile());
+ chunkStatusFile.getParentFile().mkdirs();
+ BitSet bitSet = new BitSet(fileDescription.getNumberOfChunks());
+ bitSet.set(0, fileDescription.getNumberOfChunks());
+ writeChunkStatusFile(bitSet);
+ bitSet = readChunkStatusFile();
+ }
+
}
-
- public boolean isComplete() {
+
+ public boolean isComplete() throws IOException {
BitSet bitSet = readChunkStatusFile();
boolean fileIsComplete = bitSet.cardinality() == 0;
return fileIsComplete;
}
-
-
- protected BitSet readChunkStatusFile() {
+
+
+ protected BitSet readChunkStatusFile() throws IOException {
BitSet bitSet = null;
try {
FileInputStream is = new FileInputStream(chunkStatusFile);
bitSet = (BitSet) new ObjectInputStream(is).readObject();
is.close();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return bitSet;
}
-
-
- protected void writeChunkStatusFile(BitSet bitSet) {
- try {
- FileOutputStream os = new FileOutputStream(chunkStatusFile);
- new ObjectOutputStream(os).writeObject(bitSet);
- os.close();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+
+
+ protected void writeChunkStatusFile(BitSet bitSet) throws IOException {
+ FileOutputStream os = new FileOutputStream(chunkStatusFile);
+ new ObjectOutputStream(os).writeObject(bitSet);
+ os.close();
}
-
-
- public void addChunk(FileChunk fileChunk) {
- try {
+
+
+ public void addChunk(FileChunk fileChunk) throws IOException {
RandomAccessFile randomAccessFile = new RandomAccessFile(destination, "rw");
byte[] data = fileChunk.getData();
int off = FileChunk.MAX_CHUNK_SIZE * fileChunk.getChunkNumber();
@@ -90,30 +86,22 @@
randomAccessFile.seek(off);
randomAccessFile.write(data, 0, len);
randomAccessFile.close();
-
+
// updating status
-
BitSet chunkStatus = readChunkStatusFile();
chunkStatus.clear(fileChunk.getChunkNumber());
writeChunkStatusFile(chunkStatus);
-
-
+
+
if (isComplete()) {
long expectedChecksum = fileDescription.getFileCheckSum();
long actualCheckSum = FileUtils.checksum(destination, new CRC32()).getValue();
-
+
if (actualCheckSum != expectedChecksum) {
throw new IOException("checksum fail");
}
}
- } catch (FileNotFoundException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
}
-
+
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Receiver.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Receiver.java 2010-05-06 13:42:54 UTC (rev 25)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Receiver.java 2010-05-06 16:00:05 UTC (rev 26)
@@ -1,5 +1,6 @@
package org.nuiton.disworkfs.transport;
+
public interface Receiver {
public void receiveMessage(Message message);
Added: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/DownloadObserver.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/DownloadObserver.java (rev 0)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/DownloadObserver.java 2010-05-06 16:00:05 UTC (rev 26)
@@ -0,0 +1,9 @@
+package org.nuiton.disworkfs.util;
+
+import org.nuiton.disworkfs.services.DownloadService;
+
+public interface DownloadObserver {
+
+ public void updateDownloadStatus(DownloadService downloadService);
+
+}
Added: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/LookUpObserver.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/LookUpObserver.java (rev 0)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/LookUpObserver.java 2010-05-06 16:00:05 UTC (rev 26)
@@ -0,0 +1,9 @@
+package org.nuiton.disworkfs.util;
+
+import org.nuiton.disworkfs.split.FileDescription;
+
+public interface LookUpObserver {
+
+ public void receiveResult(FileDescription fileDescription);
+
+}
Added: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/SimpleDownload.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/SimpleDownload.java (rev 0)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/SimpleDownload.java 2010-05-06 16:00:05 UTC (rev 26)
@@ -0,0 +1,71 @@
+package org.nuiton.disworkfs.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.nuiton.disworkfs.services.DownloadService;
+import org.nuiton.disworkfs.services.LookUpService;
+import org.nuiton.disworkfs.split.FileDescription;
+
+
+
+public class SimpleDownload implements DownloadObserver {
+
+ /**
+ * only used for synchronisation purpose
+ */
+ protected final Object lock = new Object();
+
+ protected Boolean downloadFinised = false;
+
+ protected FileDescription fileDescription = null;
+
+ protected DownloadService downloadService;
+
+ protected SimpleLookUp simpleLookUp;
+
+ private static final Log log = LogFactory.getLog(SimpleDownload.class);
+
+ public SimpleDownload(String filePath, LookUpService lookUpService, DownloadService downloadService) {
+ this.downloadService = downloadService;
+
+ simpleLookUp = new SimpleLookUp(filePath, lookUpService);
+ }
+
+ public boolean initiateDownload() throws InterruptedException {
+ fileDescription = simpleLookUp.runLookUp();
+ return fileDescription != null;
+ }
+
+ /**
+ * initiateDownload() must be called <strong>before</strong>
+ * download start.
+ * @throws InterruptedException
+ */
+ public void startDownload() throws InterruptedException {
+ if (fileDescription == null) {
+ log.error("download started without FileDescription");
+ } else {
+ if (log.isDebugEnabled())
+ log.info("starting download for " + fileDescription.getFileName());
+
+ downloadService.startDownload(fileDescription, this);
+
+ synchronized (lock) {
+ lock.wait();
+ if (log.isDebugEnabled())
+ log.info("download " + fileDescription.getFileName() + " is complete");
+ }
+ }
+ }
+
+
+ @Override
+ public void updateDownloadStatus(DownloadService downloadService) {
+ downloadFinised = downloadService.isFinished(fileDescription);
+ if (downloadFinised) {
+ synchronized (lock) {
+ lock.notify();
+ }
+ }
+ }
+}
Added: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/SimpleLookUp.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/SimpleLookUp.java (rev 0)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/util/SimpleLookUp.java 2010-05-06 16:00:05 UTC (rev 26)
@@ -0,0 +1,60 @@
+package org.nuiton.disworkfs.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.nuiton.disworkfs.services.LookUpService;
+import org.nuiton.disworkfs.split.FileDescription;
+
+public class SimpleLookUp implements LookUpObserver {
+
+ /**
+ * only used for synchronisation purpose
+ */
+ protected final Object lock = new Object();
+
+ protected Boolean lookUpResponseReceived = false;
+
+ protected FileDescription fileDescription = null;
+
+ protected LookUpService lookUpService;
+
+ protected String filePath;
+
+ private static final Log log = LogFactory.getLog(SimpleDownload.class);
+
+ public SimpleLookUp(String filePath, LookUpService lookUpService) {
+ this.filePath = filePath;
+ this.lookUpService = lookUpService;
+ }
+
+ /**
+ *
+ * @return the FileDescription or null if file is not found
+ * @throws InterruptedException
+ */
+ public FileDescription runLookUp() throws InterruptedException {
+
+ lookUpService.lookForFileName(filePath, this);
+
+ synchronized (lock) {
+ lock.wait(10 * 1000); // time out at 10 seconds
+ }
+
+ if (lookUpResponseReceived) {
+ log.info("look-up response received for " + filePath);
+ } else {
+ log.info("no look-up response received for " + filePath);
+ }
+
+ return this.fileDescription;
+ }
+
+ @Override
+ public void receiveResult(FileDescription fileDescription) {
+ synchronized (lock) {
+ this.lookUpResponseReceived = true;
+ this.fileDescription = fileDescription;
+ lock.notify();
+ }
+ }
+}
Deleted: trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/AbstractSplitFileTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/AbstractSplitFileTest.java 2010-05-06 13:42:54 UTC (rev 25)
+++ trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/AbstractSplitFileTest.java 2010-05-06 16:00:05 UTC (rev 26)
@@ -1,57 +0,0 @@
-package org.nuiton.disworkfs;
-
-import java.io.File;
-import java.util.Random;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.After;
-import org.junit.Before;
-
-/**
- * This classes is for test purpose. It creates, before all tests
- * a random file with a given size in a temp directory
- */
-public abstract class AbstractSplitFileTest {
-
- static protected Random random = new Random();
-
- /**
- * a place to store files for the test
- * it's a subdirectory of the OS temp dir
- * e.g. under linux /tmp/disworkfs/tests/
- */
- static protected String tempDirectoryPath = System.getProperty("java.io.tmpdir", ".") + "/disworkfs/tests";
-
- /**
- * We will create a file at this path for test purpose
- */
- static protected String randomFilePath = tempDirectoryPath + "/randomfile";
-
- /**
- * The file will have this fixed size
- */
- static protected int randomFileSize = 3500;
-
- @Before
- public void setUp() throws Exception {
- File tempDirectory = new File(tempDirectoryPath);
- tempDirectory.mkdir();
- FileUtils.forceDeleteOnExit(tempDirectory);
-
- // creating random data for the file
- byte[] randomBytes = new byte[randomFileSize];
- random.nextBytes(randomBytes);
-
- // dumping random data into the file
- File randomFile = new File(randomFilePath);
- FileUtils.writeByteArrayToFile(randomFile, randomBytes);
-
- }
-
- @After
- public void tearDown() throws Exception {
- // cleaning
- FileUtils.forceDelete(new File(tempDirectoryPath));
- }
-
-}
Modified: trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/DistributedFileSystemTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/DistributedFileSystemTest.java 2010-05-06 13:42:54 UTC (rev 25)
+++ trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/DistributedFileSystemTest.java 2010-05-06 16:00:05 UTC (rev 26)
@@ -1,6 +1,7 @@
package org.nuiton.disworkfs;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import java.io.File;
@@ -95,12 +96,18 @@
DisworkFileSystem disworkFileSystem1 = new DisworkFileSystem(disworkConfig1);
DisworkFileSystem disworkFileSystem2 = new DisworkFileSystem(disworkConfig2);
- disworkFileSystem1.write("monfichier", new File(randomFilePath));
- disworkFileSystem2.read("monfichier");
+ disworkFileSystem1.write("mon/chemin/vers/mon/fichier", new File(randomFilePath));
+ boolean existsResult = disworkFileSystem2.exists("mon/chemin/vers/mon/fichier");
+ assertTrue(existsResult);
+ existsResult = disworkFileSystem2.exists("unautrefichierquinexistepas");
+ assertFalse(existsResult);
+
+ disworkFileSystem2.read("mon/chemin/vers/mon/fichier");
+
// TODO
- File monfichierstorage1 = new File(tempDirectoryPath + "/storage1/monfichier");
- File monfichierstorage2 = new File(tempDirectoryPath + "/storage2/monfichier");
+ File monfichierstorage1 = new File(tempDirectoryPath + "/storage1/mon/chemin/vers/mon/fichier");
+ File monfichierstorage2 = new File(tempDirectoryPath + "/storage2/mon/chemin/vers/mon/fichier");
try {
boolean actualContentEquality = IOUtils.contentEquals(
Added: trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/AbstractSplitFileTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/AbstractSplitFileTest.java (rev 0)
+++ trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/AbstractSplitFileTest.java 2010-05-06 16:00:05 UTC (rev 26)
@@ -0,0 +1,57 @@
+package org.nuiton.disworkfs.split;
+
+import java.io.File;
+import java.util.Random;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * This classes is for test purpose. It creates, before all tests
+ * a random file with a given size in a temp directory
+ */
+public abstract class AbstractSplitFileTest {
+
+ static protected Random random = new Random();
+
+ /**
+ * a place to store files for the test
+ * it's a subdirectory of the OS temp dir
+ * e.g. under linux /tmp/disworkfs/tests/
+ */
+ static protected String tempDirectoryPath = System.getProperty("java.io.tmpdir", ".") + "/disworkfs/tests";
+
+ /**
+ * We will create a file at this path for test purpose
+ */
+ static protected String randomFilePath = tempDirectoryPath + "/randomfile";
+
+ /**
+ * The file will have this fixed size
+ */
+ static protected int randomFileSize = 3500;
+
+ @Before
+ public void setUp() throws Exception {
+ File tempDirectory = new File(tempDirectoryPath);
+ tempDirectory.mkdir();
+ FileUtils.forceDeleteOnExit(tempDirectory);
+
+ // creating random data for the file
+ byte[] randomBytes = new byte[randomFileSize];
+ random.nextBytes(randomBytes);
+
+ // dumping random data into the file
+ File randomFile = new File(randomFilePath);
+ FileUtils.writeByteArrayToFile(randomFile, randomBytes);
+
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ // cleaning
+ FileUtils.forceDelete(new File(tempDirectoryPath));
+ }
+
+}
Modified: trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromChunksTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromChunksTest.java 2010-05-06 13:42:54 UTC (rev 25)
+++ trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromChunksTest.java 2010-05-06 16:00:05 UTC (rev 26)
@@ -10,7 +10,6 @@
import org.apache.commons.io.IOUtils;
import org.junit.Test;
-import org.nuiton.disworkfs.AbstractSplitFileTest;
public class SplitFileFromChunksTest extends AbstractSplitFileTest {
Modified: trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromLocalFileTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromLocalFileTest.java 2010-05-06 13:42:54 UTC (rev 25)
+++ trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromLocalFileTest.java 2010-05-06 16:00:05 UTC (rev 26)
@@ -9,7 +9,6 @@
import java.util.List;
import org.junit.Test;
-import org.nuiton.disworkfs.AbstractSplitFileTest;
public class SplitFileFromLocalFileTest extends AbstractSplitFileTest {
1
0
r25 - in trunk/diswork-fs/src: main/java/org/nuiton/disworkfs main/java/org/nuiton/disworkfs/services main/java/org/nuiton/disworkfs/split main/java/org/nuiton/disworkfs/transport main/java/org/nuiton/disworkfs/transport/jgroups main/resources test/java/org/nuiton/disworkfs test/java/org/nuiton/disworkfs/split
by bleny@users.nuiton.org 06 May '10
by bleny@users.nuiton.org 06 May '10
06 May '10
Author: bleny
Date: 2010-05-06 15:42:54 +0200 (Thu, 06 May 2010)
New Revision: 25
Url: http://nuiton.org/repositories/revision/diswork/25
Log:
merge SplitBytes/SplitFile synchro des thread, quitter le systeme proprement
Removed:
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/BytesChunk.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitedFile.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplittedBytes.java
trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/AllwaysReplyToLookUpTest.java
trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitedFileTest.java
trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplittedBytesTest.java
Modified:
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkServicesManager.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/SimpleDownload.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/AbstractDisworkService.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DownloadService.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/UploadService.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/FileChunk.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/FileDescription.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromChunks.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromLocalFile.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Transport.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/jgroups/JGroupsTransport.java
trunk/diswork-fs/src/main/resources/log4j.properties
trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/DistributedFileSystemTest.java
trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromLocalFileTest.java
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java 2010-05-06 09:48:34 UTC (rev 24)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java 2010-05-06 13:42:54 UTC (rev 25)
@@ -2,7 +2,6 @@
import java.io.File;
import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
@@ -15,34 +14,19 @@
public class DisworkFileSystem {
- private DownloadService downloadService;
- private UploadService uploadService;
- private LookUpService lookUpService;
- private DisworkConfig disworkConfig;
+ protected DownloadService downloadService;
+ protected UploadService uploadService;
+ protected LookUpService lookUpService;
+ protected DisworkConfig disworkConfig;
+ protected DisworkServicesManager disworkServicesManager;
private static final Log log = LogFactory.getLog(DisworkFileSystem.class);
public DisworkFileSystem(DisworkConfig disworkConfig) {
this.disworkConfig = disworkConfig;
-
- // System.getProperty("user.home")
- /*
- String localIp;
- try {
- localIp = InetAddress.getLocalHost().getHostAddress();
- log.info("local IP is " + localIp);
- } catch (UnknownHostException e) {
- log.error("can't get local IP");
- }
- */
- // System.setProperty("jgroups.bind_addr", localIp);
- // System.setProperty("jgroups.tcpping.initial_hosts", localIp);
- // System.setProperty("jgroups.udp.mcast_addr", "224.0.0.150");
- // System.setProperty("java.net.preferIPv4Stack", "true");
-
- DisworkServicesManager disworkServicesManager = new DisworkServicesManager(disworkConfig);
+ disworkServicesManager = new DisworkServicesManager(disworkConfig);
uploadService = new UploadService();
disworkServicesManager.register(uploadService);
downloadService = new DownloadService();
@@ -72,27 +56,21 @@
}
- OutputStream os = null;
- try {
- os = new FileOutputStream(file);
- } catch (FileNotFoundException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- return os;
+ return null;
}
public void write(String path, File source) throws IOException {
File target = new File(disworkConfig.getStoragePath(), path);
- /*
- if (file.exists()) {
- // TODO
- throw new Exception("fichier existe deja");
- }
- */
+ if (target.exists()) {
+ throw new IOException(target.getAbsolutePath() + " already exists");
+ }
+
FileUtil.copy(source, target);
}
+
+ public void close() {
+ disworkServicesManager.stop();
+ }
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkServicesManager.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkServicesManager.java 2010-05-06 09:48:34 UTC (rev 24)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkServicesManager.java 2010-05-06 13:42:54 UTC (rev 25)
@@ -56,4 +56,15 @@
service.receiveMessage(message);
}
}
+
+ /**
+ * This stops all the diswork services and closes the
+ * resources used by the transport layer.
+ */
+ public void stop() {
+ for (AbstractDisworkService service : services) {
+ service.stop();
+ }
+ transport.close();
+ }
}
\ No newline at end of file
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/SimpleDownload.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/SimpleDownload.java 2010-05-06 09:48:34 UTC (rev 24)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/SimpleDownload.java 2010-05-06 13:42:54 UTC (rev 25)
@@ -1,7 +1,5 @@
package org.nuiton.disworkfs;
-import java.io.File;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuiton.disworkfs.services.DownloadService;
@@ -53,7 +51,7 @@
}
- public File startDownload() throws InterruptedException {
+ public void startDownload() throws InterruptedException {
if (log.isDebugEnabled())
log.info("starting download for " + fileDescription.getFileName());
@@ -64,9 +62,6 @@
if (log.isDebugEnabled())
log.info("download " + fileDescription.getFileName() + " is complete");
}
-
- // FIXME may not return a good file
- return new File(fileDescription.getFileName());
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/AbstractDisworkService.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/AbstractDisworkService.java 2010-05-06 09:48:34 UTC (rev 24)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/AbstractDisworkService.java 2010-05-06 13:42:54 UTC (rev 25)
@@ -61,6 +61,8 @@
@Override
public void run() {}
+
+ public void stop() {}
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DownloadService.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DownloadService.java 2010-05-06 09:48:34 UTC (rev 24)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DownloadService.java 2010-05-06 13:42:54 UTC (rev 25)
@@ -35,7 +35,7 @@
log.info("received file chunk "
+ fileTransferMessage.getFileDescrition().getFileName()
+ " chunk number "
- + fileTransferMessage.getFileChunk().getBytesChunk().getChunkNumber()
+ + fileTransferMessage.getFileChunk().getChunkNumber()
+ ")");
}
@@ -70,7 +70,6 @@
// maybe the download is complete
if (downloadingFile.isComplete()) {
- // write file to the FS
log.info("file " + newFile.getAbsolutePath() + " written");
finishedDownloads.add(checkSum);
@@ -115,10 +114,7 @@
Message message = transport.newMulticastMessage();
message.setContent(new FileRequestMessage(fileDescription));
message.send();
- /*
- Message message = new Message(null, null, new FileRequestMessage(fileDescription));
- jChannel.send(message);
- */
+
if (log.isDebugEnabled())
log.info("sending file request for " + fileDescription.getFileName());
registerObserver(fileDescription, downloadObserver);
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/UploadService.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/UploadService.java 2010-05-06 09:48:34 UTC (rev 24)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/UploadService.java 2010-05-06 13:42:54 UTC (rev 25)
@@ -53,7 +53,7 @@
reply.setContent(fileTransferMessage);
if (log.isDebugEnabled())
- log.info("sending chunk " + fileTransferMessage.getFileChunk().getBytesChunk().getChunkNumber());
+ log.info("sending chunk " + fileTransferMessage.getFileChunk().getChunkNumber());
reply.send();
Deleted: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/BytesChunk.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/BytesChunk.java 2010-05-06 09:48:34 UTC (rev 24)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/BytesChunk.java 2010-05-06 13:42:54 UTC (rev 25)
@@ -1,49 +0,0 @@
-package org.nuiton.disworkfs.split;
-
-import java.io.Serializable;
-
-public class BytesChunk implements Serializable {
-
- /**
- *
- */
- private static final long serialVersionUID = 2562740291339559768L;
-
- // FIXME dangerous : not final !
- public static int MAX_CHUNK_SIZE = 1024;
-
- private byte[] data = new byte[MAX_CHUNK_SIZE];
-
- private int chunkNumber;
-
- private int chunkSize;
-
- public BytesChunk(byte[] data) {
- this.setData(data);
- }
-
- public byte[] getData() {
- return this.data;
- }
-
- public void setData(byte[] data) {
- this.data = data;
- }
-
- public int getChunkNumber() {
- return this.chunkNumber;
- }
-
- public void setChunkNumber(int chunkNumber) {
- this.chunkNumber = chunkNumber;
- }
-
- public int getChunkSize() {
- return this.chunkSize;
- }
-
- public void setChunkSize(int chunkSize) {
- this.chunkSize = chunkSize;
- }
-
-}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/FileChunk.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/FileChunk.java 2010-05-06 09:48:34 UTC (rev 24)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/FileChunk.java 2010-05-06 13:42:54 UTC (rev 25)
@@ -4,18 +4,54 @@
public class FileChunk implements Serializable {
- /**
- *
- */
- private static final long serialVersionUID = 7668123461277672243L;
- private BytesChunk bytesChunk;
+ private static final long serialVersionUID = 9081850894548724600L;
- public FileChunk(BytesChunk bytesChunk) {
- this.bytesChunk = bytesChunk;
+ // FIXME dangerous : not final !
+ public static int MAX_CHUNK_SIZE = 1024;
+
+ private byte[] data = new byte[MAX_CHUNK_SIZE];
+
+ private int chunkNumber;
+
+ private int chunkSize;
+
+ public FileChunk(byte[] data) {
+ this.setData(data);
}
- public BytesChunk getBytesChunk() {
- return bytesChunk;
+ public byte[] getData() {
+ return this.data;
}
-
+
+ public void setData(byte[] data) {
+ this.data = data;
+ }
+
+ public int getChunkNumber() {
+ return this.chunkNumber;
+ }
+
+ public void setChunkNumber(int chunkNumber) {
+ this.chunkNumber = chunkNumber;
+ }
+
+ public int getChunkSize() {
+ return this.chunkSize;
+ }
+
+ public void setChunkSize(int chunkSize) {
+ this.chunkSize = chunkSize;
+ }
+
+ public static int numberOfChunksNeededtoStore(long numberOfBytes) {
+ long numberOfChunks = numberOfBytes / MAX_CHUNK_SIZE;
+
+ if (numberOfBytes > numberOfChunks * MAX_CHUNK_SIZE) {
+ numberOfChunks += 1;
+ }
+
+ // FIXME unsafe cast
+ return (int) numberOfChunks;
+ }
+
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/FileDescription.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/FileDescription.java 2010-05-06 09:48:34 UTC (rev 24)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/FileDescription.java 2010-05-06 13:42:54 UTC (rev 25)
@@ -34,7 +34,7 @@
public int getNumberOfChunks() {
- return SplittedBytes.numberOfChunksNeededtoStore(this.totalSize);
+ return FileChunk.numberOfChunksNeededtoStore(this.totalSize);
}
public void setFileName(String fileName) {
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromChunks.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromChunks.java 2010-05-06 09:48:34 UTC (rev 24)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromChunks.java 2010-05-06 13:42:54 UTC (rev 25)
@@ -84,9 +84,9 @@
public void addChunk(FileChunk fileChunk) {
try {
RandomAccessFile randomAccessFile = new RandomAccessFile(destination, "rw");
- byte[] data = fileChunk.getBytesChunk().getData();
- int off = BytesChunk.MAX_CHUNK_SIZE * fileChunk.getBytesChunk().getChunkNumber();
- int len = fileChunk.getBytesChunk().getChunkSize();
+ byte[] data = fileChunk.getData();
+ int off = FileChunk.MAX_CHUNK_SIZE * fileChunk.getChunkNumber();
+ int len = fileChunk.getChunkSize();
randomAccessFile.seek(off);
randomAccessFile.write(data, 0, len);
randomAccessFile.close();
@@ -94,7 +94,7 @@
// updating status
BitSet chunkStatus = readChunkStatusFile();
- chunkStatus.clear(fileChunk.getBytesChunk().getChunkNumber());
+ chunkStatus.clear(fileChunk.getChunkNumber());
writeChunkStatusFile(chunkStatus);
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromLocalFile.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromLocalFile.java 2010-05-06 09:48:34 UTC (rev 24)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromLocalFile.java 2010-05-06 13:42:54 UTC (rev 25)
@@ -33,7 +33,7 @@
List<FileChunk> result = new ArrayList<FileChunk>();
// this array will contains some bytes read from the file
- byte[] read = new byte[BytesChunk.MAX_CHUNK_SIZE];
+ byte[] read = new byte[FileChunk.MAX_CHUNK_SIZE];
// chunks have to be numbered
int chunkNumber = 0;
@@ -45,16 +45,15 @@
// reading the file until the end
while ((chunkSize = randomAccessFile.read(read)) != -1) {
- // creating a FileChunk from the data read
- BytesChunk bytesChunk = new BytesChunk(read);
- bytesChunk.setChunkNumber(chunkNumber);
- bytesChunk.setChunkSize(chunkSize);
+ // creating a FileChunk from the data read
+ FileChunk fileChunk = new FileChunk(read);
+ fileChunk.setChunkNumber(chunkNumber);
+ fileChunk.setChunkSize(chunkSize);
- FileChunk fileChunk = new FileChunk(bytesChunk);
result.add(fileChunk);
// preparing data for next iteration
- read = new byte[BytesChunk.MAX_CHUNK_SIZE];
+ read = new byte[FileChunk.MAX_CHUNK_SIZE];
chunkNumber += 1;
}
Deleted: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitedFile.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitedFile.java 2010-05-06 09:48:34 UTC (rev 24)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitedFile.java 2010-05-06 13:42:54 UTC (rev 25)
@@ -1,3 +0,0 @@
-package org.nuiton.disworkfs.split;
-
-
Deleted: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplittedBytes.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplittedBytes.java 2010-05-06 09:48:34 UTC (rev 24)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplittedBytes.java 2010-05-06 13:42:54 UTC (rev 25)
@@ -1,167 +0,0 @@
-package org.nuiton.disworkfs.split;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * This class permit a great array of bytes (a file ?) to be split in multiple
- * chunks. This class offers methods to split and unsplit an array of bytes.
- *
- * There is two use for this class :
- * <ul>
- * <li>Get a collection of chunks, given a single big array of bytes</li>
- * <li>Get a single big array of bytes, given all the chunks</li>
- * </ul>
- *
- * In the first case, you should use the SplittedBytes() constructor. Then, use
- * the setChunksFromBytes method. Finally, use getChunks() to get all the
- * generated chunks.
- *
- * In the second case, you can re-construct, step by step, adding chunks by
- * chunks. First, use SplittedBytes(int expectedSize) to construct by providing
- * the size of the array. Then, add chunks by chunks using addChunk. You don't
- * need to provide the chunks in a particular order. It's useful if you're
- * obtaining your chunks in an unknown order. You just have to provide all the
- * chunks (until isComplete is true). Finally, get the final full array with
- * getBytesFromChunks().
- *
- * If you are missing some chunks, you can know who of them is missing with
- * missingChinks()
- *
- * @author bleny
- */
-
-
-public class SplittedBytes {
-
- private List<BytesChunk> chunks;
- private long totalSize;
-
- public static int numberOfChunksNeededtoStore(long numberOfBytes) {
- long longNumberOfChunks = numberOfBytes / BytesChunk.MAX_CHUNK_SIZE;
-
- // FIXME
- int numberOfChunks = (int) longNumberOfChunks;
-
- if (numberOfBytes > numberOfChunks * BytesChunk.MAX_CHUNK_SIZE) {
- numberOfChunks += 1;
- }
- return numberOfChunks;
- }
-
- /**
- * Use this constructor if you're building a byte-array from chunks. You
- * have to know what will be the exact size of final array : this is needed
- * to make isComplete working The argument is a number of bytes, not a
- * number of chunks !
- *
- * @param expectedSize
- * the size of the array of bytes you will get
- */
- public SplittedBytes(long expectedSize) {
- this.totalSize = expectedSize;
-
- // create a list with one place for each chunks
- List<BytesChunk> emptyChunkList = new ArrayList<BytesChunk>(
- numberOfChunksNeededtoStore(expectedSize));
- for (int chunkNumber = 0; chunkNumber < numberOfChunksNeededtoStore(expectedSize); ++chunkNumber) {
- emptyChunkList.add(null);
- }
- this.chunks = emptyChunkList;
- }
-
- public SplittedBytes() {
- this.chunks = new ArrayList<BytesChunk>();
- }
-
- public long getTotalSize() {
- return this.totalSize;
- }
-
- public void setChunksFromBytes(byte[] data) {
- int numberOfChunks = data.length / BytesChunk.MAX_CHUNK_SIZE;
- if (data.length > numberOfChunks * BytesChunk.MAX_CHUNK_SIZE) {
- numberOfChunks += 1;
- }
-
- for (int currentChunk = 0; currentChunk < numberOfChunks; ++currentChunk) {
-
- int numberOfLastBytes = BytesChunk.MAX_CHUNK_SIZE;
-
- if (data.length - (currentChunk * BytesChunk.MAX_CHUNK_SIZE) < BytesChunk.MAX_CHUNK_SIZE) {
- // last chunk
- numberOfLastBytes = data.length - currentChunk
- * BytesChunk.MAX_CHUNK_SIZE;
- }
-
- // reading numberOfLastBytes bytes
-
- byte[] chunkData = new byte[BytesChunk.MAX_CHUNK_SIZE];
-
- System.arraycopy(data, currentChunk * BytesChunk.MAX_CHUNK_SIZE,
- chunkData, 0, numberOfLastBytes);
-
- BytesChunk bytesChunk = new BytesChunk(chunkData);
- bytesChunk.setChunkNumber(currentChunk);
- bytesChunk.setChunkSize(numberOfLastBytes);
- this.chunks.add(bytesChunk);
- totalSize += numberOfLastBytes;
-
- }
- }
-
- public List<BytesChunk> getChunks() {
- return this.chunks;
- }
-
- public byte[] getBytesFromChunks() throws Exception {
-
- if (!this.isComplete()) {
- // TODO use a specific exception ?
- throw new Exception("Data are incomplete : chunks are missing");
- }
-
- byte[] data = new byte[(int) totalSize];
- int bytesReads = 0;
- totalSize = 0;
- for (BytesChunk fileChunk : this.getChunks()) {
- System.arraycopy(fileChunk.getData(), 0, data, bytesReads,
- fileChunk.getChunkSize());
-
- bytesReads += fileChunk.getChunkSize();
- totalSize += bytesReads;
- }
-
- return data;
- }
-
- public void addChunk(BytesChunk chunk) {
- this.getChunks().set(chunk.getChunkNumber(), chunk);
- }
-
- public boolean isComplete() {
- return totalSize == this.getActualSize();
- }
-
- public int getActualSize() {
- int result = 0;
- for (BytesChunk bytesChunk : chunks) {
- if (bytesChunk != null) {
- result += bytesChunk.getChunkSize();
- }
- }
- return result;
- }
-
- public List<Integer> missingChunks() {
- List<Integer> missingChunks = new ArrayList<Integer>();
- for (int chunkNumber = 0; chunkNumber < this.getChunks().size(); ++chunkNumber) {
- if (this.getChunks().get(chunkNumber) == null) {
- missingChunks.add(chunkNumber);
- }
- }
- return missingChunks;
- }
-
-}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Transport.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Transport.java 2010-05-06 09:48:34 UTC (rev 24)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Transport.java 2010-05-06 13:42:54 UTC (rev 25)
@@ -39,5 +39,11 @@
* @return an address you can use in Message.setDestination()
*/
public Address getMulticastAddress();
+
+ /**
+ * This call should release all the resources used by the transport layer,
+ * i.e. close sockets, connections, etc.
+ */
+ public void close();
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/jgroups/JGroupsTransport.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/jgroups/JGroupsTransport.java 2010-05-06 09:48:34 UTC (rev 24)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/jgroups/JGroupsTransport.java 2010-05-06 13:42:54 UTC (rev 25)
@@ -74,6 +74,21 @@
String clusterName = disworkConfig.getJGroupsClusterName();
+ /*
+ String localIp;
+ try {
+ localIp = InetAddress.getLocalHost().getHostAddress();
+ log.info("local IP is " + localIp);
+ } catch (UnknownHostException e) {
+ log.error("can't get local IP");
+ }
+ */
+ // System.setProperty("jgroups.bind_addr", localIp);
+ // System.setProperty("jgroups.tcpping.initial_hosts", localIp);
+ // System.setProperty("jgroups.udp.mcast_addr", "224.0.0.150");
+ // System.setProperty("java.net.preferIPv4Stack", "true");
+
+
log.info("connecting to JGroup " + clusterName);
jChannel.connect(clusterName);
@@ -136,5 +151,13 @@
public Address getLocalAddress() {
return new JGroupsAddress(jChannel.getAddress());
}
+
+ @Override
+ public void close() {
+ // leaving group
+ jChannel.disconnect();
+ // closing the channel
+ jChannel.close();
+ }
}
Modified: trunk/diswork-fs/src/main/resources/log4j.properties
===================================================================
--- trunk/diswork-fs/src/main/resources/log4j.properties 2010-05-06 09:48:34 UTC (rev 24)
+++ trunk/diswork-fs/src/main/resources/log4j.properties 2010-05-06 13:42:54 UTC (rev 25)
@@ -6,6 +6,6 @@
log4j.appender.stdout.layout.ConversionPattern=%d %5p [%t] (%F:%L) %M - %m%n
# package level
log4j.logger.org.nuiton.disworkfs=DEBUG
-log4j.logger.org.nuiton.disworkfs.transport=WARN
-log4j.logger.org.nuiton.disworkfs.services.DownloadService=WARN
-log4j.logger.org.nuiton.disworkfs.services.UploadService=WARN
+#log4j.logger.org.nuiton.disworkfs.transport=WARN
+#log4j.logger.org.nuiton.disworkfs.services.DownloadService=WARN
+#log4j.logger.org.nuiton.disworkfs.services.UploadService=WARN
Deleted: trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/AllwaysReplyToLookUpTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/AllwaysReplyToLookUpTest.java 2010-05-06 09:48:34 UTC (rev 24)
+++ trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/AllwaysReplyToLookUpTest.java 2010-05-06 13:42:54 UTC (rev 25)
@@ -1,34 +0,0 @@
-package org.nuiton.disworkfs;
-
-import static org.mockito.Mockito.mock;
-
-import org.jgroups.JChannel;
-import org.junit.Test;
-import org.mockito.Mock;
-
-public class AllwaysReplyToLookUpTest extends AbstractSplitFileTest {
-
- @Mock JChannel mockedJChannel = mock(JChannel.class);
-
-
- @Test
- public void testReceive() {
- // FIXME can't mock Message because getObject() is final
- /*
-
- AllwaysReplyToLookUp allwaysReplyToLookUp = new AllwaysReplyToLookUp(mockedJChannel);
-
- Message mockedMessage = mock(Message.class);
- // when(mockedMessage.getObject()).thenReturn(new LookUpMessage("randomFilePath"));
-
-
- allwaysReplyToLookUp.receive(mockedMessage);
-
-
- // verify(mockedJChannel, times(4)).send(mockedMessage);
-
- // when(message.getObject()).thenReturn(new LookUpMessage("randomFilePath"));
-
- */
- }
-}
Modified: trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/DistributedFileSystemTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/DistributedFileSystemTest.java 2010-05-06 09:48:34 UTC (rev 24)
+++ trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/DistributedFileSystemTest.java 2010-05-06 13:42:54 UTC (rev 25)
@@ -1,9 +1,15 @@
package org.nuiton.disworkfs;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
import java.util.Random;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -31,8 +37,8 @@
static protected String storagePath1;
static protected String storagePath2;
- static protected DisworkFileSystem disworkFileSystem1;
- static protected DisworkFileSystem disworkFileSystem2;
+ static protected DisworkConfig disworkConfig1;
+ static protected DisworkConfig disworkConfig2;
@Before
public void setUp() throws Exception {
@@ -57,13 +63,11 @@
FileUtils.writeByteArrayToFile(randomFile, randomBytes);
- DisworkConfig disworkConfig1 = new DisworkConfig();
+ disworkConfig1 = new DisworkConfig();
disworkConfig1.setOption("storage", storagePath1);
- disworkFileSystem1 = new DisworkFileSystem(disworkConfig1);
- DisworkConfig disworkConfig2 = new DisworkConfig();
+ disworkConfig2 = new DisworkConfig();
disworkConfig2.setOption("storage", storagePath2);
- disworkFileSystem2 = new DisworkFileSystem(disworkConfig2);
}
@After
@@ -71,21 +75,46 @@
// cleaning
FileUtil.deleteRecursively(tempDirectoryPath);
}
-/*
+
@Test
public void testWrite() throws Exception {
+ DisworkFileSystem disworkFileSystem1 = new DisworkFileSystem(disworkConfig1);
disworkFileSystem1.write("monfichier", new File(randomFilePath));
-
+ disworkFileSystem1.close();
}
-*/
+
+ /**
+ * This test runs two DistributedFS instances. A file is written on the first
+ * one. We then try to read it from the other FS: since it doesn't own the file,
+ * it will try a lookup and download the file.
+ */
@Test
public void testRead() throws Exception {
+ DisworkFileSystem disworkFileSystem1 = new DisworkFileSystem(disworkConfig1);
+ DisworkFileSystem disworkFileSystem2 = new DisworkFileSystem(disworkConfig2);
+
disworkFileSystem1.write("monfichier", new File(randomFilePath));
-
disworkFileSystem2.read("monfichier");
+ // TODO
+ File monfichierstorage1 = new File(tempDirectoryPath + "/storage1/monfichier");
+ File monfichierstorage2 = new File(tempDirectoryPath + "/storage2/monfichier");
+
+ try {
+ boolean actualContentEquality = IOUtils.contentEquals(
+ new FileInputStream(monfichierstorage1),
+ new FileInputStream(monfichierstorage2)
+ );
+ assertTrue("file and copy content should be the same", actualContentEquality);
+ } catch (IOException e) {
+ fail("one or both files are not readable");
+ e.printStackTrace();
+ }
+
+ disworkFileSystem1.close();
+ disworkFileSystem2.close();
}
}
Modified: trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromLocalFileTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromLocalFileTest.java 2010-05-06 09:48:34 UTC (rev 24)
+++ trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromLocalFileTest.java 2010-05-06 13:42:54 UTC (rev 25)
@@ -23,17 +23,17 @@
FileDescription fileDescription = splitFileFromLocalFile.getFileDescription();
assertTrue(fileDescription.getTotalSize() == randomFileSize);
assertTrue(fileDescription.getFileCheckSum() != 0);
- assertEquals(SplittedBytes.numberOfChunksNeededtoStore(randomFileSize), fileDescription.getNumberOfChunks());
+ assertEquals(FileChunk.numberOfChunksNeededtoStore(randomFileSize), fileDescription.getNumberOfChunks());
assertEquals(fileDescription.getFileName(), randomFilePath);
List<FileChunk> allChunks = splitFileFromLocalFile.getAllChunks();
for (FileChunk fileChunk : allChunks) {
- int chunkSize = fileChunk.getBytesChunk().getChunkSize();
+ int chunkSize = fileChunk.getChunkSize();
assertTrue(
- chunkSize == BytesChunk.MAX_CHUNK_SIZE
- || chunkSize == randomFileSize % BytesChunk.MAX_CHUNK_SIZE);
+ chunkSize == FileChunk.MAX_CHUNK_SIZE
+ || chunkSize == randomFileSize % FileChunk.MAX_CHUNK_SIZE);
}
} catch (IOException e) {
Deleted: trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitedFileTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitedFileTest.java 2010-05-06 09:48:34 UTC (rev 24)
+++ trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitedFileTest.java 2010-05-06 13:42:54 UTC (rev 25)
@@ -1,115 +0,0 @@
-/*package org.nuiton.disworkfs.split;
-
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.nuiton.disworkfs.split.SplitedFile;
-
-public class SplitedFileTest {
-
- static Random random = new Random();
-
- /**
- * a place to store files for the test
- * it's a subdirectory of the OS temp dir
- * e.g. under linux /tmp/disworkfs/tests/
-
- static String tempDirectoryPath = System.getProperty("java.io.tmpdir", ".") + "/disworkfs/tests";
-
- *//**
- * We will create a file at this path for test purpose
- *//*
- static String randomFilePath = tempDirectoryPath + "/randomfile";
-
- *//**
- * The file will have this fixed size
- *//*
- static int randomFileSize = 1 * 1000 * 1000; // 1 MB
-
- @Before
- public void setUp() throws Exception {
- File tempDirectory = new File(tempDirectoryPath);
- tempDirectory.mkdir();
-
- // creating random data for the file
- byte[] randomBytes = new byte[randomFileSize];
- random.nextBytes(randomBytes);
-
- // dumping random data into the file
- File randomFile = new File(randomFilePath);
- FileUtils.writeByteArrayToFile(randomFile, randomBytes);
- }
-
- @Test
- public void testReadFromLocalFileSytem() {
- try {
- SplitedFile splitedFile = new SplitedFile(randomFilePath);
- splitedFile.readFromLocalFileSytem();
- } catch (IOException e) {
- fail();
- e.printStackTrace();
- }
- }
-
- @Test
- public void simpleCopy() {
-
- String splitedFileCopyPath = randomFilePath + "_copy";
- try {
- // let's take a file
- SplitedFile splitedFile = new SplitedFile(randomFilePath);
- splitedFile.readFromLocalFileSytem();
-
-
- // let's create a second file
- SplitedFile splitedFileCopy = new SplitedFile(splitedFileCopyPath);
-
- // here is the simple copy from fist file to second file
- splitedFileCopy.setFileChunks(splitedFile.getFileChunks());
-
- // write the copy to the FS
- splitedFileCopy.writeToLocalFileSystem();
-
- // compare the original and the copy byte by byte
- try {
- boolean actualContentEquality = IOUtils.contentEquals(
- new FileInputStream(new File(randomFilePath)),
- new FileInputStream(new File(splitedFileCopyPath))
- );
- assertTrue("file and copy content should be the same", actualContentEquality);
- } catch (IOException e) {
- fail("one or both files are not readable");
- e.printStackTrace();
- }
-
- // delete the copy
- new File(splitedFileCopyPath).delete();
- } catch (IOException e) {
- fail();
- e.printStackTrace();
- } catch (Exception e) {
- fail();
- e.printStackTrace();
- }
- }
-
- @After
- public void tearDown() throws Exception {
- // cleaning
- new File(randomFilePath).delete();
- new File(tempDirectoryPath).delete();
- }
-
-}
-*/
\ No newline at end of file
Deleted: trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplittedBytesTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplittedBytesTest.java 2010-05-06 09:48:34 UTC (rev 24)
+++ trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplittedBytesTest.java 2010-05-06 13:42:54 UTC (rev 25)
@@ -1,117 +0,0 @@
-package org.nuiton.disworkfs.split;
-
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.junit.Before;
-import org.junit.Test;
-
-public class SplittedBytesTest {
- static final byte[] rawData = { 1, 2, 3, 4, 5, 6, 7, 8 };
-
- static SplittedBytes splittedBytes = new SplittedBytes();
-
- @Before
- public void setUp() throws Exception {
- BytesChunk.MAX_CHUNK_SIZE = 3;
- }
-
-
- /**
- * this test show that bytes to chunks and chunks
- * to bytes conversions are reflexive
- * @throws Exception
- */
- @Test
- public void testSplitUnSplit() throws Exception {
- splittedBytes.setChunksFromBytes(rawData);
-
- assertEquals(8, splittedBytes.getTotalSize());
- assertEquals(3, SplittedBytes.numberOfChunksNeededtoStore(rawData.length));
-
- byte[] dataCopy = splittedBytes.getBytesFromChunks();
-
- assertTrue("data after splitting and unplitting are not similar", Arrays.equals(rawData, dataCopy));
-
- }
-
- /**
- * This test shows that bytes can be split and
- * unsplit by another instance of SplittedBytes
- * @throws Exception
- */
- @Test
- public void testSplittedBytes2() throws Exception {
-
- SplittedBytes splittedBytes = new SplittedBytes();
- splittedBytes.setChunksFromBytes(rawData);
- assertEquals(8, splittedBytes.getTotalSize());
- assertEquals(3, SplittedBytes.numberOfChunksNeededtoStore(rawData.length));
-
-
-
- SplittedBytes otherSplittedBytes = new SplittedBytes(rawData.length);
- for (BytesChunk bytesChunk : splittedBytes.getChunks()) {
- assertFalse(otherSplittedBytes.isComplete());
-
- int beforeAddActualSize = otherSplittedBytes.getActualSize();
-
- otherSplittedBytes.addChunk(bytesChunk);
-
- int afterAddActualSize = otherSplittedBytes.getActualSize();
-
- assertTrue("actualSize should progress", beforeAddActualSize < afterAddActualSize);
-
- }
-
- assertTrue(splittedBytes.isComplete());
-
- byte[] dataCopy = splittedBytes.getBytesFromChunks();
- assertTrue("data after splitting and unplitting are not similar", Arrays.equals(rawData, dataCopy));
-
- }
-
- /**
- * This test shows that bytes can be split and
- * unsplit by another instance of SplittedBytes
- * @throws Exception
- */
- @Test
- public void testShuffleAdds() throws Exception {
-
- SplittedBytes splittedBytes = new SplittedBytes();
- splittedBytes.setChunksFromBytes(rawData);
-
- SplittedBytes otherSplittedBytes = new SplittedBytes(rawData.length);
-
- List<BytesChunk> someChunks = new ArrayList<BytesChunk>();
-
-
- // TODO use Collections.copy(someChunks, splittedBytes.getChunks());
- for (BytesChunk bytesChunk : splittedBytes.getChunks()) {
- someChunks.add(bytesChunk);
- }
-
- Collections.shuffle(someChunks);
-
- for (BytesChunk bytesChunk : someChunks) {
- assertFalse(otherSplittedBytes.isComplete());
- otherSplittedBytes.addChunk(bytesChunk);
- }
- assertTrue(splittedBytes.isComplete());
-
- byte[] dataCopy = splittedBytes.getBytesFromChunks();
- assertTrue("data after splitting and unplitting are not similar", Arrays.equals(rawData, dataCopy));
-
- }
-
-
-
-}
1
0
r24 - in trunk/diswork-fs/src/main: java/org/nuiton/disworkfs resources
by bleny@users.nuiton.org 06 May '10
by bleny@users.nuiton.org 06 May '10
06 May '10
Author: bleny
Date: 2010-05-06 11:48:34 +0200 (Thu, 06 May 2010)
New Revision: 24
Url: http://nuiton.org/repositories/revision/diswork/24
Log:
implementation du time-out propre
Modified:
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java
trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/SimpleDownload.java
trunk/diswork-fs/src/main/resources/log4j.properties
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java 2010-05-06 08:25:12 UTC (rev 23)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java 2010-05-06 09:48:34 UTC (rev 24)
@@ -52,7 +52,7 @@
}
- public OutputStream read(String path) {
+ public OutputStream read(String path) throws InterruptedException, FileNotFoundException {
log.info("trying to read " + path);
@@ -63,15 +63,12 @@
// the file is not available
// let's download it
- try {
- SimpleDownload simpleDownload = new SimpleDownload(path, lookUpService, downloadService);
- boolean fileFound = simpleDownload.initiateDownload();
- if (fileFound)
- simpleDownload.startDownload();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ SimpleDownload simpleDownload = new SimpleDownload(path, lookUpService, downloadService);
+ boolean fileFound = simpleDownload.initiateDownload();
+ if (fileFound)
+ simpleDownload.startDownload();
+ else
+ throw new FileNotFoundException("no look-up response received");
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/SimpleDownload.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/SimpleDownload.java 2010-05-06 08:25:12 UTC (rev 23)
+++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/SimpleDownload.java 2010-05-06 09:48:34 UTC (rev 24)
@@ -12,83 +12,81 @@
public class SimpleDownload implements DownloadObserver, LookUpObserver {
+ /**
+ * only used for synchronisation purpose
+ */
+ protected final Object lock = new Object();
+
+ private Boolean lookUpResponseReceived = false;
private Boolean downloadFinised = false;
+
private FileDescription fileDescription = null;
+
private DownloadService downloadService;
private LookUpService lookUpService;
private String filePath;
private static final Log log = LogFactory.getLog(SimpleDownload.class);
- // TODO timeout
- public SimpleDownload(String filePath, LookUpService lookUpService, DownloadService downloadService) throws Exception {
+ public SimpleDownload(String filePath, LookUpService lookUpService, DownloadService downloadService) {
this.filePath = filePath;
this.downloadService = downloadService;
this.lookUpService = lookUpService;
}
- public boolean initiateDownload() {
+ public boolean initiateDownload() throws InterruptedException {
lookUpService.lookForFileName(filePath, this);
- // FIXME bad implementation of a timeout
- int numberOfSecondsWaited = 0;
-
- boolean responseReceived = false;
-
- // wait until there is a result or timeout
- while (!responseReceived && numberOfSecondsWaited <= 10) {
- // response not yet received, wait again...
- try {
- Thread.sleep(1000);
- log.info("waiting for look-up response");
- numberOfSecondsWaited += 1;
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- responseReceived = this.fileDescription != null;
- if (responseReceived)
- log.info("look-up response received");
- }
+
+ synchronized (lock) {
+ lock.wait(10 * 1000); // time out at 10 seconds
+ }
+
+ // TODO throw file not found if timeout exceed
+ if (lookUpResponseReceived) {
+ log.info("look-up response received for " + filePath);
+ } else {
+ log.info("no look-up response received for " + filePath);
+ }
- return responseReceived;
+ return lookUpResponseReceived;
}
- public File startDownload() {
+ public File startDownload() throws InterruptedException {
if (log.isDebugEnabled())
log.info("starting download for " + fileDescription.getFileName());
downloadService.startDownload(fileDescription, this);
- // TODO throw file not found if timeout exceed
- while(! downloadFinised) {
- try {
- Thread.sleep(10 * 1000);
- log.info("still waiting for " + fileDescription.getFileName() + " download to complete");
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
+ synchronized (lock) {
+ lock.wait();
+ if (log.isDebugEnabled())
+ log.info("download " + fileDescription.getFileName() + " is complete");
+ }
- if (log.isDebugEnabled())
- log.info("download " + fileDescription.getFileName() + " is complete");
+ // FIXME may not return a good file
return new File(fileDescription.getFileName());
}
@Override
public void updateDownloadStatus(DownloadService downloadService) {
- synchronized (downloadFinised) {
- downloadFinised = downloadService.isFinished(fileDescription);
- }
+ downloadFinised = downloadService.isFinished(fileDescription);
+ if (downloadFinised) {
+ synchronized (lock) {
+ lock.notify();
+ }
+ }
}
@Override
public void receiveResult(FileDescription fileDescription) {
- this.fileDescription = fileDescription;
+ synchronized (lock) {
+ lookUpResponseReceived = true;
+ this.fileDescription = fileDescription;
+ lock.notify();
+ }
}
}
Modified: trunk/diswork-fs/src/main/resources/log4j.properties
===================================================================
--- trunk/diswork-fs/src/main/resources/log4j.properties 2010-05-06 08:25:12 UTC (rev 23)
+++ trunk/diswork-fs/src/main/resources/log4j.properties 2010-05-06 09:48:34 UTC (rev 24)
@@ -7,5 +7,5 @@
# package level
log4j.logger.org.nuiton.disworkfs=DEBUG
log4j.logger.org.nuiton.disworkfs.transport=WARN
-#log4j.logger.org.nuiton.disworkfs.services.DownloadService=WARN
-#log4j.logger.org.nuiton.disworkfs.services.UploadService=WARN
+log4j.logger.org.nuiton.disworkfs.services.DownloadService=WARN
+log4j.logger.org.nuiton.disworkfs.services.UploadService=WARN
1
0