DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现了包括 MySQL、Oracle、SqlServer、PostgreSQL、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。我们在使用 DataX 时,希望它支持 Oracle 增量导入数据。源码地址:https://gitee.com/cecotw/DataX 网盘链接:https://pan.baidu.com/s/1mbEvLsDZZNWMYrTTTeYkAw 密码:v97c 主要改动:删除限制;添加 Oracle 数据插入类型转换;增加 onMergeIntoDoString 方法;增加 getStrings 方法。DataX 支持增量 Oracle update
datax介绍
支持增量 oracle update
修改 OracleWriter.java
修改WriterUtil.java
public static String getWriteTemplate(List<String> columnHolders, List<String> valueHolders, String writeMode, DataBaseType dataBaseType, boolean forceUseUpdate) { boolean isWriteModeLegal = writeMode.trim().toLowerCase().startsWith("insert") || writeMode.trim().toLowerCase().startsWith("replace") || writeMode.trim().toLowerCase().startsWith("update"); if (!isWriteModeLegal) { throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE, String.format("您所配置的 writeMode:%s 错误. 因为DataX 目前仅支持replace,update 或 insert 方式. 请检查您的配置并作出修改.", writeMode)); } // && writeMode.trim().toLowerCase().startsWith("replace") String writeDataSqlTemplate; if (forceUseUpdate || ((dataBaseType == DataBaseType.MySql || dataBaseType == DataBaseType.Tddl) && writeMode.trim().toLowerCase().startsWith("update")) ) { //update只在mysql下使用 writeDataSqlTemplate = new StringBuilder() .append("INSERT INTO %s (").append(StringUtils.join(columnHolders, ",")) .append(") VALUES(").append(StringUtils.join(valueHolders, ",")) .append(")") .append(onDuplicateKeyUpdateString(columnHolders)) .toString(); } else { if (dataBaseType == DataBaseType.Oracle) { writeDataSqlTemplate = new StringBuilder().append(onMergeIntoDoString(writeMode, columnHolders, valueHolders)).append("INSERT (") .append(StringUtils.join(columnHolders, ",")) .append(") VALUES(").append(StringUtils.join(valueHolders, ",")) .append(")").toString(); } else { //这里是保护,如果其他错误的使用了update,需要更换为replace if (writeMode.trim().toLowerCase().startsWith("update")) { writeMode = "replace"; } writeDataSqlTemplate = new StringBuilder().append(writeMode) .append(" INTO %s (").append(StringUtils.join(columnHolders, ",")) .append(") VALUES(").append(StringUtils.join(valueHolders, ",")) .append(")").toString(); } } return writeDataSqlTemplate; }
/**
 * Builds the leading part of an Oracle {@code MERGE} statement, up to and
 * including {@code WHEN NOT MATCHED THEN}; the caller appends the
 * {@code INSERT (...) VALUES(...)} clause.
 *
 * <p>Columns named in {@code merge} (as parsed by {@code getStrings}) are the
 * match keys: each contributes {@code ? AS col} to the {@code USING} sub-select
 * and {@code TMP.col = A.col} to the {@code ON} condition. Every other column
 * contributes {@code col = ?} to the {@code UPDATE SET} list. Bind parameters
 * therefore appear in the order: key columns, then non-key columns, both in
 * {@code columnHolders} order.
 *
 * <p>When every column is a key column there is nothing to update, so the
 * {@code WHEN MATCHED THEN UPDATE SET} clause is left out instead of being
 * emitted with an empty assignment list.
 *
 * <p>NOTE(review): if none of the columns is a key column (for example the
 * write mode carries no column list) the select list and the ON condition are
 * empty and the resulting SQL is not valid; this method does not guard
 * against that.
 *
 * @param merge         write mode such as {@code "update (seq,userid)"}
 * @param columnHolders target column names, in write order
 * @param valueHolders  not used by this method
 * @return the MERGE prefix, containing a {@code %s} placeholder for the table name
 */
