remove asserts of nodes/planner/parser/tsdbread
parent c2585cef6b
commit e6a35dd34d
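
Every hunk in this commit applies the same conversion: an ASSERT(cond) in the tsdb reader, executor, sort, parser, or planner code is dropped, commented out, or replaced by a runtime check that logs the failing site and returns an error code to the caller. The short sketch below illustrates that conversion pattern only; my_check and the MY_CODE_* constants are hypothetical stand-ins and are not part of the commit or of the TDengine sources.

#include <stdio.h>

#define MY_CODE_SUCCESS      0
#define MY_CODE_INVALID_PARA (-1)

/* Before: ASSERT(pos >= 0 && pos < num) aborts when the invariant is broken
 * (depending on build flags).  After: validate at runtime, log the failure
 * site, and hand an error code back to the caller so the process can keep
 * running. */
static int my_check(int pos, int num) {
  if (!(pos >= 0 && pos < num)) {
    fprintf(stderr, "check failed at: %s:%d\n", __func__, __LINE__);
    return MY_CODE_INVALID_PARA;
  }
  return MY_CODE_SUCCESS;
}

int main(void) {
  int code = my_check(3, 2); /* the caller now propagates the code */
  if (code != MY_CODE_SUCCESS) {
    return 1;
  }
  return 0;
}
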
@@ -45,9 +45,9 @@ int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo **pTree, uint32_t numOfEntries,

void tMergeTreeDestroy(SMultiwayMergeTreeInfo **pTree);

void tMergeTreeAdjust(SMultiwayMergeTreeInfo *pTree, int32_t idx);
int32_t tMergeTreeAdjust(SMultiwayMergeTreeInfo *pTree, int32_t idx);

void tMergeTreeRebuild(SMultiwayMergeTreeInfo *pTree);
int32_t tMergeTreeRebuild(SMultiwayMergeTreeInfo *pTree);

void tMergeTreePrint(const SMultiwayMergeTreeInfo *pTree);

@@ -137,7 +137,6 @@ static int32_t tGetPrimaryKeyIndex(uint8_t* p, SPrimaryKeyIndex* index) {

static void tRowGetPrimaryKeyDeepCopy(SRow* pRow, SRowKey* pKey) {
  SPrimaryKeyIndex indices[TD_MAX_PK_COLS];
  ASSERT(pKey->numOfPKs <= TD_MAX_PK_COLS);

  uint8_t* data = pRow->data;
  for (int32_t i = 0; i < pRow->numOfPKs; i++) {

@@ -673,7 +672,10 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead
      break;
    }

    ASSERT(pBrinBlk->minTbid.suid <= pReader->info.suid && pBrinBlk->maxTbid.suid >= pReader->info.suid);
    if (!(pBrinBlk->minTbid.suid <= pReader->info.suid && pBrinBlk->maxTbid.suid >= pReader->info.suid)) {
      tsdbError("tsdb failed at: %s %d", __func__, __LINE__);
      return TSDB_CODE_FAILED;
    }
    if (pBrinBlk->maxTbid.suid == pReader->info.suid && pBrinBlk->maxTbid.uid < pList->tableUidList[0]) {
      i += 1;
      continue;

@@ -753,7 +755,10 @@ static int32_t loadFileBlockBrinInfo(STsdbReader* pReader, SArray* pIndexList, S
      continue;
    }

    ASSERT(pRecord->suid == pReader->info.suid && uid == pRecord->uid);
    if (!(pRecord->suid == pReader->info.suid && uid == pRecord->uid)) {
      tsdbError("tsdb failed at: %s:%d", __func__, __LINE__);
      return TSDB_CODE_FAILED;
    }

    STableBlockScanInfo* pScanInfo = NULL;
    code = getTableBlockScanInfo(pReader->status.pTableMap, uid, &pScanInfo, pReader->idStr);

@@ -924,7 +929,7 @@ static int32_t getCurrentBlockInfo(SDataBlockIter* pBlockIter, SFileDataBlockInf

  size_t num = TARRAY_SIZE(pBlockIter->blockList);
  if (num == 0) {
    ASSERT(pBlockIter->numOfBlocks == num);
    tsdbError("tsdb read failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_FAILED;
  }

@@ -932,13 +937,15 @@ static int32_t getCurrentBlockInfo(SDataBlockIter* pBlockIter, SFileDataBlockInf
  return (*pInfo) != NULL? TSDB_CODE_SUCCESS:TSDB_CODE_FAILED;
}

static int doBinarySearchKey(const TSKEY* keyList, int num, int pos, TSKEY key, int order) {
static int32_t doBinarySearchKey(const TSKEY* keyList, int num, int pos, TSKEY key, int order) {
  // start end position
  int s, e;
  s = pos;

  // check
  ASSERT(pos >= 0 && pos < num && num > 0);
  if (!(pos >= 0 && pos < num && num > 0)) {
    return -1;
  }
  if (order == TSDB_ORDER_ASC) {
    // find the first position which is smaller than the key
    e = num - 1;

@@ -1242,7 +1249,10 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro
  }

  pDumpInfo->rowIndex = findFirstPos(pBlockData->aTSKEY, pRecord->numRow, pDumpInfo->rowIndex, (!asc));
  ASSERT(pReader->info.verRange.minVer <= pRecord->maxVer && pReader->info.verRange.maxVer >= pRecord->minVer);
  if (!(pReader->info.verRange.minVer <= pRecord->maxVer && pReader->info.verRange.maxVer >= pRecord->minVer)) {
    tsdbError("tsdb failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_INVALID_PARA;
  }

  // find the appropriate start position that satisfies the version requirement.
  if ((pReader->info.verRange.maxVer >= pRecord->minVer && pReader->info.verRange.maxVer < pRecord->maxVer) ||

@@ -1382,7 +1392,12 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro
}

static FORCE_INLINE STSchema* getTableSchemaImpl(STsdbReader* pReader, uint64_t uid) {
  ASSERT(pReader->info.pSchema == NULL);
  if (pReader->info.pSchema != NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    tsdbError("tsdb invalid input param at: %s:%d", __func__, __LINE__);
    return NULL;
  }

  int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, -1, &pReader->info.pSchema);
  if (code != TSDB_CODE_SUCCESS || pReader->info.pSchema == NULL) {
    terrno = code;

@@ -1663,7 +1678,6 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
  pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pRecord, order);

  // todo handle the primary key overlap case
  ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
  if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) {
    int64_t nextProcKeyInStt = pScanInfo->sttKeyInfo.nextProcKey.ts;
    pInfo->overlapWithSttBlock = !(pBlockInfo->lastKey < nextProcKeyInStt || pBlockInfo->firstKey > nextProcKeyInStt);

@@ -1921,7 +1935,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*

  // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized
  if (pMerger->pArray == NULL) {
    ASSERT(pReader->info.pSchema == NULL);
    if (pReader->info.pSchema != NULL) {
      tsdbError("tsdb failed at %s:%d", __func__, __LINE__);
      return TSDB_CODE_FAILED;
    }
    STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid);
    if (ps == NULL) {
      return terrno;

@@ -2014,7 +2031,10 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*

  // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized
  if (pMerger->pArray == NULL) {
    ASSERT(pReader->info.pSchema == NULL);
    if (pReader->info.pSchema) {
      tsdbError("tsdb failed at %s %d", __func__, __LINE__);
      return TSDB_CODE_FAILED;
    }
    STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid);
    if (ps == NULL) {
      return terrno;

@@ -2147,7 +2167,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*

  // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized
  if (pMerger->pArray == NULL) {
    ASSERT(pReader->info.pSchema == NULL);
    if (pReader->info.pSchema != NULL) {
      tsdbError("tsdb read failed at: %s:%d", __func__, __LINE__);
      return TSDB_CODE_FAILED;
    }
    STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid);
    if (ps == NULL) {
      return terrno;

@@ -2548,7 +2571,10 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc

  // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized
  if (pMerger->pArray == NULL) {
    ASSERT(pReader->info.pSchema == NULL);
    if (pReader->info.pSchema != NULL) {
      tsdbError("tsdb reader failed at: %s:%d", __func__, __LINE__);
      return TSDB_CODE_FAILED;
    }
    STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid);
    if (ps == NULL) {
      return terrno;

@@ -3267,8 +3293,6 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {

// current active data block not overlap with the stt-files/stt-blocks
static bool notOverlapWithFiles(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, bool asc) {
  ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);

  if ((!hasDataInSttBlock(pScanInfo)) || (pScanInfo->cleanSttBlocks == true)) {
    return true;
  } else {

@@ -3336,7 +3360,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
      // data block, so the overlap check is invalid actually.
      buildCleanBlockFromDataFiles(pReader, pScanInfo, pBlockInfo, pBlockIter->index);
    } else { // clean stt block
      ASSERT(pReader->info.execMode == READER_EXEC_ROWS && pSttBlockReader->mergeTree.pIter == NULL);
      if (!(pReader->info.execMode == READER_EXEC_ROWS && pSttBlockReader->mergeTree.pIter == NULL)) {
        tsdbError("tsdb reader failed at: %s:%d", __func__, __LINE__);
        return TSDB_CODE_FAILED;
      }
      code = buildCleanBlockFromSttFiles(pReader, pScanInfo);
      return code;
    }

@@ -3357,7 +3384,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {

    // no data in stt block, no need to proceed.
    while (hasDataInSttBlock(pScanInfo)) {
      ASSERT(pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA);
      if (pScanInfo->sttKeyInfo.status != STT_FILE_HAS_DATA) {
        tsdbError("tsdb reader failed at: %s:%d", __func__, __LINE__);
        return TSDB_CODE_FAILED;
      }

      code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pSttBlockReader);
      if (code != TSDB_CODE_SUCCESS) {

@@ -3580,7 +3610,11 @@ static ERetrieveType doReadDataFromSttFiles(STsdbReader* pReader) {
    }

    // all data blocks are checked in this stt file, now let's try the next file set
    ASSERT(pReader->status.pTableIter == NULL);
    if (pReader->status.pTableIter != NULL) {
      terrno = TSDB_CODE_FAILED;
      tsdbError("tsdb reader failed at: %s:%d", __func__, __LINE__);
      return TSDB_READ_RETURN;
    }
    code = initForFirstBlockInFile(pReader, pBlockIter);

    // error happens or all the data files are completely checked

@@ -3794,7 +3828,7 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t
        return false;
      }

      ASSERT(key >= last->ts);
      // ASSERT(key >= last->ts);
      if (key > last->ts) {
        return false;
      } else if (key == last->ts) {

@@ -3857,7 +3891,7 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t
      } else if (key == pFirst->ts) {
        return pFirst->version >= ver;
      } else {
        ASSERT(0);
        // ASSERT(0);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);

@@ -741,7 +741,10 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
    code = tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  }

  for (int32_t i = 0; i < numOfTables; ++i) {

@@ -843,7 +846,10 @@ static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_
      continue;
    }

    ASSERT(record.suid == pReader->info.suid && uid == record.uid);
    if (!(record.suid == pReader->info.suid && uid == record.uid)) {
      tsdbError("tsdb reader failed at: %s:%d", __func__, __LINE__);
      return TSDB_CODE_FAILED;
    }

    if (record.version <= pReader->info.verRange.maxVer) {
      SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};

@@ -877,7 +883,10 @@ static int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STs
      break;
    }

    ASSERT(pTombBlk->minTbid.suid <= pReader->info.suid && pTombBlk->maxTbid.suid >= pReader->info.suid);
    if (!(pTombBlk->minTbid.suid <= pReader->info.suid && pTombBlk->maxTbid.suid >= pReader->info.suid)) {
      tsdbError("tsdb reader failed at: %s:%d", __func__, __LINE__);
      return TSDB_CODE_FAILED;
    }
    if (pTombBlk->maxTbid.suid == pReader->info.suid && pTombBlk->maxTbid.uid < pList->tableUidList[0]) {
      i += 1;
      continue;

@@ -240,7 +240,12 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
  *order = info.order;
  *scanFlag = info.scanFlag;

  ASSERT(*order == TSDB_ORDER_ASC || *order == TSDB_ORDER_DESC);
  if (p.code == TSDB_CODE_SUCCESS) {
    if (!(*order == TSDB_ORDER_ASC || *order == TSDB_ORDER_DESC)) {
      qError("operator failed at: %s:%d", __func__, __LINE__);
      p.code = TSDB_CODE_INVALID_PARA;
    }
  }
  return p.code;
}

@@ -218,7 +218,10 @@ static int32_t discardGroupDataBlock(SSDataBlock* pBlock, SLimitInfo* pLimitInfo
static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, SOperatorInfo* pOperator) {
  // remainGroupOffset == 0
  // here check for a new group data, we need to handle the data of the previous group.
  ASSERT(pLimitInfo->remainGroupOffset == 0 || pLimitInfo->remainGroupOffset == -1);
  if (!(pLimitInfo->remainGroupOffset == 0 || pLimitInfo->remainGroupOffset == -1)) {
    qError("project failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_INVALID_PARA;
  }

  bool newGroup = false;
  if (0 == pBlock->info.id.groupId) {

@@ -818,7 +821,10 @@ int32_t doGenerateSourceData(SOperatorInfo* pOperator) {
    }

    int32_t startOffset = pRes->info.rows;
    ASSERT(pRes->info.capacity > 0);
    if (pRes->info.capacity <= 0) {
      qError("project failed at: %s:%d", __func__, __LINE__);
      return TSDB_CODE_FAILED;
    }
    code = colDataAssign(pResColData, &idata, dest.numOfRows, &pRes->info);
    if (code) {
      return code;

@@ -875,7 +881,11 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
    for (int32_t k = 0; k < numOfOutput; ++k) {
      int32_t outputSlotId = pExpr[k].base.resSchema.slotId;

      ASSERT(pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE);
      if (pExpr[k].pExpr->nodeType != QUERY_NODE_VALUE) {
        qError("project failed at: %s:%d", __func__, __LINE__);
        code = TSDB_CODE_INVALID_PARA;
        TSDB_CHECK_CODE(code, lino, _exit);
      }
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
      if (pColInfoData == NULL) {
        code = terrno;

@@ -1019,7 +1029,11 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
      }

      int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
      ASSERT(pResult->info.capacity > 0);
      if (pResult->info.capacity <= 0) {
        qError("project failed at: %s:%d", __func__, __LINE__);
        code = TSDB_CODE_INVALID_PARA;
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      int32_t ret = colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
      if (ret < 0) {

@@ -1146,7 +1160,11 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
      }

      int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
      ASSERT(pResult->info.capacity > 0);
      if (pResult->info.capacity <= 0) {
        qError("project failed at: %s:%d", __func__, __LINE__);
        code = TSDB_CODE_INVALID_PARA;
        TSDB_CHECK_CODE(code, lino, _exit);
      }
      int32_t ret = colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
      if (ret < 0) {
        code = ret;

@@ -5118,9 +5118,7 @@ static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTab
    }
  }

  tMergeTreeAdjust(pSubTblsInfo->pTree, tMergeTreeGetAdjustIndex(pSubTblsInfo->pTree));

  return TSDB_CODE_SUCCESS;
  return tMergeTreeAdjust(pSubTblsInfo->pTree, tMergeTreeGetAdjustIndex(pSubTblsInfo->pTree));
}

static int32_t appendChosenRowToDataBlock(STmsSubTablesMergeInfo* pSubTblsInfo, SSDataBlock* pBlock) {

@@ -14,6 +14,7 @@
 */

#include "tlinearhash.h"
#include "query.h"
#include "taoserror.h"
#include "tdef.h"
#include "tpagedbuf.h"

@@ -59,7 +60,11 @@ static int32_t doGetBucketIdFromHashVal(int32_t hashv, int32_t bits) { return ha

static int32_t doGetAlternativeBucketId(int32_t bucketId, int32_t bits, int32_t numOfBuckets) {
  int32_t v = bucketId - (1ul << (bits - 1));
  ASSERT(v < numOfBuckets);
  if (v >= numOfBuckets) {
    qError("tlinearhash failed at: %s:%d", __func__, __LINE__);
    terrno = TSDB_CODE_FAILED;
    return -1;
  }
  return v;
}

@@ -85,11 +90,15 @@ static int32_t doAddToBucket(SLHashObj* pHashObj, SLHashBucket* pBucket, int32_t
  int32_t pageId = *(int32_t*)taosArrayGetLast(pBucket->pPageIdList);

  SFilePage* pPage = getBufPage(pHashObj->pBuf, pageId);
  ASSERT(pPage != NULL);
  if (pPage == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // put to current buf page
  size_t nodeSize = sizeof(SLHashNode) + keyLen + size;
  ASSERT(nodeSize + sizeof(SFilePage) <= getBufPageSize(pHashObj->pBuf));
  if (nodeSize + sizeof(SFilePage) > getBufPageSize(pHashObj->pBuf)) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (pPage->num + nodeSize > getBufPageSize(pHashObj->pBuf)) {
    releaseBufPage(pHashObj->pBuf, pPage);

@@ -174,7 +183,7 @@ static void doTrimBucketPages(SLHashObj* pHashObj, SLHashBucket* pBucket) {
  setBufPageDirty(pFirst, true);
  setBufPageDirty(pLast, true);

  ASSERT(pLast->num >= nodeSize + sizeof(SFilePage));
  // ASSERT(pLast->num >= nodeSize + sizeof(SFilePage));

  pFirst->num += nodeSize;
  pLast->num -= nodeSize;

@@ -320,6 +329,9 @@ int32_t tHashPut(SLHashObj* pHashObj, const void* key, size_t keyLen, void* data
    // printf("bucketId: 0x%x not exists, put it into 0x%x instead\n", v, newBucketId);
    v = newBucketId;
  }
  if (v < 0) {
    return terrno;
  }

  SLHashBucket* pBucket = pHashObj->pBucket[v];
  code = doAddToBucket(pHashObj, pBucket, v, key, keyLen, data, size);

@@ -343,7 +355,10 @@ int32_t tHashPut(SLHashObj* pHashObj, const void* key, size_t keyLen, void* data
    int32_t numOfBits = ceil(log(pHashObj->numOfBuckets) / log(2));
    if (numOfBits > pHashObj->bits) {
      // printf("extend the bits from %d to %d, new bucket:%d\n", pHashObj->bits, numOfBits, newBucketId);
      ASSERT(numOfBits == pHashObj->bits + 1);
      if (numOfBits != pHashObj->bits + 1) {
        qError("linear hash faield at: %s:%d", __func__, __LINE__);
        return TSDB_CODE_FAILED;
      }
      pHashObj->bits = numOfBits;
    }

@@ -360,14 +375,20 @@ int32_t tHashPut(SLHashObj* pHashObj, const void* key, size_t keyLen, void* data
    char* pStart = p->data;
    while (pStart - ((char*)p) < p->num) {
      SLHashNode* pNode = (SLHashNode*)pStart;
      ASSERT(pNode->keyLen > 0);
      if (pNode->keyLen <= 0) {
        qError("linear hash faield at: %s:%d", __func__, __LINE__);
        return TSDB_CODE_FAILED;
      }

      char* k = GET_LHASH_NODE_KEY(pNode);
      int32_t hashv = pHashObj->hashFn(k, pNode->keyLen);
      int32_t v1 = doGetBucketIdFromHashVal(hashv, pHashObj->bits);

      if (v1 != splitBucketId) { // place it into the new bucket
        ASSERT(v1 == newBucketId);
        if (v1 != newBucketId) {
          qError("linear hash failed at: %s:%d", __func__, __LINE__);
          return TSDB_CODE_FAILED;
        }
        // printf("move key:%d to 0x%x bucket, remain items:%d\n", *(int32_t*)k, v1, pBucket->size - 1);
        SLHashBucket* pNewBucket = pHashObj->pBucket[newBucketId];
        code = doAddToBucket(pHashObj, pNewBucket, newBucketId, (void*)GET_LHASH_NODE_KEY(pNode), pNode->keyLen,

@@ -395,6 +416,9 @@ char* tHashGet(SLHashObj* pHashObj, const void* key, size_t keyLen) {
  if (bucketId >= pHashObj->numOfBuckets) {
    bucketId = doGetAlternativeBucketId(bucketId, pHashObj->bits, pHashObj->numOfBuckets);
  }
  if (bucketId < 0) {
    return NULL;
  }

  SLHashBucket* pBucket = pHashObj->pBucket[bucketId];
  int32_t num = taosArrayGetSize(pBucket->pPageIdList);

@@ -468,7 +468,11 @@ static int32_t doAddNewExternalMemSource(SDiskbasedBuf* pBuf, SArray* pAllSource
  // The value of numOfRows must be greater than 0, which is guaranteed by the previous memory allocation
  int32_t numOfRows =
      (getBufPageSize(pBuf) - blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock))) / rowSize;
  ASSERT(numOfRows > 0);
  if (numOfRows <= 0) {
    qError("sort failed at: %s:%d", __func__, __LINE__);
    taosArrayDestroy(pPageIdList);
    return TSDB_CODE_FAILED;
  }

  return blockDataEnsureCapacity(pSource->src.pBlock, numOfRows);
}

@@ -528,7 +532,10 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
  }

  int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t);
  ASSERT(size <= getBufPageSize(pHandle->pBuf));
  if (size > getBufPageSize(pHandle->pBuf)) {
    qError("sort failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_FAILED;
  }

  code = blockDataToBuf(pPage, p);
  if (code) {

@@ -733,7 +740,10 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
  tMergeTreePrint(pTree);
#endif

  tMergeTreeAdjust(pTree, leafNodeIndex);
  int32_t code = tMergeTreeAdjust(pTree, leafNodeIndex);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

#ifdef _DEBUG_VIEW
  printf("\nafter adjust:\t");

@@ -1040,7 +1050,10 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {

      int32_t size =
          blockDataGetSize(pDataBlock) + sizeof(int32_t) + taosArrayGetSize(pDataBlock->pDataBlock) * sizeof(int32_t);
      ASSERT(size <= getBufPageSize(pHandle->pBuf));
      if (size > getBufPageSize(pHandle->pBuf)) {
        qError("sort failed at: %s:%d", __func__, __LINE__);
        return TSDB_CODE_FAILED;
      }

      code= blockDataToBuf(pPage, pDataBlock);
      if (code) {

@@ -1310,7 +1323,10 @@ static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, i
    }
    pRegion->bufLen = readBytes;
  }
  ASSERT(pRegion->bufRegOffset <= tupleOffset);
  if (pRegion->bufRegOffset > tupleOffset) {
    qError("sort failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_FAILED;
  }
  if (pRegion->bufRegOffset + pRegion->bufLen >= tupleOffset + rowLen) {
    *pFreeRow = false;
    *ppRow = pRegion->buf + tupleOffset - pRegion->bufRegOffset;

@@ -1487,7 +1503,10 @@ static int32_t tsortCloseRegion(SSortHandle* pHandle) {
static int32_t tsortFinalizeRegions(SSortHandle* pHandle) {
  SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
  size_t numRegions = taosArrayGetSize(pMemFile->aFileRegions);
  ASSERT(numRegions == (pMemFile->currRegionId + 1));
  if (numRegions != (pMemFile->currRegionId + 1)) {
    qError("sort failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_FAILED;
  }
  if (numRegions == 0) {
    return TSDB_CODE_SUCCESS;
  }

@@ -1812,7 +1831,10 @@ static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk,
  }

  int32_t size = blockDataGetSize(blk) + sizeof(int32_t) + taosArrayGetSize(blk->pDataBlock) * sizeof(int32_t);
  ASSERT(size <= getBufPageSize(pHandle->pBuf));
  if (size > getBufPageSize(pHandle->pBuf)) {
    qError("sort failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_FAILED;
  }

  int32_t code = blockDataToBuf(pPage, blk);

@@ -1863,12 +1885,18 @@ static int32_t getPageBufIncForRowIdSort(SSDataBlock* pDstBlock, int32_t srcRowI
  int32_t numOfCols = blockDataGetNumOfCols(pDstBlock);

  if (pPkCol == NULL) { // no var column
    ASSERT((numOfCols == 4) && (!pDstBlock->info.hasVarCol));
    if (!((numOfCols == 4) && (!pDstBlock->info.hasVarCol))) {
      qError("sort failed at: %s:%d", __func__, __LINE__);
      return TSDB_CODE_FAILED;
    }

    size += numOfCols * ((dstRowIndex & 0x7) == 0 ? 1: 0);
    size += blockDataGetRowSize(pDstBlock);
  } else {
    ASSERT(numOfCols == 5);
    if (numOfCols != 5) {
      qError("sort failed at: %s:%d", __func__, __LINE__);
      return TSDB_CODE_FAILED;
    }

    size += (numOfCols - 1) * (((dstRowIndex & 0x7) == 0)? 1:0);
    for(int32_t i = 0; i < numOfCols - 1; ++i) {

@@ -2018,7 +2046,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*

  SArray* aPgId = taosArrayInit(8, sizeof(int32_t));
  if (aPgId == NULL) {
    return terrno;
    goto _error;
  }

  int32_t nRows = 0;

@@ -2039,10 +2067,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
      lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
      code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
      if (code != TSDB_CODE_SUCCESS) {
        taosMemoryFree(pTree);
        taosArrayDestroy(aPgId);
        cleanupMergeSup(&sup);
        return code;
        goto _error;
      }

      nMergedRows += pHandle->pDataBlock->info.rows;

@@ -2064,7 +2089,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*

    code = blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1);
    if (code) {
      return code;
      goto _error;
    }

    if (pHandle->bSortByRowId) {

@@ -2074,11 +2099,14 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
    }

    if (code) {
      return code;
      goto _error;
    }

    blkPgSz += bufInc;
    ASSERT(blkPgSz == blockDataGetSize(pHandle->pDataBlock) + pageHeaderSize);
    if (blkPgSz != blockDataGetSize(pHandle->pDataBlock) + pageHeaderSize) {
      qError("sort failed at: %s:%d", __func__, __LINE__);
      goto _error;
    }

    ++nRows;

@@ -2087,7 +2115,10 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
    } else {
      ++sup.aRowIdx[minIdx];
    }
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
    code = tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
    if (TSDB_CODE_SUCCESS != code) {
      goto _error;
    }
  }

  if (pHandle->pDataBlock->info.rows > 0) {

@@ -2096,10 +2127,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
      lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
      code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(aPgId);
        taosMemoryFree(pTree);
        cleanupMergeSup(&sup);
        return code;
        goto _error;
      }
      nMergedRows += pHandle->pDataBlock->info.rows;
      if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {

@@ -2115,11 +2143,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*

  SSDataBlock* pMemSrcBlk = NULL;
  code = createOneDataBlock(pHandle->pDataBlock, false, &pMemSrcBlk);
  if (code) {
    cleanupMergeSup(&sup);
    tMergeTreeDestroy(&pTree);
    return code;
  }
  if (code) goto _error;

  code = doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);

@@ -2127,6 +2151,12 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
  tMergeTreeDestroy(&pTree);

  return code;

_error:
  tMergeTreeDestroy(&pTree);
  cleanupMergeSup(&sup);
  if (aPgId) taosArrayDestroy(aPgId);
  return code;
}

static int32_t getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSHashObj* mTableNumRows, SSDataBlock* pOrigBlk,

@@ -2502,7 +2532,10 @@ static int32_t tsortOpenForBufMergeSort(SSortHandle* pHandle) {

  int32_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
  if (pHandle->pBuf != NULL) {
    ASSERT(numOfSources <= getNumOfInMemBufPages(pHandle->pBuf));
    if (numOfSources > getNumOfInMemBufPages(pHandle->pBuf)) {
      qError("sort failed at: %s:%d", __func__, __LINE__);
      return TSDB_CODE_FAILED;
    }
  }

  if (numOfSources == 0) {

@@ -2596,7 +2629,10 @@ static int32_t tsortBufMergeSortNextTuple(SSortHandle* pHandle, STupleHandle** p
  index = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
  pSource = pHandle->cmpParam.pSources[index];

  ASSERT(pSource->src.pBlock != NULL);
  if (pSource->src.pBlock == NULL) {
    qError("sort failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_FAILED;
  }

  pHandle->tupleHandle.rowIndex = pSource->src.rowIndex;
  pHandle->tupleHandle.pBlock = pSource->src.pBlock;

@@ -4014,7 +4014,6 @@ static int32_t setTableTsmas(STranslateContext* pCxt, SName* pName, SRealTableNo
      if (TSDB_CODE_SUCCESS == code)
        code = catalogGetCachedTableHashVgroup(pCxt->pParseCxt->pCatalog, &tsmaTargetTbName, &vgInfo, &exists);
      if (TSDB_CODE_SUCCESS == code) {
        ASSERT(exists);
        if (!pRealTable->tsmaTargetTbVgInfo) {
          pRealTable->tsmaTargetTbVgInfo = taosArrayInit(pRealTable->pTsmas->size, POINTER_BYTES);
          if (!pRealTable->tsmaTargetTbVgInfo) {

@@ -7320,7 +7319,10 @@ static int32_t checkDbRetentionsOption(STranslateContext* pCxt, SNodeList* pRete
    SValueNode* pFreq = (SValueNode*)nodesListGetNode(((SNodeListNode*)pRetention)->pNodeList, 0);
    SValueNode* pKeep = (SValueNode*)nodesListGetNode(((SNodeListNode*)pRetention)->pNodeList, 1);

    ASSERTS(IS_DURATION_VAL(pFreq->flag) && IS_DURATION_VAL(pKeep->flag), "Retentions freq/keep should have unit");
    if (!IS_DURATION_VAL(pFreq->flag) || !IS_DURATION_VAL(pKeep->flag)) {
      return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
                                     "Retentions freq/keep should have unit");
    }

    // check unit
    if (IS_DURATION_VAL(pFreq->flag) && TIME_UNIT_SECOND != pFreq->unit && TIME_UNIT_MINUTE != pFreq->unit &&

@@ -9427,7 +9429,9 @@ static int32_t buildCreateTagIndexReq(STranslateContext* pCxt, SCreateIndexStmt*
  (void)tNameGetFullDbName(&name, pReq->dbFName);

  SNode* pNode = NULL;
  ASSERT(LIST_LENGTH(pStmt->pCols) == 1);
  if (LIST_LENGTH(pStmt->pCols) != 1) {
    return TSDB_CODE_PAR_INVALID_TAGS_NUM;
  }
  FOREACH(pNode, pStmt->pCols) {
    SColumnNode* p = (SColumnNode*)pNode;
    memcpy(pReq->colName, p->colName, sizeof(p->colName));

@@ -1281,8 +1281,10 @@ int32_t getTsmaFromCache(SParseMetaCache* pMetaCache, const SName* pTsmaName, ST
  }
  STableTSMAInfoRsp* pTsmaRsp = NULL;
  code = getMetaDataFromHash(tsmaFName, strlen(tsmaFName), pMetaCache->pTSMAs, (void**)&pTsmaRsp);
  if (TSDB_CODE_SUCCESS == code && pTsmaRsp) {
    ASSERT(pTsmaRsp->pTsmas->size == 1);
  if (TSDB_CODE_SUCCESS == code) {
    if (!pTsmaRsp || pTsmaRsp->pTsmas->size != 1) {
      return TSDB_CODE_PAR_INTERNAL_ERROR;
    }
    *pTsma = taosArrayGetP(pTsmaRsp->pTsmas, 0);
  } else if (code == TSDB_CODE_PAR_INTERNAL_ERROR){
    code = TSDB_CODE_MND_SMA_NOT_EXIST;

@@ -6641,7 +6641,6 @@ static int32_t fillTSMAOptCtx(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pScan) {
    pTsmaOptCtx->pAggFuncs = pWindow->pFuncs;
    pTsmaOptCtx->ppParentTsmaSubplans = &pWindow->pTsmaSubplans;
  } else {
    ASSERT(nodeType(pTsmaOptCtx->pParent) == QUERY_NODE_LOGIC_PLAN_AGG);
    SAggLogicNode* pAgg = (SAggLogicNode*)pTsmaOptCtx->pParent;
    pTsmaOptCtx->pAggFuncs = pAgg->pAggFuncs;
    pTsmaOptCtx->ppParentTsmaSubplans = &pAgg->pTsmaSubplans;

@@ -6939,8 +6938,9 @@ static int32_t tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow*
}

int32_t tsmaOptCreateTsmaScanCols(const STSMAOptUsefulTsma* pTsma, const SNodeList* pAggFuncs, SNodeList** ppList) {
  ASSERT(pTsma->pTsma);
  ASSERT(pTsma->pTsmaScanCols);
  if (!pTsma->pTsma || !pTsma->pTsmaScanCols) {
    return TSDB_CODE_PLAN_INTERNAL_ERROR;
  }
  int32_t code;
  SNode* pNode;
  SNodeList* pScanCols = NULL;

@@ -6996,8 +6996,7 @@ static int32_t tsmaOptRewriteTag(const STSMAOptCtx* pTsmaOptCtx, const STSMAOptU
      break;
    }
  }
  ASSERT(found);
  return 0;
  return found ? TSDB_CODE_SUCCESS : TSDB_CODE_PLAN_INTERNAL_ERROR;
}

static int32_t tsmaOptRewriteTbname(const STSMAOptCtx* pTsmaOptCtx, SNode** pTbNameNode,

@@ -7116,7 +7115,6 @@ static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pNew
  SColumnNode* pPkTsCol = NULL;
  FOREACH(pNode, pNewScan->pScanCols) {
    SColumnNode* pCol = (SColumnNode*)pNode;
    ASSERT(pTsma->pTsmaScanCols);
    if (pCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
      pPkTsCol = NULL;
      code = nodesCloneNode((SNode*)pCol, (SNode**)&pPkTsCol);

@@ -7302,7 +7300,6 @@ static int32_t tsmaOptGeneratePlan(STSMAOptCtx* pTsmaOptCtx) {
    for (int32_t j = 0; j < pTsmaOptCtx->pScan->pTsmas->size; ++j) {
      if (taosArrayGetP(pTsmaOptCtx->pScan->pTsmas, j) == pTsma->pTsma) {
        const STsmaTargetTbInfo* ptbInfo = taosArrayGet(pTsmaOptCtx->pScan->pTsmaTargetTbInfo, j);
        ASSERT(ptbInfo->uid != 0);
        strcpy(pTsma->targetTbName, ptbInfo->tableName);
        pTsma->targetTbUid = ptbInfo->uid;
      }

@@ -19,8 +19,11 @@
#include "tlog.h"

// Set the initial value of the multiway merge tree.
static void tMergeTreeInit(SMultiwayMergeTreeInfo* pTree) {
  ASSERT((pTree->totalSources & 0x01) == 0 && (pTree->numOfSources << 1 == pTree->totalSources));
static int32_t tMergeTreeInit(SMultiwayMergeTreeInfo* pTree) {
  if (!((pTree->totalSources & 0x01) == 0 && (pTree->numOfSources << 1 == pTree->totalSources))) {
    uError("losertree failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_INVALID_PARA;
  }

  for (int32_t i = 0; i < pTree->totalSources; ++i) {
    if (i < pTree->numOfSources) {

@@ -29,6 +32,7 @@ static void tMergeTreeInit(SMultiwayMergeTreeInfo* pTree) {
      pTree->pNode[i].index = i - pTree->numOfSources;
    }
  }
  return TSDB_CODE_SUCCESS;
}

int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo** pTree, uint32_t numOfSources, void* param,

@@ -50,7 +54,11 @@ int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo** pTree, uint32_t numOfSources,
  pTreeInfo->comparFn = compareFn;

  // set initial value for loser tree
  tMergeTreeInit(pTreeInfo);
  int32_t code = tMergeTreeInit(pTreeInfo);
  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(pTreeInfo);
    return code;
  }

#ifdef _DEBUG_VIEW
  printf("the initial value of loser tree:\n");

@@ -58,7 +66,11 @@ int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo** pTree, uint32_t numOfSources,
#endif

  for (int32_t i = totalEntries - 1; i >= numOfSources; i--) {
    tMergeTreeAdjust(pTreeInfo, i);
    code = tMergeTreeAdjust(pTreeInfo, i);
    if (TSDB_CODE_SUCCESS != code) {
      taosMemoryFree(pTreeInfo);
      return code;
    }
  }

#if defined(_DEBUG_VIEW)

@@ -79,13 +91,17 @@ void tMergeTreeDestroy(SMultiwayMergeTreeInfo** pTree) {
  taosMemoryFreeClear(*pTree);
}

void tMergeTreeAdjust(SMultiwayMergeTreeInfo* pTree, int32_t idx) {
  ASSERT(idx <= pTree->totalSources - 1 && idx >= pTree->numOfSources && pTree->totalSources >= 2);
int32_t tMergeTreeAdjust(SMultiwayMergeTreeInfo* pTree, int32_t idx) {
  int32_t code = 0;
  if (!(idx <= pTree->totalSources - 1 && idx >= pTree->numOfSources && pTree->totalSources >= 2)) {
    uError("losertree failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_INVALID_PARA;
  }

  if (pTree->totalSources == 2) {
    pTree->pNode[0].index = 0;
    pTree->pNode[1].index = 0;
    return;
    return code;
  }

  int32_t parentId = idx >> 1;

@@ -95,7 +111,7 @@ void tMergeTreeAdjust(SMultiwayMergeTreeInfo* pTree, int32_t idx) {
    STreeNode* pCur = &pTree->pNode[parentId];
    if (pCur->index == -1) {
      pTree->pNode[parentId] = kLeaf;
      return;
      return code;
    }

    int32_t ret = pTree->comparFn(pCur, &kLeaf, pTree->param);

@@ -112,13 +128,21 @@ void tMergeTreeAdjust(SMultiwayMergeTreeInfo* pTree, int32_t idx) {
    // winner cannot be identical to the loser, which is pTreeNode[1]
    pTree->pNode[0] = kLeaf;
  }
  return code;
}

void tMergeTreeRebuild(SMultiwayMergeTreeInfo* pTree) {
  tMergeTreeInit(pTree);
  for (int32_t i = pTree->totalSources - 1; i >= pTree->numOfSources; i--) {
    tMergeTreeAdjust(pTree, i);
int32_t tMergeTreeRebuild(SMultiwayMergeTreeInfo* pTree) {
  int32_t code = tMergeTreeInit(pTree);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  for (int32_t i = pTree->totalSources - 1; i >= pTree->numOfSources; i--) {
    code = tMergeTreeAdjust(pTree, i);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

/*

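With tMergeTreeInit, tMergeTreeAdjust, and tMergeTreeRebuild now reporting failures through an int32_t return value instead of asserting, call sites shown above (initBlockIterator, adjustMergeTreeForNextTuple, sortBlocksToExtSource) check and propagate that code. The sketch below shows only the caller-side shape of this contract; MergeTree, merge_tree_adjust, and the CODE_* constants are hypothetical stand-ins, not the TDengine types or APIs.

#include <stdint.h>
#include <stdio.h>

#define CODE_SUCCESS 0
#define CODE_INVALID (-1)

typedef struct { int32_t totalSources; } MergeTree; /* stand-in type */

static int32_t merge_tree_adjust(MergeTree* tree, int32_t idx) {
  if (tree == NULL || idx < 0 || idx >= tree->totalSources) {
    fprintf(stderr, "losertree failed at: %s:%d\n", __func__, __LINE__);
    return CODE_INVALID; /* previously an assert with no return value */
  }
  /* ... adjust the tree ... */
  return CODE_SUCCESS;
}

int main(void) {
  MergeTree tree = { .totalSources = 4 };
  int32_t   code = merge_tree_adjust(&tree, 2);
  if (code != CODE_SUCCESS) {
    return 1; /* propagate instead of crashing on a failed assert */
  }
  return 0;
}
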
@@ -313,7 +313,6 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
static char* evictBufPage(SDiskbasedBuf* pBuf) {
  SListNode* pn = getEldestUnrefedPage(pBuf);
  if (pn == NULL) { // no available buffer pages now, return.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
