From 0f5983ad887789050b2bca3fa5c0816161871284 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 20 Jul 2022 20:30:54 +0800 Subject: [PATCH] fix: merge dup rows in client --- include/common/tmsg.h | 2 +- source/common/src/tmsg.c | 6 +- source/common/src/trow.c | 14 +- source/common/test/dataformatTest.cpp | 2 +- source/dnode/vnode/src/tq/tq.c | 2 +- source/libs/parser/src/parInsert.c | 6 +- source/libs/parser/src/parInsertData.c | 219 ++++++++++++++++++++++++- 7 files changed, 235 insertions(+), 16 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 3e27bd9268..92b2bd187e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -438,7 +438,7 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaW return 0; } -STSchema* tdGetSTSChemaFromSSChema(SSchema** pSchema, int32_t nCols); +STSchema* tdGetSTSChemaFromSSChema(SSchema* pSchema, int32_t nCols, int32_t sver); typedef struct { char name[TSDB_TABLE_FNAME_LEN]; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 8611278550..3e733d291b 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4941,14 +4941,14 @@ int tDecodeSVCreateStbReq(SDecoder *pCoder, SVCreateStbReq *pReq) { return 0; } -STSchema *tdGetSTSChemaFromSSChema(SSchema **pSchema, int32_t nCols) { +STSchema *tdGetSTSChemaFromSSChema(SSchema *pSchema, int32_t nCols, int32_t sver) { STSchemaBuilder schemaBuilder = {0}; - if (tdInitTSchemaBuilder(&schemaBuilder, 1) < 0) { + if (tdInitTSchemaBuilder(&schemaBuilder, sver) < 0) { return NULL; } for (int i = 0; i < nCols; i++) { - SSchema *schema = *pSchema + i; + SSchema *schema = pSchema + i; if (tdAddColToSchema(&schemaBuilder, schema->type, schema->flags, schema->colId, schema->bytes) < 0) { tdDestroyTSchemaBuilder(&schemaBuilder); return NULL; diff --git a/source/common/src/trow.c b/source/common/src/trow.c index f64250bce6..a8d501bebc 100644 --- a/source/common/src/trow.c +++ b/source/common/src/trow.c @@ -568,6 +568,7 @@ int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow) { int32_t maxVarDataLen = 0; int32_t iColVal = 0; void *varBuf = NULL; + bool isAlloc = false; ASSERT(nColVal > 1); @@ -610,8 +611,11 @@ int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow) { ++iColVal; } - *ppRow = (STSRow *)taosMemoryCalloc( - 1, sizeof(STSRow) + pTSchema->flen + varDataLen + TD_BITMAP_BYTES(pTSchema->numOfCols - 1)); + if (!(*ppRow)) { + *ppRow = (STSRow *)taosMemoryCalloc( + 1, sizeof(STSRow) + pTSchema->flen + varDataLen + TD_BITMAP_BYTES(pTSchema->numOfCols - 1)); + isAlloc = true; + } if (!(*ppRow)) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -621,7 +625,9 @@ int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow) { if (maxVarDataLen > 0) { varBuf = taosMemoryMalloc(maxVarDataLen); if (!varBuf) { - taosMemoryFreeClear(*ppRow); + if(isAlloc) { + taosMemoryFreeClear(*ppRow); + } terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -1323,7 +1329,7 @@ void tTSRowGetVal(STSRow *pRow, STSchema *pTSchema, int16_t iCol, SColVal *pColV SCellVal cv; SValue value; - ASSERT(iCol > 0); + // ASSERT(iCol > 0); if (TD_IS_TP_ROW(pRow)) { tdSTpRowGetVal(pRow, pTColumn->colId, pTColumn->type, pTSchema->flen, pTColumn->offset, iCol - 1, &cv); diff --git a/source/common/test/dataformatTest.cpp b/source/common/test/dataformatTest.cpp index d16e35ff07..65f21bee40 100644 --- a/source/common/test/dataformatTest.cpp +++ b/source/common/test/dataformatTest.cpp @@ -116,7 +116,7 @@ STSchema *genSTSchema(int16_t nCols) { } STSchema *pResult = NULL; - pResult = tdGetSTSChemaFromSSChema(&pSchema, nCols); + pResult = tdGetSTSChemaFromSSChema(pSchema, nCols, 1); taosMemoryFree(pSchema); return pResult; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 208b5d3fa0..61a8b4d848 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -614,7 +614,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { ASSERT(pTask->tbSink.pSchemaWrapper->pSchema); pTask->tbSink.pTSchema = - tdGetSTSChemaFromSSChema(&pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols); + tdGetSTSChemaFromSSChema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, 1); ASSERT(pTask->tbSink.pTSchema); } diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index c6b608ddb4..225453f006 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1200,7 +1200,7 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, *gotRow = true; #ifdef TD_DEBUG_PRINT_ROW - STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&schema, spd->numOfCols); + STSchema* pSTSchema = tdGetSTSChemaFromSSChema(schema, spd->numOfCols, 1); tdSRowPrint(row, pSTSchema, __func__); taosMemoryFree(pSTSchema); #endif @@ -1970,7 +1970,7 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in } } #ifdef TD_DEBUG_PRINT_ROW - STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pSchema, spd->numOfCols); + STSchema* pSTSchema = tdGetSTSChemaFromSSChema(pSchema, spd->numOfCols, 1); tdSRowPrint(row, pSTSchema, __func__); taosMemoryFree(pSTSchema); #endif @@ -2055,7 +2055,7 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu #ifdef TD_DEBUG_PRINT_ROW if (rowEnd) { - STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pSchema, spd->numOfCols); + STSchema* pSTSchema = tdGetSTSChemaFromSSChema(pSchema, spd->numOfCols, 1); tdSRowPrint(row, pSTSchema, __func__); taosMemoryFree(pSTSchema); } diff --git a/source/libs/parser/src/parInsertData.c b/source/libs/parser/src/parInsertData.c index 290c65de12..fa5a3f4cd0 100644 --- a/source/libs/parser/src/parInsertData.c +++ b/source/libs/parser/src/parInsertData.c @@ -12,13 +12,14 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -// clang-format off +// clang-format on #include "parInsertData.h" #include "catalog.h" #include "parInt.h" #include "parUtil.h" #include "querynodes.h" +#include "tRealloc.h" #define IS_RAW_PAYLOAD(t) \ (((int)(t)) == PAYLOAD_TYPE_RAW) // 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert @@ -34,6 +35,31 @@ typedef struct SBlockKeyInfo { SBlockKeyTuple* pKeyTuple; } SBlockKeyInfo; +typedef struct { + int32_t index; + SArray* rowArray; // array of merged rows(mem allocated by tRealloc) + STSchema* pSchema; +} SBlockRowMerger; + +static void tdResetSBlockRowMerger(SBlockRowMerger* pMerger) { + if (pMerger) { + pMerger->index = -1; + taosMemoryFreeClear(pMerger->pSchema); + } +} + +static void tdFreeSBlockRowMerger(SBlockRowMerger* pMerger) { + if (pMerger) { + int32_t size = taosArrayGetSize(pMerger->rowArray); + for (int32_t i = 0; i < size; ++i) { + tFree(*(void**)taosArrayGet(pMerger->rowArray, i)); + } + taosArrayDestroy(pMerger->rowArray); + + taosMemoryFreeClear(pMerger->pSchema); + } +} + static int32_t rowDataCompar(const void* lhs, const void* rhs) { TSKEY left = *(TSKEY*)lhs; TSKEY right = *(TSKEY*)rhs; @@ -328,7 +354,7 @@ void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) { } // data block is disordered, sort it in ascending order -int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo) { +static int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo) { SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData; int16_t nRows = pBlocks->numOfRows; @@ -396,6 +422,187 @@ int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKey return 0; } +static void* tdGetCurRowFromBlockMerger(SBlockRowMerger* pBlkRowMerger) { + if (pBlkRowMerger && (pBlkRowMerger->index >= 0)) { + ASSERT(pBlkRowMerger->index < taosArrayGetSize(pBlkRowMerger->rowArray)); + return *(void**)taosArrayGet(pBlkRowMerger->rowArray, pBlkRowMerger->index); + } + return NULL; +} + +static int32_t tdBlockRowMerge(STableDataBlocks* dataBuf, SBlockKeyTuple* pEndKeyTp, int32_t nDupRows, + SBlockRowMerger** pBlkRowMerger, int32_t rowSize) { + ASSERT(nDupRows > 1); + SBlockKeyTuple* pStartKeyTp = pEndKeyTp - (nDupRows - 1); + ASSERT(pStartKeyTp->skey == pEndKeyTp->skey); + + STSRow* pEndRow = (STSRow*)pEndKeyTp->payloadAddr; + // TODO: optimization if end row is all normal +#if 0 + if(isNormal(pEndRow)) { // set the end row if it is normal and return directly + pStartKeyTp->payloadAddr = pEndKeyTp->payloadAddr; + return TSDB_CODE_SUCCESS; + } +#endif + + if (!(*pBlkRowMerger)) { + (*pBlkRowMerger) = taosMemoryCalloc(1, sizeof(**pBlkRowMerger)); + if (!(*pBlkRowMerger)) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + (*pBlkRowMerger)->index = -1; + if (!(*pBlkRowMerger)->rowArray) { + (*pBlkRowMerger)->rowArray = taosArrayInit(1, sizeof(void*)); + if (!(*pBlkRowMerger)->rowArray) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + } + } + + if (!(*pBlkRowMerger)->pSchema) { + (*pBlkRowMerger)->pSchema = tdGetSTSChemaFromSSChema( + dataBuf->pTableMeta->schema, dataBuf->pTableMeta->tableInfo.numOfColumns, dataBuf->pTableMeta->sversion); + + if (!(*pBlkRowMerger)->pSchema) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + } + + void* pDestRow = NULL; + ++((*pBlkRowMerger)->index); + if ((*pBlkRowMerger)->index < taosArrayGetSize((*pBlkRowMerger)->rowArray)) { + void* pAlloc = *(void**)taosArrayGet((*pBlkRowMerger)->rowArray, (*pBlkRowMerger)->index); + if (tRealloc((uint8_t**)&pAlloc, rowSize) != 0) { + return TSDB_CODE_FAILED; + } + pDestRow = pAlloc; + } else { + if (tRealloc((uint8_t**)&pDestRow, rowSize) != 0) { + return TSDB_CODE_FAILED; + } + taosArrayPush((*pBlkRowMerger)->rowArray, &pDestRow); + } + + // merge rows to pDestRow + STSchema* pSchema = (*pBlkRowMerger)->pSchema; + SArray* pArray = taosArrayInit(pSchema->numOfCols, sizeof(SColVal)); + for (int32_t i = 0; i < pSchema->numOfCols; ++i) { + SColVal colVal = {0}; + for (int32_t j = 0; j < nDupRows; ++i) { + tTSRowGetVal((pEndKeyTp - j)->payloadAddr, pSchema, i, &colVal); + if (!colVal.isNone) { + break; + } + } + taosArrayPush(pArray, &colVal); + } + if (tdSTSRowNew(pArray, pSchema, (STSRow**)&pDestRow) < 0) { + taosArrayDestroy(pArray); + return TSDB_CODE_FAILED; + } + + taosArrayDestroy(pArray); + return TSDB_CODE_SUCCESS; +} + +// data block is disordered, sort it in ascending order, and merge dup rows if exists +static int sortMergeDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo, + SBlockRowMerger** ppBlkRowMerger) { + SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData; + int16_t nRows = pBlocks->numOfRows; + + // size is less than the total size, since duplicated rows may be removed. + + // allocate memory + size_t nAlloc = nRows * sizeof(SBlockKeyTuple); + if (pBlkKeyInfo->pKeyTuple == NULL || pBlkKeyInfo->maxBytesAlloc < nAlloc) { + char* tmp = taosMemoryRealloc(pBlkKeyInfo->pKeyTuple, nAlloc); + if (tmp == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + pBlkKeyInfo->pKeyTuple = (SBlockKeyTuple*)tmp; + pBlkKeyInfo->maxBytesAlloc = (int32_t)nAlloc; + } + memset(pBlkKeyInfo->pKeyTuple, 0, nAlloc); + + tdResetSBlockRowMerger(*ppBlkRowMerger); + + int32_t extendedRowSize = getExtendedRowSize(dataBuf); + SBlockKeyTuple* pBlkKeyTuple = pBlkKeyInfo->pKeyTuple; + char* pBlockData = pBlocks->data + pBlocks->schemaLen; + int n = 0; + while (n < nRows) { + pBlkKeyTuple->skey = TD_ROW_KEY((STSRow*)pBlockData); + pBlkKeyTuple->payloadAddr = pBlockData; + pBlkKeyTuple->index = n; + + // next loop + pBlockData += extendedRowSize; + ++pBlkKeyTuple; + ++n; + } + + if (!dataBuf->ordered) { + pBlkKeyTuple = pBlkKeyInfo->pKeyTuple; + + taosSort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataComparStable); + + pBlkKeyTuple = pBlkKeyInfo->pKeyTuple; + bool hasDup = false; + int32_t nextPos = 0; + int32_t i = 0; + int32_t j = 1; + + while (j < nRows) { + TSKEY ti = (pBlkKeyTuple + i)->skey; + TSKEY tj = (pBlkKeyTuple + j)->skey; + + if (ti == tj) { + ++j; + continue; + } + + if ((j - i) > 1) { + if (tdBlockRowMerge(dataBuf, (pBlkKeyTuple + j - 1), j - i, ppBlkRowMerger, extendedRowSize) < 0) { + return TSDB_CODE_FAILED; + } + (pBlkKeyTuple + nextPos)->payloadAddr = tdGetCurRowFromBlockMerger(*ppBlkRowMerger); + hasDup = true; + i = j; + } else { + if (hasDup) { + memmove(pBlkKeyTuple + nextPos, pBlkKeyTuple + i, sizeof(SBlockKeyTuple)); + } + ++i; + } + + ++nextPos; + ++j; + } + + if ((j - i) > 1) { + ASSERT((pBlkKeyTuple + i)->skey == (pBlkKeyTuple + j - 1)->skey); + if (tdBlockRowMerge(dataBuf, (pBlkKeyTuple + j - 1), j - i, ppBlkRowMerger, extendedRowSize) < 0) { + return TSDB_CODE_FAILED; + } + (pBlkKeyTuple + nextPos)->payloadAddr = tdGetCurRowFromBlockMerger(*ppBlkRowMerger); + } else if (hasDup) { + memmove(pBlkKeyTuple + nextPos, pBlkKeyTuple + i, sizeof(SBlockKeyTuple)); + } + + dataBuf->ordered = true; + pBlocks->numOfRows = i + 1; + } + + dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize; + dataBuf->prevTS = INT64_MIN; + + return TSDB_CODE_SUCCESS; +} + // Erase the empty space reserved for binary data static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple, bool isRawPayload) { @@ -464,6 +671,8 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p STableDataBlocks** p = taosHashIterate(pHashObj, NULL); STableDataBlocks* pOneTableBlock = *p; SBlockKeyInfo blkKeyInfo = {0}; // share by pOneTableBlock + SBlockRowMerger *pBlkRowMerger = NULL; + while (pOneTableBlock) { SSubmitBlk* pBlocks = (SSubmitBlk*)pOneTableBlock->pData; if (pBlocks->numOfRows > 0) { @@ -473,6 +682,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p getDataBlockFromList(pVnodeDataBlockHashList, &pOneTableBlock->vgId, sizeof(pOneTableBlock->vgId), TSDB_PAYLOAD_SIZE, INSERT_HEAD_SIZE, 0, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList, NULL); if (ret != TSDB_CODE_SUCCESS) { + tdFreeSBlockRowMerger(pBlkRowMerger); taosHashCleanup(pVnodeDataBlockHashList); destroyBlockArrayList(pVnodeDataBlockList); taosMemoryFreeClear(blkKeyInfo.pKeyTuple); @@ -490,6 +700,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p if (tmp != NULL) { dataBuf->pData = tmp; } else { // failed to allocate memory, free already allocated memory and return error code + tdFreeSBlockRowMerger(pBlkRowMerger); taosHashCleanup(pVnodeDataBlockHashList); destroyBlockArrayList(pVnodeDataBlockList); taosMemoryFreeClear(dataBuf->pData); @@ -501,7 +712,8 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p if (isRawPayload) { sortRemoveDataBlockDupRowsRaw(pOneTableBlock); } else { - if ((code = sortRemoveDataBlockDupRows(pOneTableBlock, &blkKeyInfo)) != 0) { + if ((code = sortMergeDataBlockDupRows(pOneTableBlock, &blkKeyInfo, &pBlkRowMerger)) != 0) { + tdFreeSBlockRowMerger(pBlkRowMerger); taosHashCleanup(pVnodeDataBlockHashList); destroyBlockArrayList(pVnodeDataBlockList); taosMemoryFreeClear(dataBuf->pData); @@ -529,6 +741,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p } // free the table data blocks; + tdFreeSBlockRowMerger(pBlkRowMerger); taosHashCleanup(pVnodeDataBlockHashList); taosMemoryFreeClear(blkKeyInfo.pKeyTuple); *pVgDataBlocks = pVnodeDataBlockList;