public static String onMergeIntoDoString(String merge, List<String> columnHolders, List<String> valueHolders) {
    // Parsed once; the original rebuilt this list for every column.
    final List<String> keyColumns = Arrays.asList(getStrings(merge));

    StringBuilder selectList = new StringBuilder();
    StringBuilder matchCondition = new StringBuilder();
    StringBuilder updateAssignments = new StringBuilder();

    for (String columnHolder : columnHolders) {
        if (keyColumns.contains(columnHolder)) {
            if (selectList.length() > 0) {
                selectList.append(",");
                matchCondition.append(" AND ");
            }
            selectList.append("?").append(" AS ").append(columnHolder);
            matchCondition.append("TMP.").append(columnHolder)
                    .append(" = ")
                    .append("A.").append(columnHolder);
        } else {
            if (updateAssignments.length() > 0) {
                updateAssignments.append(",");
            }
            updateAssignments.append(columnHolder).append(" = ").append("?");
        }
    }

    StringBuilder sql = new StringBuilder();
    sql.append("MERGE INTO %s A USING ( SELECT ");
    sql.append(selectList);
    sql.append(" FROM DUAL ) TMP ON (");
    sql.append(matchCondition);
    sql.append(" )");
    if (updateAssignments.length() > 0) {
        sql.append(" WHEN MATCHED THEN UPDATE SET ");
        sql.append(updateAssignments);
    }
    sql.append(" WHEN NOT MATCHED THEN ");
    return sql.toString();
}
/**
 * Extracts the comma-separated column names from a write mode such as
 * {@code "update (seq,userid)"}.
 *
 * <p>Only a leading {@code update} keyword is removed, and it is matched
 * case-insensitively. Column names that merely contain the word (for example
 * {@code update_time}) are left intact. Parentheses and all whitespace are
 * then dropped and the remainder is split on commas.
 *
 * @param merge the configured write mode; must not be {@code null}
 * @return the tokens left after stripping; a mode without a column list yields
 *         a single token ({@code "insert"} gives {@code ["insert"]},
 *         {@code "update"} gives {@code [""]})
 */
