之后遍历构造好的 toBeCheckedRows ,先调用 updateDupRow 方法判断主键判断是否有冲突,如果主键没有冲突,那么判断唯一键是否有冲突,都没有冲突则执行正常插入逻辑。
updateDupRow 会判断 Key 值在缓存中是否存在,存在则调用 doDupRowUpdate ;doDupRowUpdate 中会根据 ON DUPLICATE 中的字段更新新的数据行中的值,并将被更新过的字段打上 flag 之后调用 updateRecord 函数。
func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, modified []bool, t table.Table, onDup bool, memTracker *memory.Tracker) (bool, error) { txn, err := sctx.Txn(false) if err != nil { return false, err } changed, handleChanged := false, false ... for i, col := range t.Cols() { // 这里是新旧数据进行比较,如果相同返回0 cmp, err := newData[i].CompareDatum(sc, &oldData[i]) if err != nil { return false, err } //这里表明新旧数据不同 if cmp != 0 { changed = true //设置标记位,表示有数据被修改 modified[i] = true ... // 如果是主键更改,设置 handleChanged if col.IsPKHandleColumn(t.Meta()) { handleChanged = true if err := rebaseAutoRandomValue(sctx, t, &newData[i], col); err != nil { return false, err } } // 如果是主键更改,设置 handleChanged if col.IsCommonHandleColumn(t.Meta()) { handleChanged = true } // 表示该字段没有被更改 } else { if mysql.HasOnUpdateNowFlag(col.Flag) && modified[i] { onUpdateSpecified[i] = true } modified[i] = false } } // 如果数据行没有变化,直接返回 if !changed { ... return false, nil } // 这里如果是主键被更改了,那么会先将原数据删除,再添加一条新的数据 if handleChanged { if updated, err := func() (bool, error) { txn, err := sctx.Txn(true) if err != nil { return false, err } memBuffer := txn.GetMemBuffer() sh := memBuffer.Staging() defer memBuffer.Cleanup(sh) if err = t.RemoveRecord(sctx, h, oldData); err != nil { return false, err } _, err = t.AddRecord(sctx, newData, table.IsUpdate, table.WithCtx(ctx)) if err != nil { return false, err } memBuffer.Release(sh) return true, nil }(); err != nil { if terr, ok := errors.Cause(err).(*terror.Error); sctx.GetSessionVars().StmtCtx.IgnoreNoPartition && ok && terr.Code() == errno.ErrNoPartitionForGivenValue { return false, nil } return updated, err } } else { // 更新记录行 if err = t.UpdateRecord(ctx, sctx, h, oldData, newData, modified); err != nil { if terr, ok := errors.Cause(err).(*terror.Error); sctx.GetSessionVars().StmtCtx.IgnoreNoPartition && ok && terr.Code() == errno.ErrNoPartitionForGivenValue { return false, nil } return false, err } } ... return true, nil }updateRecord 会判断行数据有没有被更改,如果有被更改,那么分为两种情况:
主键被更改了,那么会先将原数据删除,再添加一条新的数据;
唯一键被更改会调用 UpdateRecord 更新记录行;
func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, touched []bool) error { txn, err := sctx.Txn(true) if err != nil { return err } memBuffer := txn.GetMemBuffer() ... // 重建索引记录 err = t.rebuildIndices(sctx, txn, h, touched, oldData, newData, table.WithCtx(ctx)) if err != nil { return err } // 构建行记录key key := t.RecordKey(h) sc, rd := sessVars.StmtCtx, &sessVars.RowEncoder // 构建行记录value value, err := tablecodec.EncodeRow(sc, row, colIDs, nil, nil, rd) if err != nil { return err } // 将数据添加到事务缓存中 if err = memBuffer.Set(key, value); err != nil { return err } memBuffer.Release(sh) ... return nil }UpdateRecord 中执行的逻辑和 AddRecord 有点类似,首先会调用 rebuildIndices 将旧的索引记录删除,重新构建新的索引;然后根据当前的行记录构建 key-value 添加到事务缓存中。
最后用一张图总结一下这个过程:
总结这篇文章 debug 用了蛮长时间的,想要弄清楚其中的逻辑非常不容易,但是还有一些地方没弄明白,如在执行 ON DUPLICATE会更新数据行,那么数据一致性怎么保证的?这些疑问我想到时候留给事务章节去弄明白。
Referencehttps://pingcap.com/zh/blog/tidb-source-code-reading-4
https://pingcap.com/zh/blog/tidb-source-code-reading-16