diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 2389079fd2..a3cae6a7db 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -767,6 +767,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_COL_PK_TYPE TAOS_DEF_ERROR_CODE(0, 0x2673) #define TSDB_CODE_PAR_INVALID_PK_OP TAOS_DEF_ERROR_CODE(0, 0x2674) #define TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL TAOS_DEF_ERROR_CODE(0, 0x2675) +#define TSDB_CODE_PAR_PRIMARY_KEY_IS_NONE TAOS_DEF_ERROR_CODE(0, 0x2676) #define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF) //planner diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 69b2a2e6a3..fd56b5f5ae 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1331,6 +1331,13 @@ void* blockDataDestroy(SSDataBlock* pBlock) { return NULL; } + if (IS_VAR_DATA_TYPE(pBlock->info.pks[0].type)) { + uInfo("1====free pk:%p, %p pBlock", pBlock->info.pks[0].pData, pBlock); + uInfo("2====free pk:%p, %p pBlock", pBlock->info.pks[1].pData, pBlock); + taosMemoryFreeClear(pBlock->info.pks[0].pData); + taosMemoryFreeClear(pBlock->info.pks[1].pData); + } + blockDataFreeRes(pBlock); taosMemoryFreeClear(pBlock); return NULL; @@ -1478,6 +1485,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { SSDataBlock* pBlock = createDataBlock(); pBlock->info = pDataBlock->info; + pBlock->info.rows = 0; pBlock->info.capacity = 0; pBlock->info.rowSize = 0; @@ -1497,10 +1505,18 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { pVal->type = pDataBlock->info.pks[0].type; pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[0].nData); + pVal->nData = pDataBlock->info.pks[0].nData; + memcpy(pVal->pData, pDataBlock->info.pks[0].pData, pVal->nData); - pVal = &pBlock->info.pks[1]; - pVal->type = pDataBlock->info.pks[1].type; - pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData); + SValue* p = &pBlock->info.pks[1]; + p->type = pDataBlock->info.pks[1].type; + p->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData); + p->nData = pDataBlock->info.pks[1].nData; + memcpy(p->pData, pDataBlock->info.pks[1].pData, p->nData); + uInfo("===========clone block, with varchar, %p, 0---addr:%p, src:%p, %p", pBlock, pBlock->info.pks[0].pData, pDataBlock, pDataBlock->info.pks[0].pData); + uInfo("===========clone block, with varchar, %p, 1---addr:%p, src:%p, %p", pBlock, pBlock->info.pks[1].pData, pDataBlock, pDataBlock->info.pks[1].pData); + } else { + uInfo("===========clone block without varchar pk, %p, src:%p", pBlock, pDataBlock); } if (copyData) { diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index f13a0a0825..9686059052 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -101,16 +101,18 @@ typedef struct { int32_t kvRowSize; } SRowBuildScanInfo; -static FORCE_INLINE void tRowBuildScanAddNone(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) { - ASSERT((pTColumn->flags & COL_IS_KEY) == 0); +static FORCE_INLINE int32_t tRowBuildScanAddNone(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) { + if ((pTColumn->flags & COL_IS_KEY)) return TSDB_CODE_PAR_PRIMARY_KEY_IS_NONE; sinfo->numOfNone++; + return 0; } -static FORCE_INLINE void tRowBuildScanAddNull(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) { - ASSERT((pTColumn->flags & COL_IS_KEY) == 0); +static FORCE_INLINE int32_t tRowBuildScanAddNull(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) { + if ((pTColumn->flags & COL_IS_KEY)) return TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL; sinfo->numOfNull++; sinfo->kvMaxOffset = sinfo->kvPayloadSize; sinfo->kvPayloadSize += tPutI16v(NULL, -pTColumn->colId); + return 0; } static FORCE_INLINE void tRowBuildScanAddValue(SRowBuildScanInfo *sinfo, SColVal *colVal, const STColumn *pTColumn) { @@ -142,6 +144,7 @@ static FORCE_INLINE void tRowBuildScanAddValue(SRowBuildScanInfo *sinfo, SColVal } static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildScanInfo *sinfo) { + int32_t code = 0; int32_t colValIndex = 1; int32_t numOfColVals = TARRAY_SIZE(colVals); SColVal *colValArray = (SColVal *)TARRAY_DATA(colVals); @@ -158,7 +161,7 @@ static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildS for (int32_t i = 1; i < schema->numOfCols; i++) { for (;;) { if (colValIndex >= numOfColVals) { - tRowBuildScanAddNone(sinfo, schema->columns + i); + if ((code = tRowBuildScanAddNone(sinfo, schema->columns + i))) goto _exit; break; } @@ -168,15 +171,15 @@ static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildS if (COL_VAL_IS_VALUE(&colValArray[colValIndex])) { tRowBuildScanAddValue(sinfo, &colValArray[colValIndex], schema->columns + i); } else if (COL_VAL_IS_NULL(&colValArray[colValIndex])) { - tRowBuildScanAddNull(sinfo, schema->columns + i); + if ((code = tRowBuildScanAddNull(sinfo, schema->columns + i))) goto _exit; } else if (COL_VAL_IS_NONE(&colValArray[colValIndex])) { - tRowBuildScanAddNone(sinfo, schema->columns + i); + if ((code = tRowBuildScanAddNone(sinfo, schema->columns + i))) goto _exit; } colValIndex++; break; } else if (colValArray[colValIndex].cid > schema->columns[i].colId) { - tRowBuildScanAddNone(sinfo, schema->columns + i); + if ((code = tRowBuildScanAddNone(sinfo, schema->columns + i))) goto _exit; break; } else { // skip useless value colValIndex++; @@ -250,7 +253,8 @@ static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildS + sinfo->kvIndexSize // index array + sinfo->kvPayloadSize; // payload - return 0; +_exit: + return code; } static int32_t tRowBuildTupleRow(SArray *aColVal, const SRowBuildScanInfo *sinfo, const STSchema *schema, diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 2b99c6f6ef..b060de029c 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -622,8 +622,6 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat } SRow* pRow = NULL; - tqInfo("result column flag:%d", pTSchema->columns[1].flags); - code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow); if (code != TSDB_CODE_SUCCESS) { tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE); diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index dd5da28b6b..d5f3624851 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -57,7 +57,6 @@ static void saveOneRowForLastRaw(SLastCol* pColVal, SCacheRowsReader* pReader, c static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds, const int32_t* dstSlotIds, void** pRes, const char* idStr) { int32_t numOfRows = pBlock->info.rows; - // bool allNullRow = true; if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) { uint64_t ts = TSKEY_MIN; @@ -108,11 +107,12 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p } } - // pColInfoData->info.bytes includes the VARSTR_HEADER_SIZE, need to substruct it + // pColInfoData->info.bytes includes the VARSTR_HEADER_SIZE, need to subtract it p->hasResult = true; varDataSetLen(pRes[i], pColInfoData->info.bytes - VARSTR_HEADER_SIZE); colDataSetVal(pColInfoData, numOfRows, (const char*)pRes[i], false); } + for (int32_t idx = 0; idx < taosArrayGetSize(pBlock->pDataBlock); ++idx) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, idx); if (idx < funcTypeBlockArray->size) { @@ -233,6 +233,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, if (IS_VAR_DATA_TYPE(pPkCol->type)) { p->rowKey.pks[0].pData = taosMemoryCalloc(1, pPkCol->bytes); } + + p->pkColumn = *pPkCol; } if (numOfTables == 0) { @@ -366,15 +368,15 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 goto _end; } - for (int32_t j = 0; j < pr->numOfCols; ++j) { - int32_t bytes; - if (slotIds[j] == -1) { - bytes = 1; - } else { - bytes = pr->pSchema->columns[slotIds[j]].bytes; - } + int32_t pkBufLen = 0; + if (pr->rowKey.numOfPKs > 0) { + pkBufLen = pr->pkColumn.bytes; + } - pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes + VARSTR_HEADER_SIZE); + for (int32_t j = 0; j < pr->numOfCols; ++j) { + int32_t bytes = (slotIds[j] == -1) ? 1 : pr->pSchema->columns[slotIds[j]].bytes; + + pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes + pkBufLen + VARSTR_HEADER_SIZE); SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]); p->ts = INT64_MIN; } diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index e8b1f870c3..9feae4c57e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -53,6 +53,13 @@ SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, return pLoadInfo; } +static void freeItem(void* pValue) { + SValue* p = (SValue*) pValue; + if (IS_VAR_DATA_TYPE(p->type)) { + taosMemoryFree(p->pData); + } +} + void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { if (pLoadInfo == NULL) { return NULL; @@ -72,8 +79,8 @@ void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { if (pLoadInfo->info.pCount != NULL) { taosArrayDestroy(pLoadInfo->info.pUid); - taosArrayDestroy(pLoadInfo->info.pFirstKey); - taosArrayDestroy(pLoadInfo->info.pLastKey); + taosArrayDestroyEx(pLoadInfo->info.pFirstKey, freeItem); + taosArrayDestroyEx(pLoadInfo->info.pLastKey, freeItem); taosArrayDestroy(pLoadInfo->info.pCount); taosArrayDestroy(pLoadInfo->info.pFirstTs); taosArrayDestroy(pLoadInfo->info.pLastTs); @@ -319,6 +326,21 @@ static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray return TSDB_CODE_SUCCESS; } +static int32_t tValueDupPayload(SValue *pVal) { + if (IS_VAR_DATA_TYPE(pVal->type)) { + char *p = (char *)pVal->pData; + char *pBuf = taosMemoryMalloc(pVal->nData); + if (pBuf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + memcpy(pBuf, p, pVal->nData); + pVal->pData = (uint8_t *)pBuf; + } + + return TSDB_CODE_SUCCESS; +} + static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo, TStatisBlkArray *pStatisBlkArray, uint64_t suid, const char *id) { int32_t numOfBlocks = TARRAY2_SIZE(pStatisBlkArray); @@ -377,37 +399,31 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl rows - i); taosArrayAddBatch(pBlockLoadInfo->info.pCount, tBufferGetDataAt(&block.counts, i * sizeof(int64_t)), rows - i); - SValue vFirst = {0}, vLast = {0}; - for (int32_t f = i; f < rows; ++f) { - int32_t code = tValueColumnGet(&block.firstKeyPKs[0], f, &vFirst); - if (code) { - break; - } + if (block.numOfPKs > 0) { + SValue vFirst = {0}, vLast = {0}; + for (int32_t f = i; f < rows; ++f) { + int32_t code = tValueColumnGet(&block.firstKeyPKs[0], f, &vFirst); + if (code) { + break; + } - if (IS_VAR_DATA_TYPE(vFirst.type)) { - char *p = (char *)vFirst.pData; - char *pBuf = taosMemoryMalloc(vFirst.nData); - memcpy(pBuf, p, vFirst.nData); - vFirst.pData = (uint8_t *)pBuf; - } - taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst); + tValueDupPayload(&vFirst); + taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst); - code = tValueColumnGet(&block.lastKeyPKs[0], f, &vLast); - if (code) { - break; - } + // todo add api to clone the original data + code = tValueColumnGet(&block.lastKeyPKs[0], f, &vLast); + if (code) { + break; + } - if (IS_VAR_DATA_TYPE(vLast.type)) { - char *p = (char *)vLast.pData; - char *pBuf = taosMemoryMalloc(vLast.nData); - memcpy(pBuf, p, vLast.nData); - vLast.pData = (uint8_t *)pBuf; + tValueDupPayload(&vLast); + taosArrayPush(pBlockLoadInfo->info.pLastKey, &vLast); } - taosArrayPush(pBlockLoadInfo->info.pLastKey, &vLast); } } else { - STbStatisRecord record; + STbStatisRecord record = {0}; + while (i < rows) { tStatisBlockGet(&block, i, &record); if (record.suid != suid) { @@ -420,8 +436,18 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl taosArrayPush(pBlockLoadInfo->info.pFirstTs, &record.firstKey.ts); taosArrayPush(pBlockLoadInfo->info.pLastTs, &record.lastKey.ts); - taosArrayPush(pBlockLoadInfo->info.pFirstKey, &record.firstKey.pks[0]); - taosArrayPush(pBlockLoadInfo->info.pLastKey, &record.lastKey.pks[0]); + if (record.firstKey.numOfPKs > 0) { + SValue s = record.firstKey.pks[0]; + tValueDupPayload(&s); + + taosArrayPush(pBlockLoadInfo->info.pFirstKey, &s); + + s = record.lastKey.pks[0]; + tValueDupPayload(&s); + + taosArrayPush(pBlockLoadInfo->info.pLastKey, &s); + } + i += 1; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index a8a4ced517..b7f97771da 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -59,7 +59,7 @@ static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pR int32_t rowIndex); static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order, - SVersionRange* pVerRange); + SVersionRange* pVerRange, bool hasPk); static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIterInfo* pIter, SArray* pDelList, TSDBROW* pResRow, STsdbReader* pReader, bool* freeTSRow); @@ -67,8 +67,7 @@ static int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRo STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow); static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, SRowKey* pKey, STsdbReader* pReader); -static int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo, - STsdbReader* pReader); +static int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader); static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SReadCostSummary* pCost); static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr, @@ -120,7 +119,7 @@ int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) { return ret > 0 ? 1 : -1; } } else { - return comparFn(&p1->pks[0].val, &p2->pks[0].val); + return p1->pks[0].val - p2->pks[0].val; } } } @@ -174,12 +173,16 @@ static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) { for (int32_t i = 0; i < pRow->numOfPKs; i++) { pKey->pks[i].type = indices[i].type; + uint8_t *tdata = data + indices[i].offset; + if (pRow->flag >> 4) { + tdata += tGetI16v(tdata, NULL); + } + if (IS_VAR_DATA_TYPE(indices[i].type)) { - tGetU32v(pKey->pks[i].pData, &pKey->pks[i].nData); - pKey->pks[i].pData = memcpy(pKey->pks[i].pData, data + indices[i].offset, pKey->pks[i].nData); - pKey->pks[i].pData += pKey->pks[i].nData; + tdata += tGetU32v(tdata, &pKey->pks[i].nData); + memcpy(pKey->pks[i].pData, tdata, pKey->pks[i].nData); } else { - pKey->pks[i].val = *(int64_t*) (data + indices[i].offset); + memcpy(&pKey->pks[i].val, data + indices[i].offset, tDataTypes[pKey->pks[i].type].bytes); } } } @@ -392,19 +395,21 @@ _err: return code; } -static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) { +bool shouldFreePkBuf(SBlockLoadSuppInfo *pSupp) { + return (pSupp->numOfPks > 0) && IS_VAR_DATA_TYPE(pSupp->pk.type); +} + +void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool needFree) { pIter->order = order; pIter->index = -1; pIter->numOfBlocks = 0; if (pIter->blockList == NULL) { pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo)); } else { - taosArrayClear(pIter->blockList); + clearDataBlockIterator(pIter, needFree); } } -static void cleanupDataBlockIterator(SDataBlockIter* pIter) { taosArrayDestroy(pIter->blockList); } - static void initReaderStatus(SReaderStatus* pStatus) { pStatus->pTableIter = NULL; pStatus->loadFromFile = true; @@ -483,7 +488,7 @@ void tsdbReleaseDataBlock2(STsdbReader* pReader) { } static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacity, SSDataBlock* pResBlock, - SQueryTableDataCond* pCond) { + SQueryTableDataCond* pCond, SBlockLoadSuppInfo* pSup) { pResBlockInfo->capacity = capacity; pResBlockInfo->pResBlock = pResBlock; terrno = 0; @@ -491,6 +496,28 @@ static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacit if (pResBlockInfo->pResBlock == NULL) { pResBlockInfo->freeBlock = true; pResBlockInfo->pResBlock = createResBlock(pCond, pResBlockInfo->capacity); + + if (pSup->numOfPks > 0) { + SSDataBlock* p = pResBlockInfo->pResBlock; + p->info.pks[0].type = pSup->pk.type; + p->info.pks[1].type = pSup->pk.type; + + if (IS_VAR_DATA_TYPE(pSup->pk.type)) { + p->info.pks[0].pData = taosMemoryCalloc(1, pSup->pk.bytes); + if (p->info.pks[0].pData == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + p->info.pks[1].pData = taosMemoryCalloc(1, pSup->pk.bytes); + if (p->info.pks[1].pData == NULL) { + taosMemoryFreeClear(p->info.pks[0].pData); + return TSDB_CODE_OUT_OF_MEMORY; + } + + p->info.pks[0].nData = pSup->pk.bytes; + p->info.pks[1].nData = pSup->pk.bytes; + } + } } else { pResBlockInfo->freeBlock = false; } @@ -523,14 +550,9 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void pReader->idStr = (idstr != NULL) ? taosStrdup(idstr) : NULL; pReader->type = pCond->type; - + pReader->bFilesetDelimited = false; pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket - code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond); - if (code != TSDB_CODE_SUCCESS) { - goto _end; - } - if (pCond->numOfCols <= 0) { tsdbError("vgId:%d, invalid column number %d in query cond, %s", TD_VID(pVnode), pCond->numOfCols, idstr); code = TSDB_CODE_INVALID_PARA; @@ -546,6 +568,11 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void pReader->pkComparFn = getComparFunc(pSup->pk.type, 0); } + code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond, pSup); + if (code != TSDB_CODE_SUCCESS) { + goto _end; + } + code = tBlockDataCreate(&pReader->status.fileBlockData); if (code != TSDB_CODE_SUCCESS) { terrno = code; @@ -567,8 +594,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void goto _end; } - pReader->bFilesetDelimited = false; - tsdbInitReaderLock(pReader); tsem_init(&pReader->resumeAfterSuspend, 0, 0); @@ -657,21 +682,19 @@ _end: return code; } -static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum, - SArray* pTableScanInfoList) { - size_t sizeInDisk = 0; - int64_t st = taosGetTimestampUs(); +static int32_t loadFileBlockBrinInfo(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum, + SArray* pTableScanInfoList) { + int32_t k = 0; + size_t sizeInDisk = 0; + int64_t st = taosGetTimestampUs(); + bool asc = ASCENDING_TRAVERSE(pReader->info.order); + STimeWindow w = pReader->info.window; + SBrinRecord* pRecord = NULL; + int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); + SBrinRecordIter iter = {0}; // clear info for the new file cleanupInfoForNextFileset(pReader->status.pTableMap); - - int32_t k = 0; - int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); - bool asc = ASCENDING_TRAVERSE(pReader->info.order); - STimeWindow w = pReader->info.window; - SBrinRecord* pRecord = NULL; - - SBrinRecordIter iter = {0}; initBrinRecordIter(&iter, pReader->pFileReader, pIndexList); while (((pRecord = getNextBrinRecord(&iter)) != NULL)) { @@ -743,14 +766,27 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN } if (pScanInfo->pBlockList == NULL) { - pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBrinRecord)); + pScanInfo->pBlockList = taosArrayInit(4, sizeof(SFileDataBlockInfo)); + if (pScanInfo->pBlockList == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } - void* p1 = taosArrayPush(pScanInfo->pBlockList, pRecord); + if (pScanInfo->pBlockIdxList == NULL) { + pScanInfo->pBlockIdxList = taosArrayInit(4, sizeof(STableDataBlockIdx)); + if (pScanInfo->pBlockIdxList == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + SFileDataBlockInfo blockInfo = {.tbBlockIdx = TARRAY_SIZE(pScanInfo->pBlockList)}; + recordToBlockInfo(&blockInfo, pRecord); + void* p1 = taosArrayPush(pScanInfo->pBlockList, &blockInfo); if (p1 == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } + // todo: refactor to record the fileset skey/ekey if (pScanInfo->filesetWindow.skey > pRecord->firstKey.key.ts) { pScanInfo->filesetWindow.skey = pRecord->firstKey.key.ts; } @@ -788,18 +824,13 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN return TSDB_CODE_SUCCESS; } -// todo keep the the last returned key static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) { -// int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1; pDumpInfo->allDumped = true; -// ASSERT(0); -// pDumpInfo->lastKey.key.ts = maxKey + step; } static void updateLastKeyInfo(SRowKey* pKey, SFileDataBlockInfo* pBlockInfo, SDataBlockInfo* pInfo, int32_t numOfPks, bool asc) { pKey->ts = asc ? pInfo->window.ekey : pInfo->window.skey; - pKey->numOfPKs = numOfPks; if (pKey->numOfPKs <= 0) { return; @@ -809,7 +840,7 @@ static void updateLastKeyInfo(SRowKey* pKey, SFileDataBlockInfo* pBlockInfo, SDa pKey->pks[0].val = asc ? pBlockInfo->lastPk.val : pBlockInfo->firstPk.val; } else { uint8_t* p = asc ? pBlockInfo->lastPk.pData : pBlockInfo->firstPk.pData; - pKey->pks[0].nData = asc ? pBlockInfo->lastPKLen : pBlockInfo->firstPKLen; + pKey->pks[0].nData = asc ? varDataLen(pBlockInfo->lastPk.pData) : varDataLen(pBlockInfo->firstPk.pData); memcpy(pKey->pks[0].pData, p, pKey->pks[0].nData); } } @@ -1323,10 +1354,12 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p } static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pBlockInfo, - STableBlockScanInfo* pTableBlockScanInfo, int32_t* nextIndex, int32_t order, + STableBlockScanInfo* pScanInfo, int32_t* nextIndex, int32_t order, SBrinRecord* pRecord) { - bool asc = ASCENDING_TRAVERSE(order); - if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockIdxList) - 1) { + bool asc = ASCENDING_TRAVERSE(order); + int32_t step = asc ? 1 : -1; + + if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pScanInfo->pBlockIdxList) - 1) { return false; } @@ -1334,9 +1367,7 @@ static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlo return false; } - int32_t step = asc ? 1 : -1; - STableDataBlockIdx* pTableDataBlockIdx = - taosArrayGet(pTableBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step); + STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step); SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex); blockInfoToRecord(pRecord, p); @@ -1344,22 +1375,6 @@ static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlo return true; } -static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) { - int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1; - int32_t index = pBlockIter->index; - - while (index < pBlockIter->numOfBlocks && index >= 0) { - SFileDataBlockInfo* pFBlock = taosArrayGet(pBlockIter->blockList, index); - if (pFBlock->uid == pFBlockInfo->uid && pFBlock->tbBlockIdx == pFBlockInfo->tbBlockIdx) { - return index; - } - - index += step; - } - - return -1; -} - static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t index, int32_t step) { if (index < 0 || index >= pBlockIter->numOfBlocks) { @@ -1595,7 +1610,8 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc tColRowGetKeyDeepCopy(pRow->pBlockData, pRow->iRow, pkSrcSlot, pNextProc); if (pScanInfo->delSkyline != NULL && TARRAY_SIZE(pScanInfo->delSkyline) > 0) { - if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange)) { + if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange, + pSttBlockReader->numOfPks > 0)) { pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA; return true; } @@ -2135,7 +2151,7 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STable if (pBlockScanInfo->delSkyline != NULL && TARRAY_SIZE(pBlockScanInfo->delSkyline) > 0) { bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, ts, ver, pInfo->order, - &pInfo->verRange); + &pInfo->verRange, pReader->suppInfo.numOfPks > 0); if (dropped) { return false; } @@ -2705,7 +2721,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr } if (taosArrayGetSize(pIndexList) > 0 || pReader->status.pCurrentFileset->lvlArr->size > 0) { - code = doLoadFileBlock(pReader, pIndexList, pBlockNum, pTableList); + code = loadFileBlockBrinInfo(pReader, pIndexList, pBlockNum, pTableList); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pIndexList); return code; @@ -2820,11 +2836,11 @@ static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanIn pInfo->pks[0].val = pBlockInfo->firstPk.val; pInfo->pks[1].val = pBlockInfo->lastPk.val; } else { - memcpy(pInfo->pks[0].pData, pBlockInfo->firstPk.pData, pBlockInfo->firstPKLen); - memcpy(pInfo->pks[1].pData, pBlockInfo->lastPk.pData, pBlockInfo->lastPKLen); + memcpy(pInfo->pks[0].pData, varDataVal(pBlockInfo->firstPk.pData), varDataLen(pBlockInfo->firstPk.pData)); + memcpy(pInfo->pks[1].pData, varDataVal(pBlockInfo->lastPk.pData), varDataLen(pBlockInfo->lastPk.pData)); - pInfo->pks[0].nData = pBlockInfo->firstPKLen; - pInfo->pks[1].nData = pBlockInfo->lastPKLen; + pInfo->pks[0].nData = varDataLen(pBlockInfo->firstPk.pData); + pInfo->pks[1].nData = varDataLen(pBlockInfo->lastPk.pData); } } @@ -3153,23 +3169,14 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo; if (pBlockInfo) { - // todo handle -// STableBlockScanInfo* pScanInfo = tSimpleHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); -// if (pScanInfo) { -// tsdbRowKeyAssign(&pDumpInfo->lastKey, &pScanInfo->lastProcKey); -// lastKey = pScanInfo->lastProcKey; -// } - pDumpInfo->totalRows = pBlockInfo->numRow; pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->info.order) ? 0 : pBlockInfo->numRow - 1; } else { pDumpInfo->totalRows = 0; pDumpInfo->rowIndex = 0; -// pDumpInfo->lastKey.key.ts = lastKey; } pDumpInfo->allDumped = false; -// pDumpInfo->lastKey = lastKey; } static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) { @@ -3194,7 +3201,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks, pTableList); } else { // no block data, only last block exists tBlockDataReset(&pReader->status.fileBlockData); - resetDataBlockIterator(pBlockIter, pReader->info.order); + resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); resetTableListIndex(&pReader->status); } @@ -3304,7 +3311,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { } tBlockDataReset(pBlockData); - resetDataBlockIterator(pBlockIter, pReader->info.order); + resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); resetTableListIndex(&pReader->status); ERetrieveType type = doReadDataFromSttFiles(pReader); @@ -3381,8 +3388,35 @@ SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_ return (SVersionRange){.minVer = startVer, .maxVer = endVer}; } +static int32_t reverseSearchStartPos(const SArray* pDelList, int32_t index, int64_t key, bool asc) { + size_t num = taosArrayGetSize(pDelList); + int32_t start = index; + + if (asc) { + if (start >= num - 1) { + start = num - 1; + } + + TSDBKEY* p = taosArrayGet(pDelList, start); + while (p->ts >= key && start > 0) { + start -= 1; + } + } else { + if (index <= 0) { + start = 0; + } + + TSDBKEY* p = taosArrayGet(pDelList, start); + while (p->ts <= key && start < num - 1) { + start += 1; + } + } + + return start; +} + bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order, - SVersionRange* pVerRange) { + SVersionRange* pVerRange, bool hasPk) { if (pDelList == NULL || (TARRAY_SIZE(pDelList) == 0)) { return false; } @@ -3391,6 +3425,10 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t bool asc = ASCENDING_TRAVERSE(order); int32_t step = asc ? 1 : -1; + if (hasPk) { // handle the case where duplicated timestamps existed. + *index = reverseSearchStartPos(pDelList, *index, key, asc); + } + if (asc) { if (*index >= num - 1) { TSDBKEY* last = taosArrayGetLast(pDelList); @@ -3503,7 +3541,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) { return pRow; } else { - bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange); + bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange, pReader->suppInfo.numOfPks > 0); if (!dropped) { return pRow; } @@ -3528,7 +3566,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) { return pRow; } else { - bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange); + bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange, pReader->suppInfo.numOfPks > 0); if (!dropped) { return pRow; } @@ -4118,7 +4156,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { } initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); - resetDataBlockIterator(&pStatus->blockIter, pReader->info.order); + resetDataBlockIterator(&pStatus->blockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); int32_t code = TSDB_CODE_SUCCESS; if (pStatus->fileIter.numOfFiles == 0) { @@ -4322,7 +4360,7 @@ void tsdbReaderClose2(STsdbReader* pReader) { taosMemoryFree(pSupInfo->colId); tBlockDataDestroy(&pReader->status.fileBlockData); - cleanupDataBlockIterator(&pReader->status.blockIter); + cleanupDataBlockIterator(&pReader->status.blockIter, shouldFreePkBuf(&pReader->suppInfo)); size_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); if (pReader->status.pTableMap != NULL) { @@ -4338,9 +4376,11 @@ void tsdbReaderClose2(STsdbReader* pReader) { SReadCostSummary* pCost = &pReader->cost; SFilesetIter* pFilesetIter = &pReader->status.fileIter; if (pFilesetIter->pSttBlockReader != NULL) { - SSttBlockReader* pLReader = pFilesetIter->pSttBlockReader; - tMergeTreeClose(&pLReader->mergeTree); - taosMemoryFree(pLReader); + SSttBlockReader* pSttBlockReader = pFilesetIter->pSttBlockReader; + tMergeTreeClose(&pSttBlockReader->mergeTree); + + clearRowKey(&pSttBlockReader->currentKey); + taosMemoryFree(pSttBlockReader); } destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost); @@ -4996,7 +5036,7 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { int32_t numOfTables = tSimpleHashGetSize(pStatus->pTableMap); initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); - resetDataBlockIterator(pBlockIter, pReader->info.order); + resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); resetTableListIndex(&pReader->status); bool asc = ASCENDING_TRAVERSE(pReader->info.order); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 2a7b0140df..ae8a6466ae 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -167,6 +167,13 @@ int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, in return TSDB_CODE_SUCCESS; } +void clearRowKey(SRowKey* pKey) { + if (pKey == NULL || pKey->numOfPKs == 0 || (!IS_VAR_DATA_TYPE(pKey->pks[0].type))) { + return; + } + taosMemoryFreeClear(pKey->pks[0].pData); +} + static void initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) { int32_t numOfPks = pReader->suppInfo.numOfPks; bool asc = ASCENDING_TRAVERSE(pReader->info.order); @@ -293,6 +300,11 @@ void clearBlockScanInfo(STableBlockScanInfo* p) { p->pBlockIdxList = taosArrayDestroy(p->pBlockIdxList); p->pMemDelData = taosArrayDestroy(p->pMemDelData); p->pFileDelData = taosArrayDestroy(p->pFileDelData); + + clearRowKey(&p->lastProcKey); + clearRowKey(&p->sttRange.skey); + clearRowKey(&p->sttRange.ekey); + clearRowKey(&p->sttKeyInfo.nextProcKey); } void destroyAllBlockScanInfo(SSHashObj* pTableMap) { @@ -415,7 +427,7 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v return pLeftBlock->offset > pRightBlock->offset ? 1 : -1; } -static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record, STsdbReader* pReader) { +void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record) { pBlockInfo->uid = record->uid; pBlockInfo->firstKey = record->firstKey.key.ts; pBlockInfo->lastKey = record->lastKey.key.ts; @@ -434,27 +446,55 @@ static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* recor if (IS_NUMERIC_TYPE(pFirstKey->pks[0].type)) { pBlockInfo->firstPk.val = pFirstKey->pks[0].val; pBlockInfo->lastPk.val = record->lastKey.key.pks[0].val; + } else { + char* p = taosMemoryCalloc(1, pFirstKey->pks[0].nData + VARSTR_HEADER_SIZE); + memcpy(varDataVal(p), pFirstKey->pks[0].pData, pFirstKey->pks[0].nData); + varDataSetLen(p, pFirstKey->pks[0].nData); + pBlockInfo->firstPk.pData = (uint8_t*)p; - pBlockInfo->firstPKLen = 0; - pBlockInfo->lastPKLen = 0; - } else { // todo handle memory alloc error, opt memory alloc perf - pBlockInfo->firstPKLen = pFirstKey->pks[0].nData; - pBlockInfo->firstPk.pData = taosMemoryCalloc(1, pBlockInfo->firstPKLen); - memcpy(pBlockInfo->firstPk.pData, pFirstKey->pks[0].pData, pBlockInfo->firstPKLen); - - pBlockInfo->lastPKLen = record->lastKey.key.pks[0].nData; - pBlockInfo->lastPk.pData = taosMemoryCalloc(1, pBlockInfo->lastPKLen); - memcpy(pBlockInfo->lastPk.pData, record->lastKey.key.pks[0].pData, pBlockInfo->lastPKLen); + int32_t keyLen = record->lastKey.key.pks[0].nData; + p = taosMemoryCalloc(1, keyLen + VARSTR_HEADER_SIZE); + memcpy(varDataVal(p), record->lastKey.key.pks[0].pData, keyLen); + varDataSetLen(p, keyLen); + pBlockInfo->lastPk.pData = (uint8_t*)p; } } } +static void freePkItem(void* pItem) { + SFileDataBlockInfo* p = pItem; + taosMemoryFreeClear(p->firstPk.pData); + taosMemoryFreeClear(p->lastPk.pData); +} + +void clearDataBlockIterator(SDataBlockIter* pIter, bool needFree) { + pIter->index = -1; + pIter->numOfBlocks = 0; + + if (needFree) { + taosArrayClearEx(pIter->blockList, freePkItem); + } else { + taosArrayClear(pIter->blockList); + } +} + +void cleanupDataBlockIterator(SDataBlockIter* pIter, bool needFree) { + pIter->index = -1; + pIter->numOfBlocks = 0; + if (needFree) { + taosArrayDestroyEx(pIter->blockList, freePkItem); + } else { + taosArrayDestroy(pIter->blockList); + } +} + int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks, SArray* pTableList) { bool asc = ASCENDING_TRAVERSE(pReader->info.order); SBlockOrderSupporter sup = {0}; + clearDataBlockIterator(pBlockIter, shouldFreePkBuf(&pReader->suppInfo)); + pBlockIter->numOfBlocks = numOfBlocks; - taosArrayClear(pBlockIter->blockList); // access data blocks according to the offset of each block in asc/desc order. int32_t numOfTables = taosArrayGetSize(pTableList); @@ -482,9 +522,9 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf; for (int32_t k = 0; k < num; ++k) { - SBrinRecord* pRecord = taosArrayGet(pTableScanInfo->pBlockList, k); + SFileDataBlockInfo* pBlockInfo = taosArrayGet(pTableScanInfo->pBlockList, k); sup.pDataBlockInfo[sup.numOfTables][k] = - (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pRecord->blockOffset, .pInfo = pTableScanInfo}; + (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pBlockInfo->blockOffset, .pInfo = pTableScanInfo}; cnt++; } @@ -499,20 +539,12 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 // since there is only one table qualified, blocks are not sorted if (sup.numOfTables == 1) { STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, 0); - if (pTableScanInfo->pBlockIdxList == NULL) { - pTableScanInfo->pBlockIdxList = taosArrayInit(numOfBlocks, sizeof(STableDataBlockIdx)); - } - for (int32_t i = 0; i < numOfBlocks; ++i) { - SFileDataBlockInfo blockInfo = {.tbBlockIdx = i}; - SBrinRecord* record = (SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[0][i].pInfo->pBlockList, i); - recordToBlockInfo(&blockInfo, record, pReader); - - taosArrayPush(pBlockIter->blockList, &blockInfo); STableDataBlockIdx tableDataBlockIdx = {.globalIndex = i}; taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx); } + taosArrayAddAll(pBlockIter->blockList, pTableScanInfo->pBlockList); pTableScanInfo->pBlockList = taosArrayDestroy(pTableScanInfo->pBlockList); int64_t et = taosGetTimestampUs(); @@ -540,18 +572,13 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 int32_t pos = tMergeTreeGetChosenIndex(pTree); int32_t index = sup.indexPerTable[pos]++; - SFileDataBlockInfo blockInfo = {.tbBlockIdx = index}; - SBrinRecord* record = (SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index); - recordToBlockInfo(&blockInfo, record, pReader); + SFileDataBlockInfo* pBlockInfo = taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index); + taosArrayPush(pBlockIter->blockList, pBlockInfo); - taosArrayPush(pBlockIter->blockList, &blockInfo); STableBlockScanInfo* pTableScanInfo = sup.pDataBlockInfo[pos][index].pInfo; - if (pTableScanInfo->pBlockIdxList == NULL) { - size_t szTableDataBlocks = taosArrayGetSize(pTableScanInfo->pBlockList); - pTableScanInfo->pBlockIdxList = taosArrayInit(szTableDataBlocks, sizeof(STableDataBlockIdx)); - } - STableDataBlockIdx tableDataBlockIdx = {.globalIndex = numOfTotal}; + STableDataBlockIdx tableDataBlockIdx = {.globalIndex = numOfTotal}; taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx); + // set data block index overflow, in order to disable the offset comparator if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) { sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index af7d00e019..581696c94a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -212,8 +212,6 @@ typedef struct SFileDataBlockInfo { uint8_t* pData; } lastPk; - int32_t firstPKLen; - int32_t lastPKLen; int64_t minVer; int64_t maxVer; int64_t blockOffset; @@ -237,7 +235,6 @@ typedef struct SDataBlockIter { typedef struct SFileBlockDumpInfo { int32_t totalRows; int32_t rowIndex; -// int64_t lastKey; // STsdbRowKey lastKey; // this key should be removed bool allDumped; } SFileBlockDumpInfo; @@ -338,6 +335,7 @@ int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileRead int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo, TStatisBlkArray* pStatisBlkArray, uint64_t suid, const uint64_t* pUidList, int32_t numOfTables); +void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record); void destroyLDataIter(SLDataIter* pIter); int32_t adjustSttDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet); @@ -347,6 +345,12 @@ bool isCleanSttBlock(SArray* pTimewindowList, STimeWindow* pQueryWindow, STab bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order); int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2); int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc); +void clearRowKey(SRowKey* pKey); + +bool shouldFreePkBuf(SBlockLoadSuppInfo *pSupp); +void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk); +void clearDataBlockIterator(SDataBlockIter* pIter, bool needFree); +void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk); typedef struct { SArray* pTombData; @@ -382,6 +386,7 @@ typedef struct SCacheRowsReader { SArray* pFuncTypeList; __compar_fn_t pkComparFn; SRowKey rowKey; + SColumnInfo pkColumn; } SCacheRowsReader; int32_t tsdbCacheGetBatch(STsdb* pTsdb, tb_uid_t uid, SArray* pLastArray, SCacheRowsReader* pr, int8_t ltype); diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index c2032554b6..8e6f637d81 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -340,22 +340,21 @@ typedef struct STableMergeScanInfo { int32_t scanTimes; int32_t readIdx; SSDataBlock* pResBlock; - SSampleExecInfo sample; // sample execution info - SSHashObj* mTableNumRows; // uid->num of table rows - SHashObj* mSkipTables; - int64_t mergeLimit; + SSampleExecInfo sample; // sample execution info + SSHashObj* mTableNumRows; // uid->num of table rows + SHashObj* mSkipTables; + int64_t mergeLimit; SSortExecInfo sortExecInfo; - bool needCountEmptyTable; - bool bGroupProcessed; // the group return data means processed - bool filesetDelimited; - bool bNewFilesetEvent; - bool bNextDurationBlockEvent; - int32_t numNextDurationBlocks; - SSDataBlock* nextDurationBlocks[2]; - bool rtnNextDurationBlocks; - int32_t nextDurationBlocksIdx; - - bool bSortRowId; + bool needCountEmptyTable; + bool bGroupProcessed; // the group return data means processed + bool filesetDelimited; + bool bNewFilesetEvent; + bool bNextDurationBlockEvent; + int32_t numNextDurationBlocks; + SSDataBlock* nextDurationBlocks[2]; + bool rtnNextDurationBlocks; + int32_t nextDurationBlocksIdx; + bool bSortRowId; STmsSubTablesMergeInfo* pSubTablesMergeInfo; } STableMergeScanInfo; diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 56052434a4..23e873d335 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -40,7 +40,7 @@ typedef struct SCacheRowsScanInfo { SExprSupp pseudoExprSup; int32_t retrieveType; int32_t currentGroupIndex; - SSDataBlock* pBufferredRes; + SSDataBlock* pBufferedRes; SArray* pUidList; SArray* pCidList; int32_t indexOfBufferedRes; @@ -160,9 +160,9 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe capacity = TMIN(totalTables, 4096); - pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false); - setColIdForCacheReadBlock(pInfo->pBufferredRes, pScanNode); - blockDataEnsureCapacity(pInfo->pBufferredRes, capacity); + pInfo->pBufferedRes = createOneDataBlock(pInfo->pRes, false); + setColIdForCacheReadBlock(pInfo->pBufferedRes, pScanNode); + blockDataEnsureCapacity(pInfo->pBufferedRes, capacity); } else { // by tags pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | SCAN_ROW_TYPE(pScanNode->ignoreNull); capacity = 1; // only one row output @@ -219,18 +219,18 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } - if (pInfo->indexOfBufferedRes >= pInfo->pBufferredRes->info.rows) { - blockDataCleanup(pInfo->pBufferredRes); + if (pInfo->indexOfBufferedRes >= pInfo->pBufferedRes->info.rows) { + blockDataCleanup(pInfo->pBufferedRes); taosArrayClear(pInfo->pUidList); - int32_t code = pInfo->readHandle.api.cacheFn.retrieveRows(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, + int32_t code = pInfo->readHandle.api.cacheFn.retrieveRows(pInfo->pLastrowReader, pInfo->pBufferedRes, pInfo->pSlotIds, pInfo->pDstSlotIds, pInfo->pUidList); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } // check for tag values - int32_t resultRows = pInfo->pBufferredRes->info.rows; + int32_t resultRows = pInfo->pBufferedRes->info.rows; // the results may be null, if last values are all null ASSERT(resultRows == 0 || resultRows == taosArrayGetSize(pInfo->pUidList)); @@ -239,12 +239,12 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { SSDataBlock* pRes = pInfo->pRes; - if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) { - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBufferredRes->pDataBlock); ++i) { + if (pInfo->indexOfBufferedRes < pInfo->pBufferedRes->info.rows) { + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBufferedRes->pDataBlock); ++i) { SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, i); int32_t slotId = pCol->info.slotId; - SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId); + SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferedRes->pDataBlock, slotId); SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, slotId); if (colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes)) { @@ -350,7 +350,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { void destroyCacheScanOperator(void* param) { SCacheRowsScanInfo* pInfo = (SCacheRowsScanInfo*)param; blockDataDestroy(pInfo->pRes); - blockDataDestroy(pInfo->pBufferredRes); + blockDataDestroy(pInfo->pBufferedRes); taosMemoryFree(pInfo->pSlotIds); taosMemoryFree(pInfo->pDstSlotIds); taosArrayDestroy(pInfo->pCidList); diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 06f63f5f04..39bbc1bc69 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -216,11 +216,6 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY ASSERT(pColInfoData->info.type == pCol->type); if (colDataIsNull_s(pColInfoData, j)) { - if ((pCol->flags & COL_IS_KEY)) { - qError("Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, pCol->colId, pCol->type); - terrno = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL; - goto _end; - } SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); taosArrayPush(pVals, &cv); } else { @@ -248,11 +243,6 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL; goto _end; } - if ((pCol->flags & COL_IS_KEY)) { - qError("Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, pCol->colId, pCol->type); - terrno = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL; - goto _end; - } SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type taosArrayPush(pVals, &cv); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 4f055cb928..be6fb2983c 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -255,11 +255,13 @@ int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) for (int32_t i = 0; i < taosArrayGetSize(pMatchInfo->pList); ++i) { SColMatchItem* pItem = taosArrayGet(pMatchInfo->pList, i); + if (pItem->isPk) { SColumnInfoData* pInfoData = taosArrayGet(pDataBlock->pDataBlock, pItem->dstSlotId); pBlockInfo->pks[0].type = pInfoData->info.type; pBlockInfo->pks[1].type = pInfoData->info.type; + // allocate enough buffer size, which is pInfoData->info.bytes if (IS_VAR_DATA_TYPE(pItem->dataType.type)) { pBlockInfo->pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes); if (pBlockInfo->pks[0].pData == NULL) { @@ -271,7 +273,12 @@ int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) taosMemoryFreeClear(pBlockInfo->pks[0].pData); return TSDB_CODE_OUT_OF_MEMORY; } + + pBlockInfo->pks[0].nData = pInfoData->info.bytes; + pBlockInfo->pks[1].nData = pInfoData->info.bytes; } + + break; } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index bec5a73198..55eae7ae60 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -708,9 +708,7 @@ static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, i tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, pKeyInfo, size); pInfo->tableStartIndex = TARRAY_ELEM_IDX(pInfo->base.pTableListInfo->pTableList, *pKeyInfo); - pInfo->tableEndIndex = (pInfo->tableStartIndex + (*size) - 1); - pInfo->pResBlock->info.blankFill = false; if (!pInfo->needCountEmptyTable) { @@ -1011,8 +1009,8 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { STableScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; - int32_t num = 0; - STableKeyInfo* pList = NULL; + int32_t num = 0; + STableKeyInfo* pList = NULL; if (pInfo->currentGroupId == -1) { if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { @@ -1020,7 +1018,10 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { return NULL; } + taosRLockLatch(&pTaskInfo->lock); initNextGroupScan(pInfo, &pList, &num); + taosRUnLockLatch(&pTaskInfo->lock); + ASSERT(pInfo->base.dataReader == NULL); int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, @@ -4069,14 +4070,13 @@ static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeScanInfo) { } static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) { - STableMergeScanInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pTaskInfo->storageAPI; - - SSDataBlock* pBlock = pInfo->pReaderBlock; - int32_t code = 0; - bool hasNext = false; - STsdbReader* reader = pInfo->base.dataReader; + STableMergeScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; + SSDataBlock* pBlock = pInfo->pReaderBlock; + int32_t code = 0; + bool hasNext = false; + STsdbReader* reader = pInfo->base.dataReader; code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext); if (code != 0) { @@ -4112,27 +4112,24 @@ static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinishe *pSkipped = true; return; } + return; } static SSDataBlock* getBlockForTableMergeScan(void* param) { STableMergeScanSortSourceParam* source = param; - SOperatorInfo* pOperator = source->pOperator; - STableMergeScanInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pTaskInfo->storageAPI; - SSDataBlock* pBlock = NULL; - int32_t code = 0; + SOperatorInfo* pOperator = source->pOperator; + STableMergeScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SSDataBlock* pBlock = NULL; + int64_t st = taosGetTimestampUs(); - int64_t st = taosGetTimestampUs(); - bool hasNext = false; - - STsdbReader* reader = pInfo->base.dataReader; while (true) { if (pInfo->rtnNextDurationBlocks) { - qDebug("%s table merge scan return already fetched new duration blocks. index %d num of blocks %d", - GET_TASKID(pTaskInfo), pInfo->nextDurationBlocksIdx, pInfo->numNextDurationBlocks); + qDebug("%s table merge scan return already fetched new duration blocks. index %d num of blocks %d", + GET_TASKID(pTaskInfo), pInfo->nextDurationBlocksIdx, pInfo->numNextDurationBlocks); + if (pInfo->nextDurationBlocksIdx < pInfo->numNextDurationBlocks) { pBlock = pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx]; ++pInfo->nextDurationBlocksIdx; @@ -4141,19 +4138,19 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { blockDataDestroy(pInfo->nextDurationBlocks[i]); pInfo->nextDurationBlocks[i] = NULL; } + pInfo->rtnNextDurationBlocks = false; pInfo->nextDurationBlocksIdx = 0; pInfo->numNextDurationBlocks = 0; continue; } } else { - bool bFinished = false; bool bSkipped = false; doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped); pBlock = pInfo->pReaderBlock; - qDebug("%s table merge scan fetch block. finished %d skipped %d next-duration-block %d new-fileset %d", - GET_TASKID(pTaskInfo), bFinished, bSkipped, pInfo->bNextDurationBlockEvent, pInfo->bNewFilesetEvent); + qDebug("%s table merge scan fetch block. finished %d skipped %d next-duration-block %d new-fileset %d", + GET_TASKID(pTaskInfo), bFinished, bSkipped, pInfo->bNextDurationBlockEvent, pInfo->bNewFilesetEvent); if (bFinished) { pInfo->bNewFilesetEvent = false; break; @@ -4164,15 +4161,18 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks] = createOneDataBlock(pBlock, true); ++pInfo->numNextDurationBlocks; if (pInfo->numNextDurationBlocks > 2) { - qError("%s table merge scan prefetch %d next duration blocks. end early.", GET_TASKID(pTaskInfo), pInfo->numNextDurationBlocks); + qError("%s table merge scan prefetch %d next duration blocks. end early.", GET_TASKID(pTaskInfo), + pInfo->numNextDurationBlocks); pInfo->bNewFilesetEvent = false; break; } } + if (pInfo->bNewFilesetEvent) { pInfo->rtnNextDurationBlocks = true; return NULL; } + if (pInfo->bNextDurationBlockEvent) { pInfo->bNextDurationBlockEvent = false; continue; @@ -4180,19 +4180,18 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { } if (bSkipped) continue; } + pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid); pOperator->resultInfo.totalRows += pBlock->info.rows; - pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - + return pBlock; } return NULL; } - SArray* generateSortByTsPkInfo(SArray* colMatchInfo, int32_t order) { SArray* pSortInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); SBlockOrderInfo biTs = {0}; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 8fb8aaa69d..b72811cdcc 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1339,23 +1339,26 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator // The gap is less than the threshold, so it belongs to current session window that has been opened already. doKeepTuple(pRowSup, tsList[j], gid); } else { // start a new session window - SResultRow* pResult = NULL; + // start a new session window + if (pRowSup->numOfRows > 0) { // handled data that belongs to the previous session window + SResultRow* pResult = NULL; - // keep the time window for the closed time window. - STimeWindow window = pRowSup->win; + // keep the time window for the closed time window. + STimeWindow window = pRowSup->win; - pRowSup->win.ekey = pRowSup->win.skey; - int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx, - numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); - if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); + int32_t ret = + setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx, + numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); + if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); + } + + // pInfo->numOfRows data belong to the current session window + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0); + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + pRowSup->numOfRows, pBlock->info.rows, numOfOutput); } - // pInfo->numOfRows data belong to the current session window - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0); - applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, - pRowSup->numOfRows, pBlock->info.rows, numOfOutput); - // here we start a new session window doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); doKeepTuple(pRowSup, tsList[j], gid); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 44404c345e..cd1a858175 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1649,8 +1649,8 @@ static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSH } static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { - size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); - SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES); + size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); + SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES); size_t maxBufSize = (pHandle->bSortByRowId) ? pHandle->extRowsMemSize : (pHandle->numOfPages * pHandle->pageSize); @@ -1688,12 +1688,12 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { if (pBlk != NULL) { SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigTsOrder->slotId); - int64_t firstRowTs = *(int64_t*)tsCol->pData; - if ((pOrigTsOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) || + int64_t firstRowTs = *(int64_t*)tsCol->pData; + if ((pOrigTsOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) || (pOrigTsOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) { if (bExtractedBlock) { blockDataDestroy(pBlk); - } + } continue; } } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 99adc8b3bb..f1e5255842 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2657,8 +2657,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; - // please ref. to the comment in lastRowFunction for the reason why disabling the opt version of last/first - // function. + // please ref. to the comment in lastRowFunction for the reason why disabling the opt version of last/first function. #if 0 if (blockDataOrder == TSDB_ORDER_ASC) { for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { @@ -2709,6 +2708,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { } } #else + +// todo refactor if (!pInputCol->hasNull && !pCtx->hasPrimaryKey) { numOfElems = 1; @@ -2790,7 +2791,6 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { } } - // SET_VAL(pResInfo, numOfElems, 1); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index a1c257022a..f3192b4956 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -1657,9 +1657,6 @@ static int32_t parseValueToken(SInsertParseContext* pCxt, const char** pSql, STo if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) { return buildSyntaxErrMsg(&pCxt->msg, "Primary timestamp column should not be null", pToken->z); } - if (pSchema->flags & COL_IS_KEY) { - return buildSyntaxErrMsg(&pCxt->msg, "Primary key column should not be null", pToken->z); - } pVal->flag = CV_FLAG_NULL; return TSDB_CODE_SUCCESS; diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c index 59c5ce82ad..bdeb548bd7 100644 --- a/source/libs/parser/src/parInsertStmt.c +++ b/source/libs/parser/src/parInsertStmt.c @@ -267,11 +267,6 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in pBind = bind + c; } - if(pBind->is_null && (pColSchema->flags & COL_IS_KEY)){ - code = buildInvalidOperationMsg(&pBuf, "Primary key column should not be null"); - goto _return; - } - code = tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE: -1); if (code) { goto _return; @@ -318,11 +313,6 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu pBind = bind; } - if (pBind->is_null && (pColSchema->flags & COL_IS_KEY)) { - code = buildInvalidOperationMsg(&pBuf, "Primary key column should not be null"); - goto _return; - } - tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE : -1); qDebug("stmt col %d bind %d rows data", colIdx, rowNum); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 650c2565e0..37a4ff0506 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -8271,7 +8271,7 @@ static int32_t adjustOrderOfProjections(STranslateContext* pCxt, SNodeList** ppC if (TSDB_CODE_SUCCESS == code && !hasPrimaryKey && hasPkInTable(pMeta)) { code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, - "Primary key column of dest table can not be null"); + "Primary key column name must be defined in existed-stable field"); } SNodeList* pNewProjections = NULL; diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index ede4dc07e1..4b7ed59039 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -78,21 +78,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncIndex commitIndex = syncNodeCheckCommitIndex(ths, indexLikely); if (ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { if (commitIndex >= ths->assignedCommitIndex) { - terrno = TSDB_CODE_SUCCESS; - raftStoreNextTerm(ths); - if (terrno != TSDB_CODE_SUCCESS) { - sError("vgId:%d, failed to update term, reason:%s", ths->vgId, tstrerror(terrno)); - return -1; - } - if (syncNodeAssignedLeader2Leader(ths) != 0) { - sError("vgId:%d, failed to change state from assigned leader to leader", ths->vgId); - return -1; - } - - taosThreadMutexLock(&ths->arbTokenMutex); - syncUtilGenerateArbToken(ths->myNodeInfo.nodeId, ths->vgId, ths->arbToken); - sInfo("vgId:%d, assigned leader to leader, arbToken:%s", ths->vgId, ths->arbToken); - taosThreadMutexUnlock(&ths->arbTokenMutex); + syncNodeStepDown(ths, pMsg->term); } } else { (void)syncLogBufferCommit(ths->pLogBuf, ths, commitIndex); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 85aa3a2796..fbdb5f4201 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -503,20 +503,6 @@ int32_t syncEndSnapshot(int64_t rid) { return code; } -#ifdef BUILD_NO_CALL -int32_t syncStepDown(int64_t rid, SyncTerm newTerm) { - SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - sError("sync step down error"); - return -1; - } - - syncNodeStepDown(pSyncNode, newTerm); - syncNodeRelease(pSyncNode); - return 0; -} -#endif - bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) { if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; @@ -1277,7 +1263,6 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { // start in syncNodeStart // start raft - // syncNodeBecomeFollower(pSyncNode); int64_t timeNow = taosGetTimestampMs(); pSyncNode->startTime = timeNow; @@ -1848,20 +1833,6 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde // persist cfg syncWriteCfgFile(pSyncNode); - -#if 0 - // change isStandBy to normal (election timeout) - if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - syncNodeBecomeLeader(pSyncNode, ""); - - // Raft 3.6.2 Committing entries from previous terms - syncNodeAppendNoop(pSyncNode); - // syncMaybeAdvanceCommitIndex(pSyncNode); - - } else { - syncNodeBecomeFollower(pSyncNode, ""); - } -#endif } else { // persist cfg syncWriteCfgFile(pSyncNode); @@ -1874,18 +1845,6 @@ _END: } // raft state change -------------- -#ifdef BUILD_NO_CALL -void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { - if (term > raftStoreGetTerm(pSyncNode)) { - raftStoreSetTerm(pSyncNode, term); - char tmpBuf[64]; - snprintf(tmpBuf, sizeof(tmpBuf), "update term to %" PRId64, term); - syncNodeBecomeFollower(pSyncNode, tmpBuf); - raftStoreClearVote(pSyncNode); - } -} -#endif - void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) { if (term > raftStoreGetTerm(pSyncNode)) { raftStoreSetTerm(pSyncNode, term); @@ -1903,13 +1862,19 @@ void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) { sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm); } while (0); + if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { + taosThreadMutexLock(&pSyncNode->arbTokenMutex); + syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken); + sInfo("vgId:%d, step down as assigned leader, new arbToken:%s", pSyncNode->vgId, pSyncNode->arbToken); + taosThreadMutexUnlock(&pSyncNode->arbTokenMutex); + } + if (currentTerm < newTerm) { raftStoreSetTerm(pSyncNode, newTerm); char tmpBuf[64]; snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm); syncNodeBecomeFollower(pSyncNode, tmpBuf); raftStoreClearVote(pSyncNode); - } else { if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) { syncNodeBecomeFollower(pSyncNode, "step down"); @@ -2170,28 +2135,6 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { sNTrace(pSyncNode, "follower to candidate"); } -#ifdef BUILD_NO_CALL -void syncNodeLeader2Follower(SSyncNode* pSyncNode) { - ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER); - syncNodeBecomeFollower(pSyncNode, "leader to follower"); - SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); - sInfo("vgId:%d, become follower from leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, - pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex); - - sNTrace(pSyncNode, "leader to follower"); -} - -void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { - ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); - syncNodeBecomeFollower(pSyncNode, "candidate to follower"); - SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); - sInfo("vgId:%d, become follower from candidate. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, - pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex); - - sNTrace(pSyncNode, "candidate to follower"); -} -#endif - int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) { ASSERT(pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER); syncNodeBecomeLeader(pSyncNode, "assigned leader to leader"); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 0812181c5c..8d80e3883d 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -629,6 +629,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SECOND_COL_PK, "primary key must be TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COL_PK_TYPE, "primary key column must be of type int, uint, bigint, ubigint, and varchar") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_PK_OP, "primary key column can not be added, modified, and dropped") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL, "Primary key column should not be null") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_PRIMARY_KEY_IS_NONE, "Primary key column should not be none") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error") //planner