public static String[] getStrings(String merge) {
    final String keyword = "update";
    String spec = merge.trim();
    // Strip the keyword only where it introduces the mode, never inside a column name.
    if (spec.regionMatches(true, 0, keyword, 0, keyword.length())) {
        spec = spec.substring(keyword.length());
    }
    spec = spec.replace("(", "")
            .replace(")", "")
            .replaceAll("\\s+", "");
    return spec.split(",");
}
修改CommonRdbmsWriter.java
public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) { this.taskPluginCollector = taskPluginCollector; List<String> columns = new ArrayList<>(); List<String> columnsOne = new ArrayList<>(); List<String> columnsTwo = new ArrayList<>(); if (this.dataBaseType == DataBaseType.Oracle) { String merge = this.writeMode; String[] sArray = WriterUtil.getStrings(merge); int size = this.columns.size(); int i = 0; for (int j = 0; j < size; j++) { if (Arrays.asList(sArray).contains(this.columns.get(j))) { columnsOne.add(this.columns.get(j)); } } for (int j = 0; j < size; j++) { if (!Arrays.asList(sArray).contains(this.columns.get(j))) { columnsTwo.add(this.columns.get(j)); } } for (String column : columnsOne) { columns.add(i, column); i++; } for (String column : columnsTwo) { columns.add(i, column); i++; } } columns.addAll(this.columns); // 用于写入数据的时候的类型根据目的表字段类型转换 this.resultSetMetaData = DBUtil.getColumnMetaData(connection, this.table, StringUtils.join(columns, ",")); // 写数据库的SQL语句 calcWriteRecordSql(); List<Record> writeBuffer = new ArrayList<Record>(this.batchSize); int bufferBytes = 0; try { Record record; while ((record = recordReceiver.getFromReader()) != null) { if (record.getColumnNumber() != this.columnNumber && this.dataBaseType != DataBaseType.Oracle) { // 源头读取字段列数与目的表字段写入列数不相等,直接报错 throw DataXException .asDataXException( DBUtilErrorCode.CONF_ERROR, String.format( "列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 
请检查您的配置并作出修改.", record.getColumnNumber(), this.columnNumber)); } writeBuffer.add(record); bufferBytes += record.getMemorySize(); if (writeBuffer.size() >= batchSize || bufferBytes >= batchByteSize) { doBatchInsert(connection, writeBuffer); writeBuffer.clear(); bufferBytes = 0; } } if (!writeBuffer.isEmpty()) { doBatchInsert(connection, writeBuffer); writeBuffer.clear(); bufferBytes = 0; } } catch (Exception e) { throw DataXException.asDataXException( DBUtilErrorCode.WRITE_DATA_ERROR, e); } finally { writeBuffer.clear(); bufferBytes = 0; DBUtil.closeDBResources(null, null, connection); } }
/**
 * Writes {@code buffer} as a single JDBC batch inside one transaction.
 *
 * <p>For an Oracle target each record is rewritten in place before binding:
 * its columns become the key columns (those named in the write mode), then
 * the non-key columns, then all configured columns in their original order.
 *
 * <p>If the batch raises a {@link SQLException} the transaction is rolled
 * back and the same buffer is handed to {@code doOneInsert}.
 * NOTE(review): on that path Oracle records may already have been rewritten
 * by this method - confirm that {@code doOneInsert} expects that.
 *
 * @param connection connection to write through; auto-commit is switched off
 * @param buffer     records to write
 * @throws SQLException if the rollback or the row-by-row fallback fails
 */
protected void doBatchInsert(Connection connection, List<Record> buffer)
        throws SQLException {
    PreparedStatement preparedStatement = null;
    try {
        connection.setAutoCommit(false);
        preparedStatement = connection.prepareStatement(this.writeRecordSql);

        final boolean oracleTarget = this.dataBaseType == DataBaseType.Oracle;
        // Key column names are only parsed for Oracle targets.
        final List<String> keyNames = oracleTarget
                ? Arrays.asList(WriterUtil.getStrings(this.writeMode))
                : null;

        for (Record row : buffer) {
            if (oracleTarget) {
                List<Column> rearranged = new ArrayList<>();
                // 1) key columns
                for (int idx = 0; idx < this.columns.size(); idx++) {
                    if (keyNames.contains(this.columns.get(idx))) {
                        rearranged.add(row.getColumn(idx));
                    }
                }
                // 2) non-key columns
                for (int idx = 0; idx < this.columns.size(); idx++) {
                    if (!keyNames.contains(this.columns.get(idx))) {
                        rearranged.add(row.getColumn(idx));
                    }
                }
                // 3) every column again, in configured order
                for (int idx = 0; idx < this.columns.size(); idx++) {
                    rearranged.add(row.getColumn(idx));
                }
                // Write back only after all source values have been collected.
                int position = 0;
                for (Column value : rearranged) {
                    row.setColumn(position, value);
                    position++;
                }
            }
            preparedStatement = fillPreparedStatement(preparedStatement, row);
            preparedStatement.addBatch();
        }

        preparedStatement.executeBatch();
        connection.commit();
    } catch (SQLException e) {
        LOG.warn("回滚此次写入, 采用每次写入一行方式提交. 因为:" + e.getMessage());
        connection.rollback();
        doOneInsert(connection, buffer);
    } catch (Exception e) {
        throw DataXException.asDataXException(
                DBUtilErrorCode.WRITE_DATA_ERROR, e);
    } finally {
        DBUtil.closeDBResources(preparedStatement, null);
    }
}
效果
{ "job": { "setting": { "speed": { "byte": 1048576 }, "errorLimit": { "record": 0, "percentage": 0.02 } }, "content": [ { "reader": { "name": "postgresqlreader", "parameter": { "username": "postgres", "password": "postgres", "connection": [ { "querySql":["SELECT seq,userid,name FROM user"], "jdbcUrl": [ "jdbc:postgresql://127.0.0.1:5432/postgres" ] } ] } }, "writer": { "name": "oraclewriter", "parameter": { "username": "oracle", "password": "oracle", "column": [ "seq", "userid", "name" ], "connection": [ { "jdbcUrl": "jdbc:oracle:thin:@localhost:1521:oracle", "table": [ "user1" ] } ], "writeMode": "update (seq,userid)" } } } ] } }
源码
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算