diff --git a/source/libs/parser/inc/parInsertUtil.h b/source/libs/parser/inc/parInsertUtil.h index ce14f822e4..61fbf1504b 100644 --- a/source/libs/parser/inc/parInsertUtil.h +++ b/source/libs/parser/inc/parInsertUtil.h @@ -153,6 +153,8 @@ typedef struct STableDataCxt { SBoundColInfo boundColsInfo; SArray *pValues; SSubmitTbData *pData; + TSKEY lastTs; + bool ordered; } STableDataCxt; typedef struct SVgroupDataCxt { @@ -161,6 +163,7 @@ typedef struct SVgroupDataCxt { } SVgroupDataCxt; int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo); +void insCheckTableDataOrder(STableDataCxt *pTableCxt, TSKEY tsKey); int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta *pTableMeta, SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt); int32_t insMergeTableDataCxt(SHashObj *pTableHash, SArray **pVgDataBlocks); diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 03068ba83e..3e631b197f 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -1175,6 +1175,9 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC if (TSDB_CODE_SUCCESS == code) { SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1); code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow); + if (TSDB_CODE_SUCCESS == code) { + insCheckTableDataOrder(pTableCxt, TD_ROW_KEY(*pRow)); + } } if (TSDB_CODE_SUCCESS == code && !isParseBindParam) { diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index a138b684d1..a271165d5d 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -981,6 +981,20 @@ int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) { return TSDB_CODE_SUCCESS; } +void insCheckTableDataOrder(STableDataCxt* pTableCxt, TSKEY tsKey) { + // once the data block is disordered, we do NOT keep previous timestamp any more + if (!pTableCxt->ordered) { + return; + } + + if (tsKey <= pTableCxt->lastTs) { + pTableCxt->ordered = false; + } + + pTableCxt->lastTs = tsKey; + return; +} + static void destroyBoundColInfo(SBoundColInfo* pInfo) { taosMemoryFreeClear(pInfo->pColIndex); } static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput) { @@ -991,6 +1005,9 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat int32_t code = TSDB_CODE_SUCCESS; + pTableCxt->ordered = true; + pTableCxt->lastTs = 0; + pTableCxt->pMeta = tableMetaDup(pTableMeta); if (NULL == pTableCxt->pMeta) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -1179,7 +1196,9 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) { void* p = taosHashIterate(pTableHash, NULL); while (TSDB_CODE_SUCCESS == code && NULL != p) { STableDataCxt* pTableCxt = *(STableDataCxt**)p; - code = tRowMergeSort(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0); + if (pTableCxt->ordered) { + code = tRowMergeSort(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0); + } if (TSDB_CODE_SUCCESS == code) { SVgroupDataCxt* pVgCxt = NULL; int32_t vgId = pTableCxt->pMeta->vgId;