This is an automated email from the git hooks/post-receive script. New commit to branch feature/3857_sql_replication_service in repository topia. See http://git.nuiton.org/topia.git commit af103b01ad2730be5cb7f47f22df9ed2823ef890 Author: Tony CHEMIT <chemit@codelutin.com> Date: Wed Dec 30 14:34:46 2015 +0100 New replication sql service (See #3857) --- .../sql/TopiaSqlReplicationService.java | 29 +++ .../sql/TopiaSqlReplicationServiceImpl.java | 49 +++++ .../sql/action/ReplicateActionBuilderSupport.java | 44 ++++ .../action/ReplicationActionRequestSupport.java | 61 ++++++ .../sql/action/ReplicationActionSupport.java | 169 +++++++++++++++ .../topia/replication/sql/action/package-info.java | 9 + .../sql/action/todb/ReplicateToDbAction.java | 210 +++++++++++++++++++ .../action/todb/ReplicateToDbActionBuilder.java | 35 ++++ .../action/todb/ReplicateToDbActionRequest.java | 25 +++ .../replication/sql/action/todb/package-info.java | 9 + .../sql/action/tosql/ReplicateToSqlAction.java | 199 ++++++++++++++++++ .../action/tosql/ReplicateToSqlActionBuilder.java | 35 ++++ .../action/tosql/ReplicateToSqlActionRequest.java | 26 +++ .../replication/sql/action/tosql/package-info.java | 9 + .../nuiton/topia/replication/sql/package-info.java | 20 ++ .../sql/table/TopiaReplicationTable.java | 79 +++++++ .../sql/table/TopiaReplicationTables.java | 36 ++++ .../sql/table/TopiaReplicationTablesBuilder.java | 227 +++++++++++++++++++++ .../topia/replication/sql/table/package-info.java | 9 + 19 files changed, 1280 insertions(+) diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/TopiaSqlReplicationService.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/TopiaSqlReplicationService.java new file mode 100644 index 0000000..ff66fc9 --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/TopiaSqlReplicationService.java @@ -0,0 +1,29 @@ +package org.nuiton.topia.replication.sql; + +import org.nuiton.topia.persistence.TopiaApplicationContext; +import org.nuiton.topia.persistence.TopiaService; +import org.nuiton.topia.replication.sql.action.todb.ReplicateToDbActionBuilder; +import org.nuiton.topia.replication.sql.action.tosql.ReplicateToSqlActionBuilder; + +import java.io.FileNotFoundException; +import java.io.Writer; + +/** + * Created on 29/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +public interface TopiaSqlReplicationService extends TopiaService { + + /** + * @return a new replicate to db action builder + */ + ReplicateToDbActionBuilder newReplicateToDbActionBuilder(TopiaApplicationContext targetApplicationContext); + + /** + * @return a new replicate to sql action builder + */ + ReplicateToSqlActionBuilder newReplicatetoSqlActionBuilder(Writer writer) throws FileNotFoundException; + +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/TopiaSqlReplicationServiceImpl.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/TopiaSqlReplicationServiceImpl.java new file mode 100644 index 0000000..e2fb5ce --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/TopiaSqlReplicationServiceImpl.java @@ -0,0 +1,49 @@ +package org.nuiton.topia.replication.sql; + +import org.nuiton.topia.persistence.TopiaApplicationContext; +import org.nuiton.topia.replication.sql.action.todb.ReplicateToDbActionBuilder; +import org.nuiton.topia.replication.sql.action.tosql.ReplicateToSqlActionBuilder; + +import java.io.Writer; +import java.util.Map; + +/** + * + * Created on 29/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +public class TopiaSqlReplicationServiceImpl implements TopiaSqlReplicationService { + + public static final int DEFAULT_FETCH_SIZE = 1000; + + protected TopiaApplicationContext topiaApplicationContext; + + @Override + public ReplicateToDbActionBuilder newReplicateToDbActionBuilder(TopiaApplicationContext targetApplicationContext) { + return new ReplicateToDbActionBuilder() + .from(topiaApplicationContext) + .to(targetApplicationContext) + .setFetchSize(DEFAULT_FETCH_SIZE); + } + + @Override + public ReplicateToSqlActionBuilder newReplicatetoSqlActionBuilder(Writer writer) { + return new ReplicateToSqlActionBuilder() + .from(topiaApplicationContext) + .to(writer) + .setFetchSize(DEFAULT_FETCH_SIZE); + } + + @Override + public void initTopiaService(TopiaApplicationContext topiaApplicationContext, Map<String, String> serviceConfiguration) { + this.topiaApplicationContext = topiaApplicationContext; + //TODO Add parameters (default fetchSize) + } + + @Override + public void close() { + + } +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/ReplicateActionBuilderSupport.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/ReplicateActionBuilderSupport.java new file mode 100644 index 0000000..ccdb246 --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/ReplicateActionBuilderSupport.java @@ -0,0 +1,44 @@ +package org.nuiton.topia.replication.sql.action; + +import org.nuiton.topia.persistence.TopiaApplicationContext; +import org.nuiton.topia.replication.sql.table.TopiaReplicationTables; + +/** + * Support to create action builder. + * + * Created on 29/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +public abstract class ReplicateActionBuilderSupport<B extends ReplicateActionBuilderSupport, R extends ReplicationActionRequestSupport, A extends ReplicationActionSupport<R>> { + + protected final R replicationRequest; + + public abstract A build(); + + public ReplicateActionBuilderSupport(R replicationRequest) { + this.replicationRequest = replicationRequest; + } + + public B from(TopiaApplicationContext sourceTopiaApplicationContext) { + replicationRequest.setSourceTopiaApplicationContext(sourceTopiaApplicationContext); + return (B) this; + } + + public B setFetchSize(int fetchSize) { + replicationRequest.setFetchSize(fetchSize); + return (B) this; + } + + public B setArg(String arg) { + replicationRequest.setArg(arg); + return (B) this; + } + + public B setTables(TopiaReplicationTables tables) { + replicationRequest.setTables(tables); + return (B) this; + } + +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/ReplicationActionRequestSupport.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/ReplicationActionRequestSupport.java new file mode 100644 index 0000000..471350c --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/ReplicationActionRequestSupport.java @@ -0,0 +1,61 @@ +package org.nuiton.topia.replication.sql.action; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.nuiton.topia.persistence.TopiaApplicationContext; +import org.nuiton.topia.replication.sql.table.TopiaReplicationTables; + +/** + * Support to create action request. + * + * Created on 29/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +public abstract class ReplicationActionRequestSupport { + + /** Logger. */ + private static final Log log = LogFactory.getLog(ReplicationActionRequestSupport.class); + + protected TopiaApplicationContext sourceTopiaApplicationContext; + + protected TopiaReplicationTables tables; + + protected int fetchSize; + + protected String arg; + + public TopiaReplicationTables getTables() { + return tables; + } + + public int getFetchSize() { + return fetchSize; + } + + public String getArg() { + return arg; + } + + public TopiaApplicationContext getSourceTopiaApplicationContext() { + return sourceTopiaApplicationContext; + } + + protected void setTables(TopiaReplicationTables tables) { + this.tables = tables; + } + + protected void setFetchSize(int fetchSize) { + this.fetchSize = fetchSize; + } + + protected void setArg(String arg) { + this.arg = arg; + } + + protected void setSourceTopiaApplicationContext(TopiaApplicationContext sourceTopiaApplicationContext) { + this.sourceTopiaApplicationContext = sourceTopiaApplicationContext; + } + +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/ReplicationActionSupport.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/ReplicationActionSupport.java new file mode 100644 index 0000000..35aa731 --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/ReplicationActionSupport.java @@ -0,0 +1,169 @@ +package org.nuiton.topia.replication.sql.action; + +import com.google.common.collect.Iterables; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.nuiton.topia.persistence.TopiaPersistenceContext; +import org.nuiton.topia.persistence.internal.AbstractTopiaPersistenceContext; +import org.nuiton.topia.persistence.support.TopiaSqlWork; +import org.nuiton.topia.replication.sql.table.TopiaReplicationTable; +import org.nuiton.topia.replication.sql.table.TopiaReplicationTables; +import org.nuiton.util.TimeLog; + +import java.io.Closeable; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +/** + * Support to create action. + * + * Created on 29/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +public abstract class ReplicationActionSupport<R extends ReplicationActionRequestSupport> implements Runnable, Closeable { + + /** Logger. */ + private static final Log log = LogFactory.getLog(ReplicationActionSupport.class); + + private static final TimeLog TIME_LOG = new TimeLog(ReplicationActionSupport.class, 50L, 100L); + + protected final R request; + + protected TopiaPersistenceContext sourcePersistenceContext; + + protected long startTime; + + protected long endTime; + + protected abstract void executeOnTable(TopiaReplicationTable table); + + protected ReplicationActionSupport(R request) { + this.request = request; + } + + @Override + public final void run() { + + TopiaReplicationTables tables = request.getTables(); + + before(tables); + + for (TopiaReplicationTable table : tables) { + + long startTable = TimeLog.getTime(); + + executeOnTable(table); + + TIME_LOG.log(startTable, "Executed on table.", table.getFullyTableName()); + + } + + TIME_LOG.log(startTime, "All tables executed"); + + after(tables); + + endTime = TIME_LOG.log(startTime, "Replication executed."); + + } + + @Override + public void close() { + + if (sourcePersistenceContext != null) { + sourcePersistenceContext.close(); + } + + } + + public R getRequest() { + return request; + } + + protected PreparedStatement createReadStatement(TopiaReplicationTable table, Connection connection) throws SQLException { + + StringBuilder sqlBuilder = new StringBuilder("SELECT " + table.getTableName() + ".*"); + + sqlBuilder.append(" FROM ").append(table.getFromClause()); + for (String joinClause : table.getJoinClauses()) { + sqlBuilder.append(" ").append(joinClause); + } + if (table.getWhereClause() != null) { + sqlBuilder.append(" WHERE ").append(table.getWhereClause()); + } + + String sql = sqlBuilder.toString(); + if (log.isDebugEnabled()) { + log.debug("Read sql: " + sql); + } + PreparedStatement statement = connection.prepareStatement(sql); + if (request.getArg() != null) { + statement.setString(1, request.getArg()); + } + statement.setFetchSize(request.getFetchSize()); + return statement; + + } + + protected String createWriteStatement(TopiaReplicationTable table, ResultSetMetaData readResultTatMetaData) throws SQLException { + + int columnCount = readResultTatMetaData.getColumnCount(); + + StringBuilder sqlBuilder = new StringBuilder("INSERT INTO "); + sqlBuilder.append(table.getSchemaName()).append(".").append(table.getTableName()); + + StringBuilder columnNamesBuilder = new StringBuilder(); + + columnNamesBuilder.append(" ").append(readResultTatMetaData.getColumnName(1)); + + for (int i = 2; i <= columnCount; i++) { + + String columnName = readResultTatMetaData.getColumnName(i); + columnNamesBuilder.append(", ").append(columnName); + } + + sqlBuilder.append("(") + .append(columnNamesBuilder.toString().trim()) + .append(")") + .append("VALUES (%s);\n"); + + String sql = sqlBuilder.toString(); + if (log.isDebugEnabled()) { + log.debug("Write sql: " + sql.trim()); + } + return sql; + + } + + protected void before(TopiaReplicationTables tables) { + startTime = TimeLog.getTime(); + if (log.isDebugEnabled()) { + log.debug("Before with tables: " + Iterables.size(tables)); + } + } + + protected void after(TopiaReplicationTables tables) { + + if (log.isDebugEnabled()) { + log.debug("After with tables: " + Iterables.size(tables)); + } + + TIME_LOG.log(startTime, "All tables executed"); + + } + + protected void executeSqlWork(TopiaSqlWork sqlWork) { + getSourcePersistenceContext().getSqlSupport().doSqlWork(sqlWork); + } + + protected AbstractTopiaPersistenceContext getSourcePersistenceContext() { + if (sourcePersistenceContext == null) { + sourcePersistenceContext = request.getSourceTopiaApplicationContext().newPersistenceContext(); + } + return (AbstractTopiaPersistenceContext) sourcePersistenceContext; + } + +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/package-info.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/package-info.java new file mode 100644 index 0000000..532a6d5 --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/package-info.java @@ -0,0 +1,9 @@ +/** + * <h1>Paquetage de base pour les actions</h1> + * + * <p>On retrouve ici les classes abstraites de l'API d'action</p> + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +package org.nuiton.topia.replication.sql.action; \ No newline at end of file diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/todb/ReplicateToDbAction.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/todb/ReplicateToDbAction.java new file mode 100644 index 0000000..da6842b --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/todb/ReplicateToDbAction.java @@ -0,0 +1,210 @@ +package org.nuiton.topia.replication.sql.action.todb; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.nuiton.topia.persistence.internal.AbstractTopiaPersistenceContext; +import org.nuiton.topia.persistence.support.TopiaSqlWork; +import org.nuiton.topia.replication.sql.action.ReplicationActionSupport; +import org.nuiton.topia.replication.sql.table.TopiaReplicationTable; +import org.nuiton.topia.replication.sql.table.TopiaReplicationTables; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; + +/** + * Action to replicate some tables to another database. + * + * Created on 29/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +public class ReplicateToDbAction extends ReplicationActionSupport<ReplicateToDbActionRequest> { + + /** Logger. */ + private static final Log log = LogFactory.getLog(ReplicateToDbAction.class); + + protected AbstractTopiaPersistenceContext targetPersistenceContext; + + public ReplicateToDbAction(ReplicateToDbActionRequest context) { + super(context); + } + + @Override + public void close() { + + try { + super.close(); + } finally { + + if (targetPersistenceContext != null) { + targetPersistenceContext.close(); + } + + } + + } + + public AbstractTopiaPersistenceContext getTargetPersistenceContext() { + if (targetPersistenceContext == null) { + targetPersistenceContext = (AbstractTopiaPersistenceContext) request.getTargetTopiaApplicationContext().newPersistenceContext(); + } + return targetPersistenceContext; + } + + @Override + protected void after(TopiaReplicationTables tables) { + + super.after(tables); + + if (targetPersistenceContext != null) { + targetPersistenceContext.commit(); + } + + } + + @Override + protected void executeOnTable(TopiaReplicationTable table) { + + if (log.isInfoEnabled()) { + log.info("Replicate: " + table.getFullyTableName()); + } + + ReadSqlWork sqlWork = new ReadSqlWork(table); + executeSqlWork(sqlWork); + + } + + protected class ReadSqlWork implements TopiaSqlWork { + + private final TopiaReplicationTable table; + + public ReadSqlWork(TopiaReplicationTable table) { + + this.table = table; + } + + @Override + public void execute(Connection connection) throws SQLException { + + try (PreparedStatement readStatement = createReadStatement(table, connection)) { + + readStatement.execute(); + CopySqlWork sqlWork = new CopySqlWork(table, readStatement); + getTargetPersistenceContext().getSqlSupport().doSqlWork(sqlWork); + + } + + } + + } + + protected class CopySqlWork implements TopiaSqlWork { + + protected final Statement readStatement; + + private final TopiaReplicationTable table; + + public CopySqlWork(TopiaReplicationTable table, Statement readStatement) { + this.table = table; + this.readStatement = readStatement; + } + + @Override + public void execute(Connection connection) throws SQLException { + + ResultSet readResultSet = readStatement.getResultSet(); + + ResultSetMetaData readResultSetMetaData = readResultSet.getMetaData(); + int columnCount = readResultSetMetaData.getColumnCount(); + + int fetchSize = request.getFetchSize(); + String tableName = table.getFullyTableName(); + + try (PreparedStatement writeStatement = createWriteStatement(connection, readResultSetMetaData)) { + + + long index = 0; + while (readResultSet.next()) { + + copyRow(readResultSet, writeStatement, columnCount); + + if ((++index % fetchSize) == 0) { + + flush(writeStatement, tableName, index); + + } + } + + flush(writeStatement, tableName, index); + } + + } + + public PreparedStatement createWriteStatement(Connection connection, ResultSetMetaData readResultTatMetaData) throws SQLException { + + int columnCount = readResultTatMetaData.getColumnCount(); + + StringBuilder sqlBuilder = new StringBuilder("INSERT INTO "); + sqlBuilder.append(table.getSchemaName()).append(".").append(table.getTableName()); + + StringBuilder columnNamesBuilder = new StringBuilder(); + StringBuilder argsBuilder = new StringBuilder(); + + columnNamesBuilder.append(" ").append(readResultTatMetaData.getColumnName(1)); + argsBuilder.append("?"); + + for (int i = 2; i <= columnCount; i++) { + + String columnName = readResultTatMetaData.getColumnName(i); + columnNamesBuilder.append(", ").append(columnName); + argsBuilder.append(", ?"); + } + + sqlBuilder.append("(") + .append(columnNamesBuilder) + .append(")") + .append("VALUES (") + .append(argsBuilder) + .append(")"); + + String sql = sqlBuilder.toString(); + if (log.isDebugEnabled()) { + log.debug("Write sql: " + sql); + } + PreparedStatement statement = connection.prepareStatement(sql); + + return statement; + + } + + protected void copyRow(ResultSet readResultSet, PreparedStatement writeStatement, int columnCount) throws SQLException { + writeStatement.clearParameters(); + + if (log.isTraceEnabled()) { + log.trace("Copy " + readResultSet.getString(1)); + } + for (int i = 1; i <= columnCount; i++) { + Object object = readResultSet.getObject(i); + writeStatement.setObject(i, object); + } + writeStatement.addBatch(); + } + + protected void flush(PreparedStatement writeStatement, String tableName, long index) throws SQLException { + + if (log.isDebugEnabled()) { + log.debug("Flush for : " + tableName + " (size: " + index + ")"); + } + writeStatement.executeBatch(); + writeStatement.clearBatch(); + + } + + } + +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/todb/ReplicateToDbActionBuilder.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/todb/ReplicateToDbActionBuilder.java new file mode 100644 index 0000000..fa5cd96 --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/todb/ReplicateToDbActionBuilder.java @@ -0,0 +1,35 @@ +package org.nuiton.topia.replication.sql.action.todb; + +import com.google.common.base.Preconditions; +import org.nuiton.topia.persistence.TopiaApplicationContext; +import org.nuiton.topia.replication.sql.action.ReplicateActionBuilderSupport; + +/** + * Builder of {@link ReplicateToDbAction}. + * + * Created on 29/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +public class ReplicateToDbActionBuilder extends ReplicateActionBuilderSupport<ReplicateToDbActionBuilder, ReplicateToDbActionRequest, ReplicateToDbAction> { + + public ReplicateToDbActionBuilder() { + super(new ReplicateToDbActionRequest()); + } + + public ReplicateToDbActionBuilder to(TopiaApplicationContext targetTopiaApplicationContext) { + replicationRequest.setTargetTopiaApplicationContext(targetTopiaApplicationContext); + return this; + } + + @Override + public ReplicateToDbAction build() { + Preconditions.checkState(replicationRequest.getSourceTopiaApplicationContext() != null, "No sourceTopiaApplicationContext defined"); + Preconditions.checkState(replicationRequest.getTargetTopiaApplicationContext() != null, "No targetTopiaApplicationContext defined"); + + ReplicateToDbAction action = new ReplicateToDbAction(replicationRequest); + return action; + } + +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/todb/ReplicateToDbActionRequest.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/todb/ReplicateToDbActionRequest.java new file mode 100644 index 0000000..007120b --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/todb/ReplicateToDbActionRequest.java @@ -0,0 +1,25 @@ +package org.nuiton.topia.replication.sql.action.todb; + +import org.nuiton.topia.persistence.TopiaApplicationContext; +import org.nuiton.topia.replication.sql.action.ReplicationActionRequestSupport; + +/** + * Request to perform a {@link ReplicateToDbAction}. + * + * Created on 28/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +public class ReplicateToDbActionRequest extends ReplicationActionRequestSupport { + + protected TopiaApplicationContext targetTopiaApplicationContext; + + public TopiaApplicationContext getTargetTopiaApplicationContext() { + return targetTopiaApplicationContext; + } + + protected void setTargetTopiaApplicationContext(TopiaApplicationContext targetTopiaApplicationContext) { + this.targetTopiaApplicationContext = targetTopiaApplicationContext; + } +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/todb/package-info.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/todb/package-info.java new file mode 100644 index 0000000..023f309 --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/todb/package-info.java @@ -0,0 +1,9 @@ +/** + * <h1>Réplication vers une base de données gérée par ToPIA</h1> + * + * Created on 30/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +package org.nuiton.topia.replication.sql.action.todb; \ No newline at end of file diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/tosql/ReplicateToSqlAction.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/tosql/ReplicateToSqlAction.java new file mode 100644 index 0000000..885d9c6 --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/tosql/ReplicateToSqlAction.java @@ -0,0 +1,199 @@ +package org.nuiton.topia.replication.sql.action.tosql; + +import org.apache.commons.io.output.ByteArrayOutputStream; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.nuiton.topia.persistence.support.TopiaSqlWork; +import org.nuiton.topia.replication.sql.action.ReplicationActionSupport; +import org.nuiton.topia.replication.sql.table.TopiaReplicationTable; +import org.nuiton.topia.replication.sql.table.TopiaReplicationTables; + +import javax.sql.rowset.serial.SerialBlob; +import java.io.IOException; +import java.io.Writer; +import java.sql.Blob; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Types; + +/** + * Action to replicate some tables into a sql script. + * + * Created on 29/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +public class ReplicateToSqlAction extends ReplicationActionSupport<ReplicateToSqlActionRequest> { + + /** Logger. */ + private static final Log log = LogFactory.getLog(ReplicateToSqlAction.class); + + protected static void flush(Writer writer) { + try { + writer.flush(); + } catch (IOException e) { + throw new RuntimeException("Could not flush writer", e); + } + } + + protected ReplicateToSqlAction(ReplicateToSqlActionRequest request) { + super(request); + } + + @Override + protected void after(TopiaReplicationTables tables) { + + super.after(tables); + + ReplicateToSqlAction.flush(request.getWriter()); + + } + + @Override + protected void executeOnTable(TopiaReplicationTable table) { + + if (log.isInfoEnabled()) { + log.info("Replicate: " + table.getFullyTableName()); + } + + ReadSqlWork sqlWork = new ReadSqlWork(table); + executeSqlWork(sqlWork); + + } + + protected class ReadSqlWork implements TopiaSqlWork { + + private final TopiaReplicationTable table; + + public ReadSqlWork(TopiaReplicationTable table) { + + this.table = table; + } + + @Override + public void execute(Connection connection) throws SQLException { + + try (PreparedStatement readStatement = createReadStatement(table, connection)) { + + readStatement.execute(); + + ResultSet readResultSet = readStatement.getResultSet(); + + ResultSetMetaData readResultSetMetaData = readResultSet.getMetaData(); + int columnCount = readResultSetMetaData.getColumnCount(); + Class[] types = computeTypes(readResultSetMetaData, columnCount); + + String writeStatement = createWriteStatement(table, readResultSetMetaData); + + int fetchSize = request.getFetchSize(); + String tableName = table.getFullyTableName(); + + Writer writer = request.getWriter(); + long index = 0; + while (readResultSet.next()) { + + try { + copyRow(readResultSet, writeStatement, types, writer, columnCount); + } catch (IOException e) { + throw new RuntimeException("Could not copyRow", e); + } + + if ((++index % fetchSize) == 0) { + + flush(writer, tableName, index); + + } + } + + flush(writer, tableName, index); + + } + + } + + protected void copyRow(ResultSet readResultSet, String writeStatement, Class[] types, Writer writer, int columnCount) throws SQLException, IOException { + + if (log.isTraceEnabled()) { + log.trace("Copy " + readResultSet.getString(1)); + } + + String statement = ""; + + for (int i = 1; i <= columnCount; i++) { + Class columnType = types[i - 1]; + if (String.class.equals(columnType)) { + String stringValue = readResultSet.getString(i); + if (stringValue == null) { + statement += ", NULL"; + } else { + statement += ", '" + stringValue + "'"; + } + } else if (Blob.class.equals(columnType)) { + Blob blob = readResultSet.getBlob(i); + if (blob == null) { + statement += ", NULL"; + } else { + + SerialBlob serialBlob = new SerialBlob(blob); + try (ByteArrayOutputStream stringWriter = new ByteArrayOutputStream((int) serialBlob.length())) { + stringWriter.write(serialBlob.getBinaryStream()); + statement += ", '" + new String(stringWriter.toByteArray()) + "'"; + } + + } + } else { + statement += ", " + readResultSet.getObject(i); + } + } + + statement = String.format(writeStatement, statement.substring(2)); + + if (log.isTraceEnabled()) { + log.trace("Write: " + statement.trim()); + } + + try { + writer.append(statement); + } catch (IOException e) { + throw new RuntimeException("Could not append to writer", e); + } + + } + + protected void flush(Writer writer, String tableName, long index) { + + if (log.isDebugEnabled()) { + log.debug("Flush for : " + tableName + " (size: " + index + ")"); + } + + ReplicateToSqlAction.flush(writer); + + } + + protected Class[] computeTypes(ResultSetMetaData readResultSetMetaData, int columnCount) throws SQLException { + Class[] result = new Class[columnCount]; + for (int i = 1; i <= columnCount; i++) { + int columnType = readResultSetMetaData.getColumnType(i); + + switch (columnType) { + case Types.VARCHAR: + case Types.NVARCHAR: + case Types.LONGVARCHAR: + case Types.LONGNVARCHAR: + result[i - 1] = String.class; + break; + case Types.BLOB: + result[i - 1] = Blob.class; + default: + result[i - 1] = Object.class; + } + } + return result; + } + } + +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/tosql/ReplicateToSqlActionBuilder.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/tosql/ReplicateToSqlActionBuilder.java new file mode 100644 index 0000000..1984fae --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/tosql/ReplicateToSqlActionBuilder.java @@ -0,0 +1,35 @@ +package org.nuiton.topia.replication.sql.action.tosql; + +import com.google.common.base.Preconditions; +import org.nuiton.topia.replication.sql.action.ReplicateActionBuilderSupport; + +import java.io.Writer; + +/** + * Builder of {@link ReplicateToSqlAction}. + * + * Created on 29/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +public class ReplicateToSqlActionBuilder extends ReplicateActionBuilderSupport<ReplicateToSqlActionBuilder, ReplicateToSqlActionRequest, ReplicateToSqlAction> { + + public ReplicateToSqlActionBuilder() { + super(new ReplicateToSqlActionRequest()); + } + + public ReplicateToSqlActionBuilder to(Writer writer) { + replicationRequest.setWriter(writer); + return this; + } + + @Override + public ReplicateToSqlAction build() { + Preconditions.checkState(replicationRequest.getSourceTopiaApplicationContext() != null, "No sourceTopiaApplicationContext defined"); + Preconditions.checkState(replicationRequest.getWriter() != null, "No writer defined"); + ReplicateToSqlAction action = new ReplicateToSqlAction(replicationRequest); + return action; + } + +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/tosql/ReplicateToSqlActionRequest.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/tosql/ReplicateToSqlActionRequest.java new file mode 100644 index 0000000..89a6b7b --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/tosql/ReplicateToSqlActionRequest.java @@ -0,0 +1,26 @@ +package org.nuiton.topia.replication.sql.action.tosql; + +import org.nuiton.topia.replication.sql.action.ReplicationActionRequestSupport; + +import java.io.Writer; + +/** + * Request to perform a {@link ReplicateToSqlAction}. + * + * Created on 28/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +public class ReplicateToSqlActionRequest extends ReplicationActionRequestSupport { + + protected Writer writer; + + public Writer getWriter() { + return writer; + } + + protected void setWriter(Writer writer) { + this.writer = writer; + } +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/tosql/package-info.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/tosql/package-info.java new file mode 100644 index 0000000..5c42a8f --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/action/tosql/package-info.java @@ -0,0 +1,9 @@ +/** + * <h1>Réplication vers un fichier sql</h1> + * + * Created on 30/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +package org.nuiton.topia.replication.sql.action.tosql; \ No newline at end of file diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/package-info.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/package-info.java new file mode 100644 index 0000000..51904a7 --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/package-info.java @@ -0,0 +1,20 @@ +/** + * <h1>Service de réplication performant</h1> + * + * Ce nouveau service remplacera à terme le service de réplication basé sur la réplication hibernate. + * + * Il est beaucoup plus performant en terme de temps d'exécution. + * + * <h2>Les réplications possibles</h2> + * + * <p>On définit deux modes de réplication :</p> + * + * <ul> + * <li>La réplication vers une autre base gérée par ToPIA {@link org.nuiton.topia.replication.sql.action.todb}</li> + * <li>La réplication vers un fichier sql {@link org.nuiton.topia.replication.sql.action.tosql}</li> + * </ul> + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +package org.nuiton.topia.replication.sql; \ No newline at end of file diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/table/TopiaReplicationTable.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/table/TopiaReplicationTable.java new file mode 100644 index 0000000..6a7aa36 --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/table/TopiaReplicationTable.java @@ -0,0 +1,79 @@ +package org.nuiton.topia.replication.sql.table; + +import com.google.common.collect.ImmutableSet; + +/** + * Created on 30/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + */ +public class TopiaReplicationTable { + + /** + * Table schema name. + */ + protected final String schemaName; + + /** + * Table name. + */ + protected final String tableName; + + /** + * Fully table name (including the schema name). + */ + protected final String fullyTableName; + + /** + * From clause. + */ + protected final String fromClause; + + /** + * Where clause. + */ + protected final String whereClause; + + /** + * Join clauses. + */ + protected final ImmutableSet<String> joinClauses; + + public TopiaReplicationTable(String schemaName, + String tableName, + String fromClause, + String whereClause, + ImmutableSet<String> joinClauses) { + this.schemaName = schemaName.toLowerCase(); + this.tableName = tableName.toLowerCase(); + this.fullyTableName = this.schemaName + "." + this.tableName; + this.fromClause = fromClause; + this.whereClause = whereClause; + this.joinClauses = joinClauses; + } + + public String getSchemaName() { + return schemaName; + } + + public String getTableName() { + return tableName; + } + + public String getFullyTableName() { + return fullyTableName; + } + + public String getFromClause() { + return fromClause; + } + + public String getWhereClause() { + return whereClause; + } + + public ImmutableSet<String> getJoinClauses() { + return joinClauses; + } + +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/table/TopiaReplicationTables.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/table/TopiaReplicationTables.java new file mode 100644 index 0000000..5d55cff --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/table/TopiaReplicationTables.java @@ -0,0 +1,36 @@ +package org.nuiton.topia.replication.sql.table; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +import java.util.Iterator; + +/** + * Define a ordered set of {@link TopiaReplicationTable}. + * + * Created on 30/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + */ +public class TopiaReplicationTables implements Iterable<TopiaReplicationTable> { + + protected final ImmutableMap<String, TopiaReplicationTable> tablesByFullyTableName; + + protected final ImmutableSet<TopiaReplicationTable> orderedTables; + + public TopiaReplicationTables(ImmutableMap<String, TopiaReplicationTable> tablesByFullyTableName, + ImmutableSet<TopiaReplicationTable> orderedTables) { + this.tablesByFullyTableName = tablesByFullyTableName; + this.orderedTables = orderedTables; + } + + public TopiaReplicationTable getTable(String key) { + return tablesByFullyTableName.get(key); + } + + @Override + public Iterator<TopiaReplicationTable> iterator() { + return orderedTables.iterator(); + } + +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/table/TopiaReplicationTablesBuilder.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/table/TopiaReplicationTablesBuilder.java new file mode 100644 index 0000000..827a7f8 --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/table/TopiaReplicationTablesBuilder.java @@ -0,0 +1,227 @@ +package org.nuiton.topia.replication.sql.table; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import org.nuiton.topia.persistence.TopiaEntityEnum; + +import java.util.Collections; +import java.util.List; +import java.util.TreeMap; + +/** + * To build a {@link TopiaReplicationTables}. + * + * Created on 30/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + */ +public class TopiaReplicationTablesBuilder { + + protected final TreeMap<String, TopiaReplicationTable> tablesByFullyTableName; + + protected final TreeMap<Integer, TopiaReplicationTable> tablesByOrder; + + protected int internalOrder; + + protected final boolean withId; + + public static TopiaReplicationTablesBuilder builder(boolean withId) { + return new TopiaReplicationTablesBuilder(withId); + } + + public TopiaReplicationTables build() { + + List<Integer> orders = Lists.newArrayList(tablesByOrder.keySet()); + Collections.sort(orders); + ImmutableSet.Builder<TopiaReplicationTable> orderedTablesBuilder = ImmutableSet.builder(); + for (Integer order : orders) { + orderedTablesBuilder.add(tablesByOrder.get(order)); + } + + return new TopiaReplicationTables(ImmutableMap.copyOf(tablesByFullyTableName), + orderedTablesBuilder.build()); + } + + public TopiaReplicationTablesBuilder addTopTable(TopiaEntityEnum entityEnum) { + + String schemaName = entityEnum.dbSchemaName().toLowerCase(); + String tableName = entityEnum.dbTableName().toLowerCase(); + String whereClause = tableName + ".topiaid = ?"; + String fromClause = schemaName + "." + tableName + " " + tableName; + + registerTable(schemaName, + tableName, + whereClause, + fromClause, + ImmutableSet.<String>of()); + + return this; + } + + public TopiaReplicationTablesBuilder addTopJoinTable(TopiaEntityEnum parentEntityEnum, TopiaEntityEnum entityEnum) { + + TopiaReplicationTable parentTable = getTable(parentEntityEnum); + + String schemaName = entityEnum.dbSchemaName().toLowerCase(); + String tableName = entityEnum.dbTableName().toLowerCase(); + String parentTableName = parentTable.getTableName(); + String whereClause = tableName + "." + parentTableName + " = ?"; + String fromClause = schemaName + "." + tableName + " " + tableName; + + return registerTable(schemaName, + tableName, + whereClause, + fromClause, + ImmutableSet.<String>of()); + + } + + public TopiaReplicationTablesBuilder addJoinTable(TopiaEntityEnum parentEntityEnum, TopiaEntityEnum entityEnum) { + + TopiaReplicationTable parentTable = getTable(parentEntityEnum); + + String schemaName = entityEnum.dbSchemaName().toLowerCase(); + String tableName = entityEnum.dbTableName().toLowerCase(); + String whereClause = parentTable.getWhereClause(); + String fromClause = parentTable.getFromClause(); + + String parentTableName = parentTable.getTableName(); + String joinClause = " INNER JOIN " + schemaName + "." + tableName + " " + tableName + " ON " + tableName + "." + parentTableName + " = " + parentTableName + ".topiaId"; + + ImmutableSet<String> joinClauses = addJoinCause(parentTable.getJoinClauses(), joinClause); + + return registerTable(schemaName, + tableName, + whereClause, + fromClause, + joinClauses); + + } + + public TopiaReplicationTablesBuilder addInversedJoinTable(TopiaEntityEnum parentEntityEnum, TopiaEntityEnum entityEnum) { + + TopiaReplicationTable parentTable = getTable(parentEntityEnum); + + String schemaName = entityEnum.dbSchemaName().toLowerCase(); + String tableName = entityEnum.dbTableName().toLowerCase(); + String whereClause = parentTable.getWhereClause(); + String fromClause = parentTable.getFromClause(); + + String parentTableName = parentTable.getTableName(); + String joinClause = " INNER JOIN " + schemaName + "." + tableName + " " + tableName + " ON " + tableName + ".topiaId = " + parentTableName + "." + tableName; + + ImmutableSet<String> joinClauses = addJoinCause(parentTable.getJoinClauses(), joinClause); + + return registerTable( + schemaName, + tableName, + whereClause, + fromClause, + joinClauses); + + } + + public TopiaReplicationTablesBuilder addAssociationTable(TopiaEntityEnum parentEntityEnum, String associationName, boolean addInnerJoin) { + + TopiaReplicationTable parentTable = getTable(parentEntityEnum); + + String schemaName = parentEntityEnum.dbSchemaName().toLowerCase(); + String tableName = getAssociationTableName(associationName.toLowerCase(), parentTable.getTableName()); + String whereClause = parentTable.getWhereClause(); + String fromClause = parentTable.getFromClause(); + if (parentTable.getJoinClauses().isEmpty()) { + fromClause = schemaName + "." + tableName; + } + ImmutableSet<String> joinClauses; + + if (addInnerJoin) { + + String parentTableName = parentTable.getTableName(); + String joinClause = " INNER JOIN " + schemaName + "." + tableName + " " + tableName + " ON " + tableName + "." + parentTableName + " = " + parentTableName + ".topiaId"; + joinClauses = addJoinCause(parentTable.getJoinClauses(), joinClause); + + } else { + joinClauses = parentTable.getJoinClauses(); + } + + return registerTable(schemaName, + tableName, + whereClause, + fromClause, + joinClauses); + } + + public TopiaReplicationTablesBuilder orderPlusOne() { + return incrementsOrder(1); + } + + public TopiaReplicationTablesBuilder orderMinusTwo() { + return decrementsOrder(2); + } + + public TopiaReplicationTablesBuilder incrementsOrder(int inc) { + internalOrder += inc; + return this; + } + + public TopiaReplicationTablesBuilder decrementsOrder(int dec) { + internalOrder -= dec; + return this; + } + + protected ImmutableSet<String> addJoinCause(ImmutableSet<String> joinClauses, String joinClause) { + return ImmutableSet + .<String>builder() + .addAll(joinClauses) + .add(joinClause) + .build(); + } + + protected TopiaReplicationTablesBuilder registerTable(String schemaName, + String tableName, + String whereClause, + String fromClause, + ImmutableSet<String> joinClauses) { + TopiaReplicationTable table = new TopiaReplicationTable( + schemaName, + tableName, + fromClause, withId ? whereClause : null, + joinClauses); + + tablesByFullyTableName.put(table.getFullyTableName(), table); + tablesByOrder.put(internalOrder++, table); + return this; + } + + protected TopiaReplicationTable getTable(String key) { + return tablesByFullyTableName.get(key); + } + + protected TopiaReplicationTable getTable(TopiaEntityEnum entityEnum) { + String key = getFullyTableName(entityEnum); + return tablesByFullyTableName.get(key); + } + + protected String getFullyTableName(TopiaEntityEnum entityEnum) { + String fullyTableName = entityEnum.dbSchemaName().toLowerCase() + "." + entityEnum.dbTableName().toLowerCase(); + return fullyTableName; + } + + protected TopiaReplicationTablesBuilder(boolean withId) { + this.withId = withId; + this.tablesByFullyTableName = new TreeMap<>(); + this.tablesByOrder = new TreeMap<>(); + } + + protected String getAssociationTableName(String tableName, String parentTableName) { + String associationTableName; + if (tableName.compareTo(parentTableName) < 0) { + associationTableName = tableName + "_" + parentTableName; + } else { + associationTableName = parentTableName + "_" + tableName; + } + return associationTableName; + } + +} diff --git a/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/table/package-info.java b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/table/package-info.java new file mode 100644 index 0000000..89e7d08 --- /dev/null +++ b/topia-service-replication/src/main/java/org/nuiton/topia/replication/sql/table/package-info.java @@ -0,0 +1,9 @@ +/** + * <h1>Définition des tables à répliquer</h1> + * + * Created on 30/12/15. + * + * @author Tony Chemit - chemit@codelutin.com + * @since 3.0.1 + */ +package org.nuiton.topia.replication.sql.table; \ No newline at end of file -- To stop receiving notification emails like this one, please contact nuiton.org SCM administrator <admin+scm@nuiton.org>.