diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index af124f7a02..cf8e28bf9a 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -557,13 +557,10 @@ void tRowDestroy(SRow *pRow) { } static int32_t tRowPCmprFn(const void *p1, const void *p2) { - if ((*(SRow **)p1)->ts < (*(SRow **)p2)->ts) { - return -1; - } else if ((*(SRow **)p1)->ts > (*(SRow **)p2)->ts) { - return 1; - } - - return 0; + SRowKey key1, key2; + tRowGetKey(*(SRow **)p1, &key1); + tRowGetKey(*(SRow **)p2, &key2); + return tRowKeyCompare(&key1, &key2); } static void tRowPDestroy(SRow **ppRow) { tRowDestroy(*ppRow); } static int32_t tRowMergeImpl(SArray *aRowP, STSchema *pTSchema, int32_t iStart, int32_t iEnd, int8_t flag) { @@ -645,13 +642,18 @@ int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag) { int32_t iStart = 0; while (iStart < aRowP->size) { - SRow *pRow = (SRow *)taosArrayGetP(aRowP, iStart); + SRowKey key1; + SRow *row1 = (SRow *)taosArrayGetP(aRowP, iStart); + + tRowGetKey(row1, &key1); int32_t iEnd = iStart + 1; while (iEnd < aRowP->size) { - SRow *pRowT = (SRow *)taosArrayGetP(aRowP, iEnd); + SRowKey key2; + SRow *row2 = (SRow *)taosArrayGetP(aRowP, iEnd); + tRowGetKey(row2, &key2); - if (pRow->ts != pRowT->ts) break; + if (tRowKeyCompare(&key1, &key2) != 0) break; iEnd++; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index a03adb164b..a4af782ad7 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -32,14 +32,14 @@ static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, i static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp, SRpcMsg *pOriginRpc); static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp, - SRpcMsg *pOriginalMsg); + SRpcMsg *pOriginalMsg); static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp, - SRpcMsg *pOriginalMsg); + SRpcMsg *pOriginalMsg); static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); @@ -1516,7 +1516,7 @@ static int32_t vnodeRebuildSubmitReqMsg(SSubmitReq2 *pSubmitReq, void **ppMsg) { } static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp, - SRpcMsg *pOriginalMsg) { + SRpcMsg *pOriginalMsg) { int32_t code = 0; terrno = 0; @@ -1586,12 +1586,26 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in } else { int32_t nRow = TARRAY_SIZE(pSubmitTbData->aRowP); SRow **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP); + SRowKey lastRowKey; for (int32_t iRow = 0; iRow < nRow; ++iRow) { - if (aRow[iRow]->ts < minKey || aRow[iRow]->ts > maxKey || (iRow > 0 && aRow[iRow]->ts <= aRow[iRow - 1]->ts)) { + if (aRow[iRow]->ts < minKey || aRow[iRow]->ts > maxKey) { code = TSDB_CODE_INVALID_MSG; vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver); goto _exit; } + if (iRow == 0) { + tRowGetKey(aRow[iRow], &lastRowKey); + } else { + SRowKey rowKey; + tRowGetKey(aRow[iRow], &rowKey); + + if (tRowKeyCompare(&lastRowKey, &rowKey) >= 0) { + code = TSDB_CODE_INVALID_MSG; + vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver); + goto _exit; + } + lastRowKey = rowKey; + } } } } @@ -1735,10 +1749,14 @@ _exit: atomic_add_fetch_64(&pVnode->statis.nInsertSuccess, pSubmitRsp->affectedRows); atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1); - if(tsEnableMonitor && pSubmitRsp->affectedRows > 0 && strlen(pOriginalMsg->info.conn.user) > 0){ - const char *sample_labels[] = {VNODE_METRIC_TAG_VALUE_INSERT_AFFECTED_ROWS, pVnode->monitor.strClusterId, - pVnode->monitor.strDnodeId, tsLocalEp, pVnode->monitor.strVgId, - pOriginalMsg->info.conn.user, "Success"}; + if (tsEnableMonitor && pSubmitRsp->affectedRows > 0 && strlen(pOriginalMsg->info.conn.user) > 0) { + const char *sample_labels[] = {VNODE_METRIC_TAG_VALUE_INSERT_AFFECTED_ROWS, + pVnode->monitor.strClusterId, + pVnode->monitor.strDnodeId, + tsLocalEp, + pVnode->monitor.strVgId, + pOriginalMsg->info.conn.user, + "Success"}; taos_counter_add(pVnode->monitor.insertCounter, pSubmitRsp->affectedRows, sample_labels); } @@ -2019,7 +2037,7 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pRe } static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp, - SRpcMsg *pOriginalMsg) { + SRpcMsg *pOriginalMsg) { int32_t code = 0; SDecoder *pCoder = &(SDecoder){0}; SDeleteRes *pRes = &(SDeleteRes){0};