Author: bleny Date: 2010-05-25 18:24:08 +0200 (Tue, 25 May 2010) New Revision: 52 Url: http://nuiton.org/repositories/revision/diswork/52 Log: untested implementation of copy-on-write to deal with concurrency Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/storage/Storage.java Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/storage/Storage.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/storage/Storage.java 2010-05-25 15:24:32 UTC (rev 51) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/storage/Storage.java 2010-05-25 16:24:08 UTC (rev 52) @@ -3,6 +3,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.ConcurrentModificationException; import java.util.Map; import org.apache.commons.io.IOUtils; @@ -165,54 +166,91 @@ } /** - * a put in the map, this involves split considerations + * a put in the map, this involves split considerations and concurrency * @param key * @param value * @throws IOException */ protected void put(String key, InputStream value) throws IOException { - - // TODO 20100519 bleny deal with copy-on-write // TODO 20100519 bleny deal with null value properly if (value == null) { value = new ByteArrayInputStream(new byte[0]); } + String lockKey = key + "_lock"; + String newDataKey = key + "_newData"; + + // trying to acquire lock + Long currentDate = System.currentTimeMillis(); + byte[] lock = map.put(lockKey, EntryUtil.stringToBytes(currentDate.toString())); + + if (lock != null) { + // file is locked, check date + Long currentLock = Long.parseLong(EntryUtil.bytesToString(lock)); + + if (currentDate - currentLock > 60 * 60 * 1000) { + // this lock is out dated, let's erase it + String obsoleteMetaBlock = EntryUtil.bytesToString(map.get(newDataKey)); + if (obsoleteMetaBlock != null) { + String[] obsoleteBlocksIds = EntryUtil.getBlockIdsFromMetaBlock(obsoleteMetaBlock); + for (String obsoleteBlockId : obsoleteBlocksIds) { + map.remove(obsoleteBlockId); + } + } + } else { + // file is currently written, stopping + map.put(lockKey, lock); + // FIXME 20100525 bleny should be declared in throws + throw new ConcurrentModificationException("can't write due to concurrency"); + } + } + + // here, we know we can write or an exception + // would have been thrown earlier String blocksIds = ""; int readResult = 0; int totalSize = 0; + String metaBlock = totalSize + blocksIds; + map.put(newDataKey, EntryUtil.stringToBytes(metaBlock)); + // creating a buffer of the size of a block int bufferSize = disworkConfig.getBlockSize(); byte[] buffer = new byte[bufferSize]; - + while ((readResult = value.read(buffer)) != -1) { totalSize += readResult; - + byte[] newBlock = buffer; - + // if the buffer is not full, truncate the block if (readResult < buffer.length) { newBlock = new byte[readResult]; System.arraycopy(buffer, 0, newBlock, 0, readResult); } - + String id = EntryUtil.generateId(); blocksIds += EntryUtil.BLOCKIDS_SEPARATOR + id; log.debug("saving new block (size = " + newBlock.length + ")" + " at key " + id); - + // copy block in map map.put(id, newBlock); + + metaBlock = totalSize + blocksIds; + + log.debug("putting metablock " + metaBlock + " at key " + key); + map.put(newDataKey, EntryUtil.stringToBytes(metaBlock)); + + // updating lock + currentDate = System.currentTimeMillis(); + map.put(lockKey, EntryUtil.stringToBytes(currentDate.toString())); } - String metaBlock = totalSize + blocksIds; - - log.debug("putting metablock " + metaBlock + " at key " + key); - - map.put(key, EntryUtil.stringToBytes(metaBlock)); + map.put(key, map.get(newDataKey)); + map.remove(lockKey); } public void remove(Object key) {
participants (1)
-
bleny@users.nuiton.org