Merge branch '3.0' into fix/TD-31163-3.0

This commit is contained in:
kailixu 2024-08-24 21:03:11 +08:00
commit c97f01a023
15 changed files with 253 additions and 96 deletions

View File

@ -45,9 +45,9 @@ int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo **pTree, uint32_t numOfEntries,
void tMergeTreeDestroy(SMultiwayMergeTreeInfo **pTree);
void tMergeTreeAdjust(SMultiwayMergeTreeInfo *pTree, int32_t idx);
int32_t tMergeTreeAdjust(SMultiwayMergeTreeInfo *pTree, int32_t idx);
void tMergeTreeRebuild(SMultiwayMergeTreeInfo *pTree);
int32_t tMergeTreeRebuild(SMultiwayMergeTreeInfo *pTree);
void tMergeTreePrint(const SMultiwayMergeTreeInfo *pTree);

View File

@ -973,6 +973,9 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
int32_t numOfRows = *(int32_t*)buf;
if (numOfRows == 0) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
int32_t code = blockDataEnsureCapacity(pBlock, numOfRows);
if (code) {
return code;

View File

@ -137,7 +137,6 @@ static int32_t tGetPrimaryKeyIndex(uint8_t* p, SPrimaryKeyIndex* index) {
static void tRowGetPrimaryKeyDeepCopy(SRow* pRow, SRowKey* pKey) {
SPrimaryKeyIndex indices[TD_MAX_PK_COLS];
ASSERT(pKey->numOfPKs <= TD_MAX_PK_COLS);
uint8_t* data = pRow->data;
for (int32_t i = 0; i < pRow->numOfPKs; i++) {
@ -673,7 +672,10 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead
break;
}
ASSERT(pBrinBlk->minTbid.suid <= pReader->info.suid && pBrinBlk->maxTbid.suid >= pReader->info.suid);
if (!(pBrinBlk->minTbid.suid <= pReader->info.suid && pBrinBlk->maxTbid.suid >= pReader->info.suid)) {
tsdbError("tsdb failed at: %s %d", __func__, __LINE__);
return TSDB_CODE_INTERNAL_ERROR;
}
if (pBrinBlk->maxTbid.suid == pReader->info.suid && pBrinBlk->maxTbid.uid < pList->tableUidList[0]) {
i += 1;
continue;
@ -753,7 +755,10 @@ static int32_t loadFileBlockBrinInfo(STsdbReader* pReader, SArray* pIndexList, S
continue;
}
ASSERT(pRecord->suid == pReader->info.suid && uid == pRecord->uid);
if (!(pRecord->suid == pReader->info.suid && uid == pRecord->uid)) {
tsdbError("tsdb failed at: %s:%d", __func__, __LINE__);
return TSDB_CODE_INTERNAL_ERROR;
}
STableBlockScanInfo* pScanInfo = NULL;
code = getTableBlockScanInfo(pReader->status.pTableMap, uid, &pScanInfo, pReader->idStr);
@ -924,21 +929,23 @@ static int32_t getCurrentBlockInfo(SDataBlockIter* pBlockIter, SFileDataBlockInf
size_t num = TARRAY_SIZE(pBlockIter->blockList);
if (num == 0) {
ASSERT(pBlockIter->numOfBlocks == num);
return TSDB_CODE_FAILED;
tsdbError("tsdb read failed at: %s:%d", __func__, __LINE__);
return TSDB_CODE_INTERNAL_ERROR;
}
*pInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
return (*pInfo) != NULL? TSDB_CODE_SUCCESS:TSDB_CODE_FAILED;
}
static int doBinarySearchKey(const TSKEY* keyList, int num, int pos, TSKEY key, int order) {
static int32_t doBinarySearchKey(const TSKEY* keyList, int num, int pos, TSKEY key, int order) {
// start end position
int s, e;
s = pos;
// check
ASSERT(pos >= 0 && pos < num && num > 0);
if (!(pos >= 0 && pos < num && num > 0)) {
return -1;
}
if (order == TSDB_ORDER_ASC) {
// find the first position which is smaller than the key
e = num - 1;
@ -1242,7 +1249,10 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro
}
pDumpInfo->rowIndex = findFirstPos(pBlockData->aTSKEY, pRecord->numRow, pDumpInfo->rowIndex, (!asc));
ASSERT(pReader->info.verRange.minVer <= pRecord->maxVer && pReader->info.verRange.maxVer >= pRecord->minVer);
if (!(pReader->info.verRange.minVer <= pRecord->maxVer && pReader->info.verRange.maxVer >= pRecord->minVer)) {
tsdbError("tsdb failed at: %s:%d", __func__, __LINE__);
return TSDB_CODE_INVALID_PARA;
}
// find the appropriate start position that satisfies the version requirement.
if ((pReader->info.verRange.maxVer >= pRecord->minVer && pReader->info.verRange.maxVer < pRecord->maxVer) ||
@ -1382,7 +1392,12 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro
}
static FORCE_INLINE STSchema* getTableSchemaImpl(STsdbReader* pReader, uint64_t uid) {
ASSERT(pReader->info.pSchema == NULL);
if (pReader->info.pSchema != NULL) {
terrno = TSDB_CODE_INVALID_PARA;
tsdbError("tsdb invalid input param at: %s:%d", __func__, __LINE__);
return NULL;
}
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, -1, &pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS || pReader->info.pSchema == NULL) {
terrno = code;
@ -1663,7 +1678,6 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pRecord, order);
// todo handle the primary key overlap case
ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) {
int64_t nextProcKeyInStt = pScanInfo->sttKeyInfo.nextProcKey.ts;
pInfo->overlapWithSttBlock = !(pBlockInfo->lastKey < nextProcKeyInStt || pBlockInfo->firstKey > nextProcKeyInStt);
@ -1921,7 +1935,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
// merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized
if (pMerger->pArray == NULL) {
ASSERT(pReader->info.pSchema == NULL);
if (pReader->info.pSchema != NULL) {
tsdbError("tsdb failed at %s:%d", __func__, __LINE__);
return TSDB_CODE_INTERNAL_ERROR;
}
STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid);
if (ps == NULL) {
return terrno;
@ -2014,7 +2031,10 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
// merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized
if (pMerger->pArray == NULL) {
ASSERT(pReader->info.pSchema == NULL);
if (pReader->info.pSchema) {
tsdbError("tsdb failed at %s %d", __func__, __LINE__);
return TSDB_CODE_INTERNAL_ERROR;
}
STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid);
if (ps == NULL) {
return terrno;
@ -2147,7 +2167,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
// merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized
if (pMerger->pArray == NULL) {
ASSERT(pReader->info.pSchema == NULL);
if (pReader->info.pSchema != NULL) {
tsdbError("tsdb read failed at: %s:%d", __func__, __LINE__);
return TSDB_CODE_INTERNAL_ERROR;
}
STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid);
if (ps == NULL) {
return terrno;
@ -2548,7 +2571,10 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
// merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized
if (pMerger->pArray == NULL) {
ASSERT(pReader->info.pSchema == NULL);
if (pReader->info.pSchema != NULL) {
tsdbError("tsdb reader failed at: %s:%d", __func__, __LINE__);
return TSDB_CODE_INTERNAL_ERROR;
}
STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid);
if (ps == NULL) {
return terrno;
@ -3267,8 +3293,6 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
// current active data block not overlap with the stt-files/stt-blocks
static bool notOverlapWithFiles(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, bool asc) {
ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
if ((!hasDataInSttBlock(pScanInfo)) || (pScanInfo->cleanSttBlocks == true)) {
return true;
} else {
@ -3336,7 +3360,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
// data block, so the overlap check is invalid actually.
buildCleanBlockFromDataFiles(pReader, pScanInfo, pBlockInfo, pBlockIter->index);
} else { // clean stt block
ASSERT(pReader->info.execMode == READER_EXEC_ROWS && pSttBlockReader->mergeTree.pIter == NULL);
if (!(pReader->info.execMode == READER_EXEC_ROWS && pSttBlockReader->mergeTree.pIter == NULL)) {
tsdbError("tsdb reader failed at: %s:%d", __func__, __LINE__);
return TSDB_CODE_INTERNAL_ERROR;
}
code = buildCleanBlockFromSttFiles(pReader, pScanInfo);
return code;
}
@ -3357,7 +3384,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
// no data in stt block, no need to proceed.
while (hasDataInSttBlock(pScanInfo)) {
ASSERT(pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA);
if (pScanInfo->sttKeyInfo.status != STT_FILE_HAS_DATA) {
tsdbError("tsdb reader failed at: %s:%d", __func__, __LINE__);
return TSDB_CODE_INTERNAL_ERROR;
}
code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pSttBlockReader);
if (code != TSDB_CODE_SUCCESS) {
@ -3580,7 +3610,11 @@ static ERetrieveType doReadDataFromSttFiles(STsdbReader* pReader) {
}
// all data blocks are checked in this stt file, now let's try the next file set
ASSERT(pReader->status.pTableIter == NULL);
if (pReader->status.pTableIter != NULL) {
terrno = TSDB_CODE_INTERNAL_ERROR;
tsdbError("tsdb reader failed at: %s:%d", __func__, __LINE__);
return TSDB_READ_RETURN;
}
code = initForFirstBlockInFile(pReader, pBlockIter);
// error happens or all the data files are completely checked
@ -3794,7 +3828,7 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t
return false;
}
ASSERT(key >= last->ts);
// ASSERT(key >= last->ts);
if (key > last->ts) {
return false;
} else if (key == last->ts) {
@ -3857,7 +3891,7 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t
} else if (key == pFirst->ts) {
return pFirst->version >= ver;
} else {
ASSERT(0);
// ASSERT(0);
}
} else {
TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);

View File

@ -741,7 +741,10 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
}
numOfTotal += 1;
tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
code = tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
if (TSDB_CODE_SUCCESS != code) {
return code;
}
}
for (int32_t i = 0; i < numOfTables; ++i) {
@ -843,7 +846,10 @@ static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_
continue;
}
ASSERT(record.suid == pReader->info.suid && uid == record.uid);
if (!(record.suid == pReader->info.suid && uid == record.uid)) {
tsdbError("tsdb reader failed at: %s:%d", __func__, __LINE__);
return TSDB_CODE_INTERNAL_ERROR;
}
if (record.version <= pReader->info.verRange.maxVer) {
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
@ -877,7 +883,10 @@ static int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STs
break;
}
ASSERT(pTombBlk->minTbid.suid <= pReader->info.suid && pTombBlk->maxTbid.suid >= pReader->info.suid);
if (!(pTombBlk->minTbid.suid <= pReader->info.suid && pTombBlk->maxTbid.suid >= pReader->info.suid)) {
tsdbError("tsdb reader failed at: %s:%d", __func__, __LINE__);
return TSDB_CODE_INTERNAL_ERROR;
}
if (pTombBlk->maxTbid.suid == pReader->info.suid && pTombBlk->maxTbid.uid < pList->tableUidList[0]) {
i += 1;
continue;

View File

@ -240,7 +240,12 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
*order = info.order;
*scanFlag = info.scanFlag;
ASSERT(*order == TSDB_ORDER_ASC || *order == TSDB_ORDER_DESC);
if (p.code == TSDB_CODE_SUCCESS) {
if (!(*order == TSDB_ORDER_ASC || *order == TSDB_ORDER_DESC)) {
qError("operator failed at: %s:%d", __func__, __LINE__);
p.code = TSDB_CODE_INVALID_PARA;
}
}
return p.code;
}

View File

@ -218,7 +218,10 @@ static int32_t discardGroupDataBlock(SSDataBlock* pBlock, SLimitInfo* pLimitInfo
static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, SOperatorInfo* pOperator) {
// remainGroupOffset == 0
// here check for a new group data, we need to handle the data of the previous group.
ASSERT(pLimitInfo->remainGroupOffset == 0 || pLimitInfo->remainGroupOffset == -1);
if (!(pLimitInfo->remainGroupOffset == 0 || pLimitInfo->remainGroupOffset == -1)) {
qError("project failed at: %s:%d", __func__, __LINE__);
return TSDB_CODE_INVALID_PARA;
}
bool newGroup = false;
if (0 == pBlock->info.id.groupId) {
@ -818,7 +821,10 @@ int32_t doGenerateSourceData(SOperatorInfo* pOperator) {
}
int32_t startOffset = pRes->info.rows;
ASSERT(pRes->info.capacity > 0);
if (pRes->info.capacity <= 0) {
qError("project failed at: %s:%d", __func__, __LINE__);
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
code = colDataAssign(pResColData, &idata, dest.numOfRows, &pRes->info);
if (code) {
return code;
@ -875,7 +881,11 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
for (int32_t k = 0; k < numOfOutput; ++k) {
int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
ASSERT(pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE);
if (pExpr[k].pExpr->nodeType != QUERY_NODE_VALUE) {
qError("project failed at: %s:%d", __func__, __LINE__);
code = TSDB_CODE_INVALID_PARA;
TSDB_CHECK_CODE(code, lino, _exit);
}
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
if (pColInfoData == NULL) {
code = terrno;
@ -1019,7 +1029,11 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
}
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
ASSERT(pResult->info.capacity > 0);
if (pResult->info.capacity <= 0) {
qError("project failed at: %s:%d", __func__, __LINE__);
code = TSDB_CODE_INVALID_PARA;
TSDB_CHECK_CODE(code, lino, _exit);
}
int32_t ret = colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
if (ret < 0) {
@ -1146,7 +1160,11 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
}
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
ASSERT(pResult->info.capacity > 0);
if (pResult->info.capacity <= 0) {
qError("project failed at: %s:%d", __func__, __LINE__);
code = TSDB_CODE_INVALID_PARA;
TSDB_CHECK_CODE(code, lino, _exit);
}
int32_t ret = colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
if (ret < 0) {
code = ret;

View File

@ -5119,9 +5119,7 @@ static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTab
}
}
tMergeTreeAdjust(pSubTblsInfo->pTree, tMergeTreeGetAdjustIndex(pSubTblsInfo->pTree));
return TSDB_CODE_SUCCESS;
return tMergeTreeAdjust(pSubTblsInfo->pTree, tMergeTreeGetAdjustIndex(pSubTblsInfo->pTree));
}
static int32_t appendChosenRowToDataBlock(STmsSubTablesMergeInfo* pSubTblsInfo, SSDataBlock* pBlock) {

View File

@ -295,6 +295,9 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock,
break;
}
}
if (TSDB_CODE_SUCCESS != code) {
return code;
}
if (p->info.rows > 0) {
code = blockDataEnsureCapacity(pDataBlock, capacity);

View File

@ -14,6 +14,7 @@
*/
#include "tlinearhash.h"
#include "query.h"
#include "taoserror.h"
#include "tdef.h"
#include "tpagedbuf.h"
@ -59,7 +60,11 @@ static int32_t doGetBucketIdFromHashVal(int32_t hashv, int32_t bits) { return ha
static int32_t doGetAlternativeBucketId(int32_t bucketId, int32_t bits, int32_t numOfBuckets) {
int32_t v = bucketId - (1ul << (bits - 1));
ASSERT(v < numOfBuckets);
if (v >= numOfBuckets) {
qError("tlinearhash failed at: %s:%d", __func__, __LINE__);
terrno = TSDB_CODE_INTERNAL_ERROR;
return -1;
}
return v;
}
@ -85,11 +90,15 @@ static int32_t doAddToBucket(SLHashObj* pHashObj, SLHashBucket* pBucket, int32_t
int32_t pageId = *(int32_t*)taosArrayGetLast(pBucket->pPageIdList);
SFilePage* pPage = getBufPage(pHashObj->pBuf, pageId);
ASSERT(pPage != NULL);
if (pPage == NULL) {
return TSDB_CODE_INVALID_PARA;
}
// put to current buf page
size_t nodeSize = sizeof(SLHashNode) + keyLen + size;
ASSERT(nodeSize + sizeof(SFilePage) <= getBufPageSize(pHashObj->pBuf));
if (nodeSize + sizeof(SFilePage) > getBufPageSize(pHashObj->pBuf)) {
return TSDB_CODE_INVALID_PARA;
}
if (pPage->num + nodeSize > getBufPageSize(pHashObj->pBuf)) {
releaseBufPage(pHashObj->pBuf, pPage);
@ -174,7 +183,7 @@ static void doTrimBucketPages(SLHashObj* pHashObj, SLHashBucket* pBucket) {
setBufPageDirty(pFirst, true);
setBufPageDirty(pLast, true);
ASSERT(pLast->num >= nodeSize + sizeof(SFilePage));
// ASSERT(pLast->num >= nodeSize + sizeof(SFilePage));
pFirst->num += nodeSize;
pLast->num -= nodeSize;
@ -320,6 +329,9 @@ int32_t tHashPut(SLHashObj* pHashObj, const void* key, size_t keyLen, void* data
// printf("bucketId: 0x%x not exists, put it into 0x%x instead\n", v, newBucketId);
v = newBucketId;
}
if (v < 0) {
return terrno;
}
SLHashBucket* pBucket = pHashObj->pBucket[v];
code = doAddToBucket(pHashObj, pBucket, v, key, keyLen, data, size);
@ -343,7 +355,10 @@ int32_t tHashPut(SLHashObj* pHashObj, const void* key, size_t keyLen, void* data
int32_t numOfBits = ceil(log(pHashObj->numOfBuckets) / log(2));
if (numOfBits > pHashObj->bits) {
// printf("extend the bits from %d to %d, new bucket:%d\n", pHashObj->bits, numOfBits, newBucketId);
ASSERT(numOfBits == pHashObj->bits + 1);
if (numOfBits != pHashObj->bits + 1) {
qError("linear hash faield at: %s:%d", __func__, __LINE__);
return TSDB_CODE_INTERNAL_ERROR;
}
pHashObj->bits = numOfBits;
}
@ -360,14 +375,20 @@ int32_t tHashPut(SLHashObj* pHashObj, const void* key, size_t keyLen, void* data
char* pStart = p->data;
while (pStart - ((char*)p) < p->num) {
SLHashNode* pNode = (SLHashNode*)pStart;
ASSERT(pNode->keyLen > 0);
if (pNode->keyLen <= 0) {
qError("linear hash faield at: %s:%d", __func__, __LINE__);
return TSDB_CODE_INTERNAL_ERROR;
}
char* k = GET_LHASH_NODE_KEY(pNode);
int32_t hashv = pHashObj->hashFn(k, pNode->keyLen);
int32_t v1 = doGetBucketIdFromHashVal(hashv, pHashObj->bits);
if (v1 != splitBucketId) { // place it into the new bucket
ASSERT(v1 == newBucketId);
if (v1 != newBucketId) {
qError("linear hash failed at: %s:%d", __func__, __LINE__);
return TSDB_CODE_INTERNAL_ERROR;
}
// printf("move key:%d to 0x%x bucket, remain items:%d\n", *(int32_t*)k, v1, pBucket->size - 1);
SLHashBucket* pNewBucket = pHashObj->pBucket[newBucketId];
code = doAddToBucket(pHashObj, pNewBucket, newBucketId, (void*)GET_LHASH_NODE_KEY(pNode), pNode->keyLen,
@ -395,6 +416,9 @@ char* tHashGet(SLHashObj* pHashObj, const void* key, size_t keyLen) {
if (bucketId >= pHashObj->numOfBuckets) {
bucketId = doGetAlternativeBucketId(bucketId, pHashObj->bits, pHashObj->numOfBuckets);
}
if (bucketId < 0) {
return NULL;
}
SLHashBucket* pBucket = pHashObj->pBucket[bucketId];
int32_t num = taosArrayGetSize(pBucket->pPageIdList);

View File

@ -468,7 +468,11 @@ static int32_t doAddNewExternalMemSource(SDiskbasedBuf* pBuf, SArray* pAllSource
// The value of numOfRows must be greater than 0, which is guaranteed by the previous memory allocation
int32_t numOfRows =
(getBufPageSize(pBuf) - blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock))) / rowSize;
ASSERT(numOfRows > 0);
if (numOfRows <= 0) {
qError("sort failed at: %s:%d", __func__, __LINE__);
taosArrayDestroy(pPageIdList);
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
return blockDataEnsureCapacity(pSource->src.pBlock, numOfRows);
}
@ -528,7 +532,10 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
}
int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t);
ASSERT(size <= getBufPageSize(pHandle->pBuf));
if (size > getBufPageSize(pHandle->pBuf)) {
qError("sort failed at: %s:%d", __func__, __LINE__);
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
code = blockDataToBuf(pPage, p);
if (code) {
@ -733,7 +740,10 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
tMergeTreePrint(pTree);
#endif
tMergeTreeAdjust(pTree, leafNodeIndex);
int32_t code = tMergeTreeAdjust(pTree, leafNodeIndex);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
#ifdef _DEBUG_VIEW
printf("\nafter adjust:\t");
@ -1040,7 +1050,10 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
int32_t size =
blockDataGetSize(pDataBlock) + sizeof(int32_t) + taosArrayGetSize(pDataBlock->pDataBlock) * sizeof(int32_t);
ASSERT(size <= getBufPageSize(pHandle->pBuf));
if (size > getBufPageSize(pHandle->pBuf)) {
qError("sort failed at: %s:%d", __func__, __LINE__);
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
code= blockDataToBuf(pPage, pDataBlock);
if (code) {
@ -1310,7 +1323,10 @@ static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, i
}
pRegion->bufLen = readBytes;
}
ASSERT(pRegion->bufRegOffset <= tupleOffset);
if (pRegion->bufRegOffset > tupleOffset) {
qError("sort failed at: %s:%d", __func__, __LINE__);
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
if (pRegion->bufRegOffset + pRegion->bufLen >= tupleOffset + rowLen) {
*pFreeRow = false;
*ppRow = pRegion->buf + tupleOffset - pRegion->bufRegOffset;
@ -1487,7 +1503,10 @@ static int32_t tsortCloseRegion(SSortHandle* pHandle) {
static int32_t tsortFinalizeRegions(SSortHandle* pHandle) {
SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
size_t numRegions = taosArrayGetSize(pMemFile->aFileRegions);
ASSERT(numRegions == (pMemFile->currRegionId + 1));
if (numRegions != (pMemFile->currRegionId + 1)) {
qError("sort failed at: %s:%d", __func__, __LINE__);
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
if (numRegions == 0) {
return TSDB_CODE_SUCCESS;
}
@ -1812,7 +1831,10 @@ static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk,
}
int32_t size = blockDataGetSize(blk) + sizeof(int32_t) + taosArrayGetSize(blk->pDataBlock) * sizeof(int32_t);
ASSERT(size <= getBufPageSize(pHandle->pBuf));
if (size > getBufPageSize(pHandle->pBuf)) {
qError("sort failed at: %s:%d", __func__, __LINE__);
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
int32_t code = blockDataToBuf(pPage, blk);
@ -1863,12 +1885,18 @@ static int32_t getPageBufIncForRowIdSort(SSDataBlock* pDstBlock, int32_t srcRowI
int32_t numOfCols = blockDataGetNumOfCols(pDstBlock);
if (pPkCol == NULL) { // no var column
ASSERT((numOfCols == 4) && (!pDstBlock->info.hasVarCol));
if (!((numOfCols == 4) && (!pDstBlock->info.hasVarCol))) {
qError("sort failed at: %s:%d", __func__, __LINE__);
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
size += numOfCols * ((dstRowIndex & 0x7) == 0 ? 1: 0);
size += blockDataGetRowSize(pDstBlock);
} else {
ASSERT(numOfCols == 5);
if (numOfCols != 5) {
qError("sort failed at: %s:%d", __func__, __LINE__);
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
size += (numOfCols - 1) * (((dstRowIndex & 0x7) == 0)? 1:0);
for(int32_t i = 0; i < numOfCols - 1; ++i) {
@ -2018,7 +2046,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
SArray* aPgId = taosArrayInit(8, sizeof(int32_t));
if (aPgId == NULL) {
return terrno;
goto _error;
}
int32_t nRows = 0;
@ -2039,10 +2067,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pTree);
taosArrayDestroy(aPgId);
cleanupMergeSup(&sup);
return code;
goto _error;
}
nMergedRows += pHandle->pDataBlock->info.rows;
@ -2064,7 +2089,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
code = blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1);
if (code) {
return code;
goto _error;
}
if (pHandle->bSortByRowId) {
@ -2074,11 +2099,14 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
}
if (code) {
return code;
goto _error;
}
blkPgSz += bufInc;
ASSERT(blkPgSz == blockDataGetSize(pHandle->pDataBlock) + pageHeaderSize);
if (blkPgSz != blockDataGetSize(pHandle->pDataBlock) + pageHeaderSize) {
qError("sort failed at: %s:%d", __func__, __LINE__);
goto _error;
}
++nRows;
@ -2087,7 +2115,10 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
} else {
++sup.aRowIdx[minIdx];
}
tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
code = tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
if (TSDB_CODE_SUCCESS != code) {
goto _error;
}
}
if (pHandle->pDataBlock->info.rows > 0) {
@ -2096,10 +2127,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(aPgId);
taosMemoryFree(pTree);
cleanupMergeSup(&sup);
return code;
goto _error;
}
nMergedRows += pHandle->pDataBlock->info.rows;
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
@ -2115,11 +2143,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
SSDataBlock* pMemSrcBlk = NULL;
code = createOneDataBlock(pHandle->pDataBlock, false, &pMemSrcBlk);
if (code) {
cleanupMergeSup(&sup);
tMergeTreeDestroy(&pTree);
return code;
}
if (code) goto _error;
code = doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);
@ -2127,6 +2151,12 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
tMergeTreeDestroy(&pTree);
return code;
_error:
tMergeTreeDestroy(&pTree);
cleanupMergeSup(&sup);
if (aPgId) taosArrayDestroy(aPgId);
return code;
}
static int32_t getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSHashObj* mTableNumRows, SSDataBlock* pOrigBlk,
@ -2502,7 +2532,10 @@ static int32_t tsortOpenForBufMergeSort(SSortHandle* pHandle) {
int32_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
if (pHandle->pBuf != NULL) {
ASSERT(numOfSources <= getNumOfInMemBufPages(pHandle->pBuf));
if (numOfSources > getNumOfInMemBufPages(pHandle->pBuf)) {
qError("sort failed at: %s:%d", __func__, __LINE__);
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
}
if (numOfSources == 0) {
@ -2596,7 +2629,10 @@ static int32_t tsortBufMergeSortNextTuple(SSortHandle* pHandle, STupleHandle** p
index = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
pSource = pHandle->cmpParam.pSources[index];
ASSERT(pSource->src.pBlock != NULL);
if (pSource->src.pBlock == NULL) {
qError("sort failed at: %s:%d", __func__, __LINE__);
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
pHandle->tupleHandle.rowIndex = pSource->src.rowIndex;
pHandle->tupleHandle.pBlock = pSource->src.pBlock;

View File

@ -4014,7 +4014,6 @@ static int32_t setTableTsmas(STranslateContext* pCxt, SName* pName, SRealTableNo
if (TSDB_CODE_SUCCESS == code)
code = catalogGetCachedTableHashVgroup(pCxt->pParseCxt->pCatalog, &tsmaTargetTbName, &vgInfo, &exists);
if (TSDB_CODE_SUCCESS == code) {
ASSERT(exists);
if (!pRealTable->tsmaTargetTbVgInfo) {
pRealTable->tsmaTargetTbVgInfo = taosArrayInit(pRealTable->pTsmas->size, POINTER_BYTES);
if (!pRealTable->tsmaTargetTbVgInfo) {
@ -7320,7 +7319,10 @@ static int32_t checkDbRetentionsOption(STranslateContext* pCxt, SNodeList* pRete
SValueNode* pFreq = (SValueNode*)nodesListGetNode(((SNodeListNode*)pRetention)->pNodeList, 0);
SValueNode* pKeep = (SValueNode*)nodesListGetNode(((SNodeListNode*)pRetention)->pNodeList, 1);
ASSERTS(IS_DURATION_VAL(pFreq->flag) && IS_DURATION_VAL(pKeep->flag), "Retentions freq/keep should have unit");
if (!IS_DURATION_VAL(pFreq->flag) || !IS_DURATION_VAL(pKeep->flag)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Retentions freq/keep should have unit");
}
// check unit
if (IS_DURATION_VAL(pFreq->flag) && TIME_UNIT_SECOND != pFreq->unit && TIME_UNIT_MINUTE != pFreq->unit &&
@ -9427,7 +9429,9 @@ static int32_t buildCreateTagIndexReq(STranslateContext* pCxt, SCreateIndexStmt*
(void)tNameGetFullDbName(&name, pReq->dbFName);
SNode* pNode = NULL;
ASSERT(LIST_LENGTH(pStmt->pCols) == 1);
if (LIST_LENGTH(pStmt->pCols) != 1) {
return TSDB_CODE_PAR_INVALID_TAGS_NUM;
}
FOREACH(pNode, pStmt->pCols) {
SColumnNode* p = (SColumnNode*)pNode;
memcpy(pReq->colName, p->colName, sizeof(p->colName));

View File

@ -1281,8 +1281,10 @@ int32_t getTsmaFromCache(SParseMetaCache* pMetaCache, const SName* pTsmaName, ST
}
STableTSMAInfoRsp* pTsmaRsp = NULL;
code = getMetaDataFromHash(tsmaFName, strlen(tsmaFName), pMetaCache->pTSMAs, (void**)&pTsmaRsp);
if (TSDB_CODE_SUCCESS == code && pTsmaRsp) {
ASSERT(pTsmaRsp->pTsmas->size == 1);
if (TSDB_CODE_SUCCESS == code) {
if (!pTsmaRsp || pTsmaRsp->pTsmas->size != 1) {
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
*pTsma = taosArrayGetP(pTsmaRsp->pTsmas, 0);
} else if (code == TSDB_CODE_PAR_INTERNAL_ERROR){
code = TSDB_CODE_MND_SMA_NOT_EXIST;

View File

@ -6493,6 +6493,7 @@ static int32_t partitionColsOpt(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
pSort->calcGroupId = true;
code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pNode, (SLogicNode*)pSort);
if (code == TSDB_CODE_SUCCESS) {
nodesDestroyNode((SNode*)pNode);
pCxt->optimized = true;
} else {
nodesDestroyNode((SNode*)pSort);
@ -6641,7 +6642,6 @@ static int32_t fillTSMAOptCtx(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pScan) {
pTsmaOptCtx->pAggFuncs = pWindow->pFuncs;
pTsmaOptCtx->ppParentTsmaSubplans = &pWindow->pTsmaSubplans;
} else {
ASSERT(nodeType(pTsmaOptCtx->pParent) == QUERY_NODE_LOGIC_PLAN_AGG);
SAggLogicNode* pAgg = (SAggLogicNode*)pTsmaOptCtx->pParent;
pTsmaOptCtx->pAggFuncs = pAgg->pAggFuncs;
pTsmaOptCtx->ppParentTsmaSubplans = &pAgg->pTsmaSubplans;
@ -6939,8 +6939,9 @@ static int32_t tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow*
}
int32_t tsmaOptCreateTsmaScanCols(const STSMAOptUsefulTsma* pTsma, const SNodeList* pAggFuncs, SNodeList** ppList) {
ASSERT(pTsma->pTsma);
ASSERT(pTsma->pTsmaScanCols);
if (!pTsma->pTsma || !pTsma->pTsmaScanCols) {
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
int32_t code;
SNode* pNode;
SNodeList* pScanCols = NULL;
@ -6996,8 +6997,7 @@ static int32_t tsmaOptRewriteTag(const STSMAOptCtx* pTsmaOptCtx, const STSMAOptU
break;
}
}
ASSERT(found);
return 0;
return found ? TSDB_CODE_SUCCESS : TSDB_CODE_PLAN_INTERNAL_ERROR;
}
static int32_t tsmaOptRewriteTbname(const STSMAOptCtx* pTsmaOptCtx, SNode** pTbNameNode,
@ -7116,7 +7116,6 @@ static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pNew
SColumnNode* pPkTsCol = NULL;
FOREACH(pNode, pNewScan->pScanCols) {
SColumnNode* pCol = (SColumnNode*)pNode;
ASSERT(pTsma->pTsmaScanCols);
if (pCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
pPkTsCol = NULL;
code = nodesCloneNode((SNode*)pCol, (SNode**)&pPkTsCol);
@ -7302,7 +7301,6 @@ static int32_t tsmaOptGeneratePlan(STSMAOptCtx* pTsmaOptCtx) {
for (int32_t j = 0; j < pTsmaOptCtx->pScan->pTsmas->size; ++j) {
if (taosArrayGetP(pTsmaOptCtx->pScan->pTsmas, j) == pTsma->pTsma) {
const STsmaTargetTbInfo* ptbInfo = taosArrayGet(pTsmaOptCtx->pScan->pTsmaTargetTbInfo, j);
ASSERT(ptbInfo->uid != 0);
strcpy(pTsma->targetTbName, ptbInfo->tableName);
pTsma->targetTbUid = ptbInfo->uid;
}

View File

@ -19,8 +19,11 @@
#include "tlog.h"
// Set the initial value of the multiway merge tree.
static void tMergeTreeInit(SMultiwayMergeTreeInfo* pTree) {
ASSERT((pTree->totalSources & 0x01) == 0 && (pTree->numOfSources << 1 == pTree->totalSources));
static int32_t tMergeTreeInit(SMultiwayMergeTreeInfo* pTree) {
if (!((pTree->totalSources & 0x01) == 0 && (pTree->numOfSources << 1 == pTree->totalSources))) {
uError("losertree failed at: %s:%d", __func__, __LINE__);
return TSDB_CODE_INVALID_PARA;
}
for (int32_t i = 0; i < pTree->totalSources; ++i) {
if (i < pTree->numOfSources) {
@ -29,6 +32,7 @@ static void tMergeTreeInit(SMultiwayMergeTreeInfo* pTree) {
pTree->pNode[i].index = i - pTree->numOfSources;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo** pTree, uint32_t numOfSources, void* param,
@ -50,7 +54,11 @@ int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo** pTree, uint32_t numOfSources,
pTreeInfo->comparFn = compareFn;
// set initial value for loser tree
tMergeTreeInit(pTreeInfo);
int32_t code = tMergeTreeInit(pTreeInfo);
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFree(pTreeInfo);
return code;
}
#ifdef _DEBUG_VIEW
printf("the initial value of loser tree:\n");
@ -58,7 +66,11 @@ int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo** pTree, uint32_t numOfSources,
#endif
for (int32_t i = totalEntries - 1; i >= numOfSources; i--) {
tMergeTreeAdjust(pTreeInfo, i);
code = tMergeTreeAdjust(pTreeInfo, i);
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFree(pTreeInfo);
return code;
}
}
#if defined(_DEBUG_VIEW)
@ -79,13 +91,17 @@ void tMergeTreeDestroy(SMultiwayMergeTreeInfo** pTree) {
taosMemoryFreeClear(*pTree);
}
void tMergeTreeAdjust(SMultiwayMergeTreeInfo* pTree, int32_t idx) {
ASSERT(idx <= pTree->totalSources - 1 && idx >= pTree->numOfSources && pTree->totalSources >= 2);
int32_t tMergeTreeAdjust(SMultiwayMergeTreeInfo* pTree, int32_t idx) {
int32_t code = 0;
if (!(idx <= pTree->totalSources - 1 && idx >= pTree->numOfSources && pTree->totalSources >= 2)) {
uError("losertree failed at: %s:%d", __func__, __LINE__);
return TSDB_CODE_INVALID_PARA;
}
if (pTree->totalSources == 2) {
pTree->pNode[0].index = 0;
pTree->pNode[1].index = 0;
return;
return code;
}
int32_t parentId = idx >> 1;
@ -95,7 +111,7 @@ void tMergeTreeAdjust(SMultiwayMergeTreeInfo* pTree, int32_t idx) {
STreeNode* pCur = &pTree->pNode[parentId];
if (pCur->index == -1) {
pTree->pNode[parentId] = kLeaf;
return;
return code;
}
int32_t ret = pTree->comparFn(pCur, &kLeaf, pTree->param);
@ -112,13 +128,21 @@ void tMergeTreeAdjust(SMultiwayMergeTreeInfo* pTree, int32_t idx) {
// winner cannot be identical to the loser, which is pTreeNode[1]
pTree->pNode[0] = kLeaf;
}
return code;
}
void tMergeTreeRebuild(SMultiwayMergeTreeInfo* pTree) {
tMergeTreeInit(pTree);
for (int32_t i = pTree->totalSources - 1; i >= pTree->numOfSources; i--) {
tMergeTreeAdjust(pTree, i);
int32_t tMergeTreeRebuild(SMultiwayMergeTreeInfo* pTree) {
int32_t code = tMergeTreeInit(pTree);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
for (int32_t i = pTree->totalSources - 1; i >= pTree->numOfSources; i--) {
code = tMergeTreeAdjust(pTree, i);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
}
return TSDB_CODE_SUCCESS;
}
/*

View File

@ -313,7 +313,6 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
static char* evictBufPage(SDiskbasedBuf* pBuf) {
SListNode* pn = getEldestUnrefedPage(pBuf);
if (pn == NULL) { // no available buffer pages now, return.
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}