Data Source Management | Data Synchronization and Source Code Analysis with the DataX Component (2)

Starting the read task

    public static class Task extends Reader.Task {
        @Override
        public void startRead(RecordSender recordSender) {
            int fetchSize = this.readerSliceConfig.getInt(
                    com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE);
            this.commonRdbmsReaderSlave.startRead(this.readerSliceConfig, recordSender,
                    super.getTaskPluginCollector(), fetchSize);
        }
    }

Once the read task has started, it performs the actual data read.

Core class: CommonRdbmsReader

    public void startRead(Configuration readerSliceConfig,
                          RecordSender recordSender,
                          TaskPluginCollector taskPluginCollector,
                          int fetchSize) {
        ResultSet rs = null;
        try {
            // Read the data
            rs = DBUtil.query(conn, querySql, fetchSize);
            queryPerfRecord.end();
            ResultSetMetaData metaData = rs.getMetaData();
            columnNumber = metaData.getColumnCount();
            PerfRecord allResultPerfRecord = new PerfRecord(taskGroupId, taskId,
                    PerfRecord.PHASE.RESULT_NEXT_ALL);
            allResultPerfRecord.start();
            // rsNextUsedTime accumulates only the time spent inside rs.next()
            long rsNextUsedTime = 0;
            long lastTime = System.nanoTime();
            // Pass each row on to the exchange area
            while (rs.next()) {
                rsNextUsedTime += (System.nanoTime() - lastTime);
                this.transportOneRecord(recordSender, rs, metaData, columnNumber,
                        mandatoryEncoding, taskPluginCollector);
                lastTime = System.nanoTime();
            }
            allResultPerfRecord.end(rsNextUsedTime);
        } catch (Exception e) {
            throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);
        } finally {
            DBUtil.closeDBResources(null, conn);
        }
    }

2. Data transfer

Core interface: RecordSender (sending)

    public interface RecordSender {
        public Record createRecord();
        public void sendToWriter(Record record);
        public void flush();
        public void terminate();
        public void shutdown();
    }

Core interface: RecordReceiver (receiving)

    public interface RecordReceiver {
        public Record getFromReader();
        public void shutdown();
    }
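The two interfaces are the two ends of one channel: the reader pushes records in, and the writer pulls them out until it gets null. The sketch below illustrates that hand-off with a plain bounded queue. It is only an illustration under stated assumptions, not DataX code: the class name QueueExchangerSketch, the END marker, and the use of String in place of Record are all hypothetical, and the real exchanger may buffer and signal termination differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for a sender/receiver exchanger; this is not DataX code.
class QueueExchangerSketch {

    // End-of-stream marker (an assumption of this sketch). It is compared by
    // identity, so a real row with the same text cannot be mistaken for it.
    private static final String END = new String("<end>");

    private final BlockingQueue<String> channel;

    QueueExchangerSketch(int capacity) {
        this.channel = new ArrayBlockingQueue<>(capacity);
    }

    // Sender side: blocks while the channel is full.
    void sendToWriter(String record) throws InterruptedException {
        channel.put(record);
    }

    // Sender side: tells the receiver that no more records will follow.
    void terminate() throws InterruptedException {
        channel.put(END);
    }

    // Receiver side: blocks until a record arrives; returns null at end of stream.
    String getFromReader() throws InterruptedException {
        String record = channel.take();
        return record == END ? null : record;
    }

    // Runs a reader thread against a writer loop and returns what the writer received.
    static List<String> transfer(List<String> rows, int capacity) {
        QueueExchangerSketch exchanger = new QueueExchangerSketch(capacity);
        Thread reader = new Thread(() -> {
            try {
                for (String row : rows) {
                    exchanger.sendToWriter(row);
                }
                exchanger.terminate();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.start();

        List<String> written = new ArrayList<>();
        try {
            String record;
            while ((record = exchanger.getFromReader()) != null) {
                written.add(record);
            }
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("transfer interrupted", e);
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(transfer(Arrays.asList("row-1", "row-2", "row-3"), 2));
    }
}
```

Because the queue is bounded, a fast reader is slowed down to the pace of the writer instead of filling memory, which is the main reason to put a buffer of fixed capacity between the two sides.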

Core class: BufferedRecordExchanger

    class BufferedRecordExchanger implements RecordSender, RecordReceiver

3. Writing data

Core entry point: PostgresqlWriter

Starting the write task

    public static class Task extends Writer.Task {
        public void startWrite(RecordReceiver recordReceiver) {
            this.commonRdbmsWriterSlave.startWrite(recordReceiver, this.writerSliceConfig,
                    super.getTaskPluginCollector());
        }
    }

Once the write task has started, it performs the actual data write.
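The write side does not insert rows one by one. It collects records from the receiver into a buffer and flushes the buffer when either the record count or the accumulated size reaches its limit, then flushes whatever is left at the end. A minimal, database-free sketch of that rule follows. The class BatchFlushSketch, the drain method, and the sink callback are hypothetical names for illustration, and the string length stands in for the record's memory size.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of count-or-size batching; this is not DataX code.
class BatchFlushSketch {

    // Drains all records into the sink in batches and returns the number of flushes.
    static int drain(Iterator<String> records, int batchSize, int batchByteSize,
                     Consumer<List<String>> sink) {
        List<String> buffer = new ArrayList<>(batchSize);
        int bufferBytes = 0;
        int flushes = 0;
        while (records.hasNext()) {
            String record = records.next();
            buffer.add(record);
            // Assumption: string length stands in for the record's memory size.
            bufferBytes += record.length();
            if (buffer.size() >= batchSize || bufferBytes >= batchByteSize) {
                sink.accept(new ArrayList<>(buffer)); // hand over a copy of the batch
                buffer.clear();
                bufferBytes = 0;
                flushes++;
            }
        }
        // Flush the final, partially filled batch.
        if (!buffer.isEmpty()) {
            sink.accept(new ArrayList<>(buffer));
            flushes++;
        }
        return flushes;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("aa", "bb", "cc", "dd", "ee");
        int flushes = drain(rows.iterator(), 2, 100, batch -> System.out.println(batch));
        System.out.println("flushes = " + flushes);
    }
}
```

The size limit matters when rows are wide: a few large records can make a batch expensive long before the count limit is reached, so either threshold triggers the flush.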

Core class: CommonRdbmsWriter

    public void startWriteWithConnection(RecordReceiver recordReceiver, Connection connection) {
        // Build the SQL statement used to write to the database
        calcWriteRecordSql();
        List<Record> writeBuffer = new ArrayList<>(this.batchSize);
        int bufferBytes = 0;
        try {
            Record record;
            while ((record = recordReceiver.getFromReader()) != null) {
                writeBuffer.add(record);
                bufferBytes += record.getMemorySize();
                // Flush when either the record count or the byte size reaches its limit
                if (writeBuffer.size() >= batchSize || bufferBytes >= batchByteSize) {
                    doBatchInsert(connection, writeBuffer);
                    writeBuffer.clear();
                    bufferBytes = 0;
                }
            }
            // Flush the remaining records
            if (!writeBuffer.isEmpty()) {
                doBatchInsert(connection, writeBuffer);
                writeBuffer.clear();
                bufferBytes = 0;
            }
        } catch (Exception e) {
            throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
        } finally {
            writeBuffer.clear();
            bufferBytes = 0;
            DBUtil.closeDBResources(null, null, connection);
        }
    }

V. Source code addresses

GitHub: https://github.com/cicadasmile/data-manage-parent
GitEE: https://gitee.com/cicadasmile/data-manage-parent


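Recommended reading in this series

No.   Title
A01   Data Source Management: Dynamic Routing Between Master and Slave Databases, Read/Write Splitting with AOP
A02   Data Source Management: Adapting and Managing Dynamic Data Sources over JDBC
A03   Data Source Management: Dynamic Permission Checks, Table Structure and Data Migration Workflow
A04   Data Source Management: Sharding Relational Databases and Tables, Distributed Computing on Columnar Stores
A05   Data Source Management: PostgreSQL Environment Integration, Working with the JSON Type
C01   Architecture Basics: Single Service, Cluster, Distributed; Basic Differences and Connections
C02   Architecture Design: Global ID Generation Strategies in Distributed Business Systems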
