insertRows 在填充数据的时候会判断数据类型,如果要处理的数据有非常量,比如有需要依赖其他字段设值、函数等等,这个时候会使用 evalRow 方法进行填充,否则使用 fastEvalRow 进行填充。最后将数据处理好之后会调用 InsertExec 的 exec 方法将数据写入存储引擎中。
func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error { ... for i, row := range rows { ... err := e.addRecord(ctx, row) if err != nil { return err } } ... return nil }在 exec 方法中会遍历所有的数据,然后调用 addRecord 方法进行处理。
InsertExec 的 addRecord 方法最终会调用到 TableCommon 的 AddRecord。
func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { txn, err := sctx.Txn(true) if err != nil { return nil, err } ... writeBufs := sessVars.GetWriteStmtBufs() // 获取记录行的key key := t.RecordKey(recordID) // 格式化数据行 writeBufs.RowValBuf, err = tablecodec.EncodeRow(sc, row, colIDs, writeBufs.RowValBuf, writeBufs.AddRowValues, rd) if err != nil { return nil, err } value := writeBufs.RowValBuf // 检测该key在本地缓存中是否存在 var setPresume bool skipCheck := sctx.GetSessionVars().StmtCtx.BatchCheck if (t.meta.IsCommonHandle || t.meta.PKIsHandle) && !skipCheck && !opt.SkipHandleCheck { // 如果是 LazyCheck ,那么只读取本地缓存判断是否存在 if sctx.GetSessionVars().LazyCheckKeyNotExists() { var v []byte //只读取本地缓存判断是否存在 v, err = txn.GetMemBuffer().Get(ctx, key) if err != nil { setPresume = true } if err == nil && len(v) == 0 { err = kv.ErrNotExist } } else { //否则会通过rpc请求tikv从集群中校验数据是否存在 _, err = txn.Get(ctx, key) } if err == nil { handleStr := getDuplicateErrorHandleString(t, recordID, r) return recordID, kv.ErrKeyExists.FastGenByArgs(handleStr, "PRIMARY") } else if !kv.ErrNotExist.Equal(err) { return recordID, err } } // 将 Key-Value 写到当前事务的缓存中 if setPresume { err = memBuffer.SetWithFlags(key, value, kv.SetPresumeKeyNotExists) } else { err = memBuffer.Set(key, value) } if err != nil { return nil, err } // 构造 Index 数据 h, err := t.addIndices(sctx, recordID, r, txn, createIdxOpts) if err != nil { return h, err } ... return recordID, nil }AddRecord 主要做这么几件事:
获取记录行的key,序列化 value,将 Key-Value 写到当前事务的缓存中;
构造 Index 数据;
TiDB 中存储的数据是全局有序 的,并且数据会以 Key-Value的形式存储在 TiDB 中。
所以 TiDB 对每个表分配一个 TableID,每一个索引都会分配一个 IndexID,每一行分配一个 RowID(如果表有整数型的 Primary Key,那么会用 Primary Key 的值当做 RowID),其中 TableID 在整个集群内唯一,IndexID/RowID 在表内唯一,这些 ID 都是 int64 类型。
每行数据按照如下规则进行编码成 Key-Value pair:
Key: tablePrefix{tableID}_recordPrefixSep{rowID} Value: [col1, col2, col3, col4]那么对应的代码实现则会调用 RecordKey 方法获得一个这样的 Key:
t.indexPrefix = tablecodec.GenTableIndexPrefix(physicalTableID) func (t *TableCommon) RecordKey(h kv.Handle) kv.Key { return tablecodec.EncodeRecordKey(t.recordPrefix, h) }这个 Key 分别由 tableID 与 rowID 构成;
对于 Unique Index 数据,会按照如下规则编码成 Key-Value pair:
Key: tablePrefix{tableID}_indexPrefixSep{indexID}_indexedColumnsValue Value: rowID对于非Unique Index 数据,可能有多行数据的 ColumnsValue是一样的,所以会按照如下规则编码成 Key-Value pair:
Key: tablePrefix{tableID}_indexPrefixSep{indexID}_indexedColumnsValue_rowID Value: null对应的 Index 实现则会调用 addIndices 方法,最后调用到 GenIndexKey 生成Key:
func GenIndexKey(sc *stmtctx.StatementContext, tblInfo *model.TableInfo, idxInfo *model.IndexInfo, phyTblID int64, indexedValues []types.Datum, h kv.Handle, buf []byte) (key []byte, distinct bool, err error) { // 校验是否是唯一键 if idxInfo.Unique { distinct = true // 唯一键是允许 null 值的 for _, cv := range indexedValues { if cv.IsNull() { distinct = false break } } } //如果是字符串,那么需要按字段长度裁切 TruncateIndexValues(tblInfo, idxInfo, indexedValues) // 按 tablePrefix{tableID}_indexPrefixSep{indexID}_indexedColumnsValue 拼接 key = GetIndexKeyBuf(buf, RecordRowKeyLen+len(indexedValues)*9+9) key = appendTableIndexPrefix(key, phyTblID) key = codec.EncodeInt(key, idxInfo.ID) key, err = codec.EncodeKey(sc, key, indexedValues...) if err != nil { return nil, false, err } if !distinct && h != nil { // 如果是非Unique Index 数据,还需要拼接上 rowID if h.IsInt() { key, err = codec.EncodeKey(sc, key, types.NewDatum(h.IntValue())) } else { key = append(key, h.Encoded()...) } } return }GenIndexKey 这里会按照上面说到的规则进行拼接。