Merge pull request #19172 from taosdata/feature/3_liaohj

refactor: do some internal refactor.
This commit is contained in:
Shengliang Guan 2022-12-27 16:07:23 +08:00 committed by GitHub
commit c844922aef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 124 additions and 238 deletions

View File

@ -57,7 +57,6 @@ struct SMetaCache {
TdThreadMutex lock; TdThreadMutex lock;
SHashObj* pTableEntry; SHashObj* pTableEntry;
SLRUCache* pUidResCache; SLRUCache* pUidResCache;
uint64_t keyBuf[3];
} sTagFilterResCache; } sTagFilterResCache;
}; };
@ -429,20 +428,20 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK
bool* acquireRes) { bool* acquireRes) {
// generate the composed key for LRU cache // generate the composed key for LRU cache
SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache; SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf;
SHashObj* pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry; SHashObj* pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock; TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
uint64_t buf[3] = {0};
uint32_t times = 0; uint32_t times = 0;
*acquireRes = 0; *acquireRes = 0;
pBuf[0] = suid; buf[0] = suid;
memcpy(&pBuf[1], pKey, keyLen); memcpy(&buf[1], pKey, keyLen);
taosThreadMutexLock(pLock); taosThreadMutexLock(pLock);
int32_t len = keyLen + sizeof(uint64_t); int32_t len = keyLen + sizeof(uint64_t);
LRUHandle* pHandle = taosLRUCacheLookup(pCache, pBuf, len); LRUHandle* pHandle = taosLRUCacheLookup(pCache, buf, len);
if (pHandle == NULL) { if (pHandle == NULL) {
taosThreadMutexUnlock(pLock); taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -476,10 +475,10 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK
SListNode* pNode = NULL; SListNode* pNode = NULL;
while ((pNode = tdListNext(&iter)) != NULL) { while ((pNode = tdListNext(&iter)) != NULL) {
memcpy(&pBuf[1], pNode->data, keyLen); memcpy(&buf[1], pNode->data, keyLen);
// check whether it is existed in LRU cache, and remove it from linked list if not. // check whether it is existed in LRU cache, and remove it from linked list if not.
LRUHandle* pRes = taosLRUCacheLookup(pCache, pBuf, len); LRUHandle* pRes = taosLRUCacheLookup(pCache, buf, len);
if (pRes == NULL) { // remove the item in the linked list if (pRes == NULL) { // remove the item in the linked list
taosArrayPush(pInvalidRes, &pNode); taosArrayPush(pInvalidRes, &pNode);
} else { } else {
@ -547,14 +546,14 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
tdListAppend(&(*pEntry)->list, pKey); tdListAppend(&(*pEntry)->list, pKey);
} }
uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf; uint64_t buf[3] = {0};
pBuf[0] = suid; buf[0] = suid;
memcpy(&pBuf[1], pKey, keyLen); memcpy(&buf[1], pKey, keyLen);
ASSERT(sizeof(uint64_t) + keyLen == 24); ASSERT(sizeof(uint64_t) + keyLen == 24);
// add to cache. // add to cache.
taosLRUCacheInsert(pCache, pBuf, sizeof(uint64_t) + keyLen, pPayload, payloadLen, freePayload, NULL, taosLRUCacheInsert(pCache, buf, sizeof(uint64_t) + keyLen, pPayload, payloadLen, freePayload, NULL,
TAOS_LRU_PRIORITY_LOW); TAOS_LRU_PRIORITY_LOW);
taosThreadMutexUnlock(pLock); taosThreadMutexUnlock(pLock);

View File

@ -567,7 +567,6 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
pMTree->pLoadInfo = pBlockLoadInfo; pMTree->pLoadInfo = pBlockLoadInfo;
pMTree->destroyLoadInfo = destroyLoadInfo; pMTree->destroyLoadInfo = destroyLoadInfo;
ASSERT(pMTree->pLoadInfo != NULL);
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
struct SLDataIter *pIter = NULL; struct SLDataIter *pIter = NULL;

View File

@ -243,7 +243,7 @@ static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pC
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) { static int32_t updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) {
int32_t i = 0, j = 0; int32_t i = 0, j = 0;
while(i < pSchema->numOfCols && j < pSupInfo->numOfCols) { while(i < pSchema->numOfCols && j < pSupInfo->numOfCols) {
@ -251,7 +251,7 @@ static void updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo)
if (pTCol->colId == pSupInfo->colId[j]) { if (pTCol->colId == pSupInfo->colId[j]) {
if (!IS_BSMA_ON(pTCol)) { if (!IS_BSMA_ON(pTCol)) {
pSupInfo->smaValid = false; pSupInfo->smaValid = false;
return; return TSDB_CODE_SUCCESS;
} }
i += 1; i += 1;
@ -260,9 +260,11 @@ static void updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo)
// do nothing // do nothing
i += 1; i += 1;
} else { } else {
ASSERT(0); return TSDB_CODE_INVALID_PARA;
} }
} }
return TSDB_CODE_SUCCESS;
} }
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) { static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
@ -579,7 +581,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
} }
if (VND_IS_TSMA(pVnode)) { if (VND_IS_TSMA(pVnode)) {
tsdbDebug("vgId:%d, tsma is selected to query", TD_VID(pVnode)); tsdbDebug("vgId:%d, tsma is selected to query, %s", TD_VID(pVnode), idstr);
} }
initReaderStatus(&pReader->status); initReaderStatus(&pReader->status);
@ -594,7 +596,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
pReader->type = pCond->type; pReader->type = pCond->type;
pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows); pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket
ASSERT(pCond->numOfCols > 0);
if (pReader->pResBlock == NULL) { if (pReader->pResBlock == NULL) {
pReader->freeBlock = true; pReader->freeBlock = true;
@ -605,6 +606,12 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
} }
} }
if (pCond->numOfCols <= 0) {
tsdbError("vgId:%d, invalid column number %d in query cond, %s", TD_VID(pVnode), pCond->numOfCols, idstr);
code = TSDB_CODE_INVALID_PARA;
goto _end;
}
// todo refactor. // todo refactor.
limitOutputBufferSize(pCond, &pReader->capacity); limitOutputBufferSize(pCond, &pReader->capacity);
@ -794,8 +801,9 @@ static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_
} }
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) { static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
if (taosArrayGetSize(pBlockIter->blockList) == 0) { size_t num = taosArrayGetSize(pBlockIter->blockList);
ASSERT(pBlockIter->numOfBlocks == taosArrayGetSize(pBlockIter->blockList)); if (num == 0) {
ASSERT(pBlockIter->numOfBlocks == num);
return NULL; return NULL;
} }
@ -805,73 +813,6 @@ static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) { return &pBlockIter->block; } static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) { return &pBlockIter->block; }
int32_t binarySearchForTs(char* pValue, int num, TSKEY key, int order) {
int32_t midPos = -1;
int32_t numOfRows;
ASSERT(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
TSKEY* keyList = (TSKEY*)pValue;
int32_t firstPos = 0;
int32_t lastPos = num - 1;
if (order == TSDB_ORDER_DESC) {
// find the first position which is smaller than the key
while (1) {
if (key >= keyList[firstPos]) return firstPos;
if (key == keyList[lastPos]) return lastPos;
if (key < keyList[lastPos]) {
lastPos += 1;
if (lastPos >= num) {
return -1;
} else {
return lastPos;
}
}
numOfRows = lastPos - firstPos + 1;
midPos = (numOfRows >> 1) + firstPos;
if (key < keyList[midPos]) {
firstPos = midPos + 1;
} else if (key > keyList[midPos]) {
lastPos = midPos - 1;
} else {
break;
}
}
} else {
// find the first position which is bigger than the key
while (1) {
if (key <= keyList[firstPos]) return firstPos;
if (key == keyList[lastPos]) return lastPos;
if (key > keyList[lastPos]) {
lastPos = lastPos + 1;
if (lastPos >= num)
return -1;
else
return lastPos;
}
numOfRows = lastPos - firstPos + 1;
midPos = (numOfRows >> 1u) + firstPos;
if (key < keyList[midPos]) {
lastPos = midPos - 1;
} else if (key > keyList[midPos]) {
firstPos = midPos + 1;
} else {
break;
}
}
}
return midPos;
}
static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) { static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) {
// start end position // start end position
int s, e; int s, e;
@ -972,8 +913,8 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo
int32_t step = asc? 1:-1; int32_t step = asc? 1:-1;
// make sure it is aligned to 8bit // make sure it is aligned to 8bit, the allocated memory address is aligned to 256bit
ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0); // ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0);
// 1. copy data in a batch model // 1. copy data in a batch model
memcpy(pColData->pData, p, dumpedRows * tDataTypes[pData->type].bytes); memcpy(pColData->pData, p, dumpedRows * tDataTypes[pData->type].bytes);
@ -1072,11 +1013,20 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
// pDumpInfo->rowIndex = 0; // pDumpInfo->rowIndex = 0;
} else if (!asc && pReader->window.ekey >= pBlock->maxKey.ts) { } else if (!asc && pReader->window.ekey >= pBlock->maxKey.ts) {
// pDumpInfo->rowIndex = pBlock->nRow - 1; // pDumpInfo->rowIndex = pBlock->nRow - 1;
} else { } else { // find the appropriate the start position in current block, and set it to be the current rowIndex
int32_t pos = asc ? pBlock->nRow - 1 : 0; int32_t pos = asc ? pBlock->nRow - 1 : 0;
int32_t order = asc ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; int32_t order = asc ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
int64_t key = asc ? pReader->window.skey : pReader->window.ekey; int64_t key = asc ? pReader->window.skey : pReader->window.ekey;
pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, key, order); pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, key, order);
if (pDumpInfo->rowIndex < 0) {
tsdbError(
"%p failed to locate the start position in current block, global index:%d, table index:%d, brange:%" PRId64
"-%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64 " %s",
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->minVer,
pBlock->maxVer, pReader->idStr);
return TSDB_CODE_INVALID_PARA;
}
} }
} }
@ -1183,7 +1133,6 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
ASSERT(pBlockInfo != NULL);
SDataBlk* pBlock = getCurrentBlock(pBlockIter); SDataBlk* pBlock = getCurrentBlock(pBlockIter);
code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData); code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
@ -1221,8 +1170,6 @@ static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
} }
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) { static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
ASSERT(numOfTables >= 1);
pSup->numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables); pSup->numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
pSup->indexPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables); pSup->indexPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
pSup->pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables); pSup->pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);
@ -1329,7 +1276,10 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
sup.numOfTables += 1; sup.numOfTables += 1;
} }
ASSERT(numOfBlocks == cnt); if (numOfBlocks != cnt && sup.numOfTables != numOfTables) {
cleanupBlockOrderSupporter(&sup);
return TSDB_CODE_INVALID_PARA;
}
// since there is only one table qualified, blocks are not sorted // since there is only one table qualified, blocks are not sorted
if (sup.numOfTables == 1) { if (sup.numOfTables == 1) {
@ -1351,10 +1301,9 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables, tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
pReader->idStr); pReader->idStr);
ASSERT(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);
SMultiwayMergeTreeInfo* pTree = NULL; SMultiwayMergeTreeInfo* pTree = NULL;
uint8_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
uint8_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
cleanupBlockOrderSupporter(&sup); cleanupBlockOrderSupporter(&sup);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -1432,8 +1381,6 @@ static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBl
} }
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) { static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
ASSERT(pBlockIter != NULL && pFBlockInfo != NULL);
int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;
int32_t index = pBlockIter->index; int32_t index = pBlockIter->index;
@ -1924,7 +1871,6 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
} }
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
ASSERT(mergeBlockData);
// merge with block data if ts == key // merge with block data if ts == key
if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) { if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
@ -1990,7 +1936,6 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
tRowMergerClear(&merge); tRowMergerClear(&merge);
return code; return code;
} else { } else {
ASSERT(0);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} else { // desc order } else { // desc order
@ -2011,7 +1956,6 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader); TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader); TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
ASSERT(pRow != NULL && piRow != NULL);
int64_t tsLast = INT64_MIN; int64_t tsLast = INT64_MIN;
if (hasDataInLastBlock(pLastBlockReader)) { if (hasDataInLastBlock(pLastBlockReader)) {
@ -2235,7 +2179,6 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
if (pReader->pReadSnap->pMem != NULL) { if (pReader->pReadSnap->pMem != NULL) {
d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid); d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
if (d != NULL) { if (d != NULL) {
ASSERT(pBlockScanInfo->iter.iter == NULL);
code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter); code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL); pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);
@ -2349,10 +2292,9 @@ static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; } static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; }
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) { bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
if (pBlockData->nRow > 0) { if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) {
ASSERT(pBlockData->nRow == pDumpInfo->totalRows); return false; // this is an invalid result.
} }
return pBlockData->nRow > 0 && (!pDumpInfo->allDumped); return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
} }
@ -2583,7 +2525,6 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
int32_t code = 0; int32_t code = 0;
SArray* pDelData = taosArrayInit(4, sizeof(SDelData)); SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
ASSERT(pReader->pReadSnap != NULL);
SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile; SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
if (pDelFile && taosArrayGetSize(pReader->pDelIdx) > 0) { if (pDelFile && taosArrayGetSize(pReader->pDelIdx) > 0) {
@ -2868,7 +2809,6 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader); TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);
if (pBlockInfo == NULL) { // build data block from last data file if (pBlockInfo == NULL) { // build data block from last data file
ASSERT(pBlockIter->numOfBlocks == 0);
code = buildComposedDataBlock(pReader); code = buildComposedDataBlock(pReader);
} else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) { } else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid); code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
@ -3837,15 +3777,16 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
} }
if (pReader->pSchema != NULL) { if (pReader->pSchema != NULL) {
updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo); code = updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
} }
STsdbReader* p = (pReader->innerReader[0] != NULL) ? pReader->innerReader[0] : pReader; STsdbReader* p = (pReader->innerReader[0] != NULL) ? pReader->innerReader[0] : pReader;
pReader->status.pTableMap = createDataBlockScanInfo(p, &pReader->blockInfoBuf, pTableList, numOfTables); pReader->status.pTableMap = createDataBlockScanInfo(p, &pReader->blockInfoBuf, pTableList, numOfTables);
if (pReader->status.pTableMap == NULL) { if (pReader->status.pTableMap == NULL) {
tsdbReaderClose(p);
*ppReader = NULL; *ppReader = NULL;
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
@ -4113,25 +4054,27 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
} }
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave) { int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave) {
SColumnDataAgg*** pBlockSMA = &pDataBlock->pBlockAgg;
int32_t code = 0; int32_t code = 0;
SColumnDataAgg ***pBlockSMA = &pDataBlock->pBlockAgg;
*allHave = false; *allHave = false;
*pBlockSMA = NULL;
if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) { if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
*pBlockSMA = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// there is no statistics data for composed block // there is no statistics data for composed block
if (pReader->status.composedDataBlock || (!pReader->suppInfo.smaValid)) { if (pReader->status.composedDataBlock || (!pReader->suppInfo.smaValid)) {
*pBlockSMA = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
SBlockLoadSuppInfo* pSup = &pReader->suppInfo; SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
ASSERT(pReader->pResBlock->info.id.uid == pFBlock->uid); if (pReader->pResBlock->info.id.uid != pFBlock->uid) {
return TSDB_CODE_SUCCESS;
}
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
if (tDataBlkHasSma(pBlock)) { if (tDataBlkHasSma(pBlock)) {
@ -4159,13 +4102,10 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock,
// update the number of NULL data rows // update the number of NULL data rows
size_t numOfCols = pSup->numOfCols; size_t numOfCols = pSup->numOfCols;
int32_t i = 0, j = 0;
size_t size = taosArrayGetSize(pSup->pColAgg);
// ensure capacity // ensure capacity
if(pDataBlock->pDataBlock) { if (pDataBlock->pDataBlock) {
size_t colsNum = taosArrayGetSize(pDataBlock->pDataBlock); size_t colsNum = taosArrayGetSize(pDataBlock->pDataBlock);
taosArrayEnsureCap(pSup->pColAgg, colsNum); taosArrayEnsureCap(pSup->pColAgg, colsNum);
} }
SSDataBlock* pResBlock = pReader->pResBlock; SSDataBlock* pResBlock = pReader->pResBlock;
@ -4176,8 +4116,9 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock,
// do fill all null column value SMA info // do fill all null column value SMA info
doFillNullColSMA(pSup, pBlock->nRow, numOfCols, pTsAgg); doFillNullColSMA(pSup, pBlock->nRow, numOfCols, pTsAgg);
size_t size = taosArrayGetSize(pSup->pColAgg);
i = 0, j = 0; int32_t i = 0, j = 0;
while (j < numOfCols && i < size) { while (j < numOfCols && i < size) {
SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i); SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
if (pAgg->colId == pSup->colId[j]) { if (pAgg->colId == pSup->colId[j]) {
@ -4187,7 +4128,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock,
} else if (pAgg->colId < pSup->colId[j]) { } else if (pAgg->colId < pSup->colId[j]) {
i += 1; i += 1;
} else if (pSup->colId[j] < pAgg->colId) { } else if (pSup->colId[j] < pAgg->colId) {
ASSERT(pSup->colId[j] == PRIMARYKEY_TIMESTAMP_COL_ID); // ASSERT(pSup->colId[j] == PRIMARYKEY_TIMESTAMP_COL_ID);
pResBlock->pBlockAgg[pSup->slotId[j]] = &pSup->tsColAgg; pResBlock->pBlockAgg[pSup->slotId[j]] = &pSup->tsColAgg;
j += 1; j += 1;
} }
@ -4420,9 +4361,12 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
return terrno; return terrno;
} }
sversion = mr.me.stbEntry.schemaRow.version; sversion = mr.me.stbEntry.schemaRow.version;
} else { } else if (mr.me.type == TSDB_NORMAL_TABLE) {
ASSERT(mr.me.type == TSDB_NORMAL_TABLE);
sversion = mr.me.ntbEntry.schemaRow.version; sversion = mr.me.ntbEntry.schemaRow.version;
} else {
terrno = TSDB_CODE_INVALID_PARA;
metaReaderClear(&mr);
return terrno;
} }
metaReaderClear(&mr); metaReaderClear(&mr);

View File

@ -62,8 +62,8 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp
pEntry->numOfCols = taosArrayGetSize(pInput->pData->pDataBlock); pEntry->numOfCols = taosArrayGetSize(pInput->pData->pDataBlock);
pEntry->dataLen = sizeof(SDeleterRes); pEntry->dataLen = sizeof(SDeleterRes);
ASSERT(1 == pEntry->numOfRows); // ASSERT(1 == pEntry->numOfRows);
ASSERT(3 == pEntry->numOfCols); // ASSERT(3 == pEntry->numOfCols);
pBuf->useSize = sizeof(SDataCacheEntry); pBuf->useSize = sizeof(SDataCacheEntry);
@ -167,7 +167,6 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE
SDataDeleterBuf* pBuf = NULL; SDataDeleterBuf* pBuf = NULL;
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf); taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
ASSERT(NULL != pBuf);
memcpy(&pDeleter->nextOutput, pBuf, sizeof(SDataDeleterBuf)); memcpy(&pDeleter->nextOutput, pBuf, sizeof(SDataDeleterBuf));
taosFreeQitem(pBuf); taosFreeQitem(pBuf);

View File

@ -77,8 +77,8 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
pBuf->useSize = sizeof(SDataCacheEntry); pBuf->useSize = sizeof(SDataCacheEntry);
pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols); pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols);
ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8)); // ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8));
ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4)); // ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));
pBuf->useSize += pEntry->dataLen; pBuf->useSize += pEntry->dataLen;
@ -162,15 +162,14 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE
SDataDispatchBuf* pBuf = NULL; SDataDispatchBuf* pBuf = NULL;
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
ASSERT(NULL != pBuf);
memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf)); memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
taosFreeQitem(pBuf); taosFreeQitem(pBuf);
SDataCacheEntry* pEntry = (SDataCacheEntry*)pDispatcher->nextOutput.pData; SDataCacheEntry* pEntry = (SDataCacheEntry*)pDispatcher->nextOutput.pData;
*pLen = pEntry->dataLen; *pLen = pEntry->dataLen;
ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8)); // ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8));
ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4)); // ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));
*pQueryEnd = pDispatcher->queryEnd; *pQueryEnd = pDispatcher->queryEnd;
qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, qDebug("got data len %" PRId64 ", row num %d in sink", *pLen,
@ -193,8 +192,8 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
pOutput->numOfCols = pEntry->numOfCols; pOutput->numOfCols = pEntry->numOfCols;
pOutput->compressed = pEntry->compressed; pOutput->compressed = pEntry->compressed;
ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8)); // ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8));
ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4)); // ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));
atomic_sub_fetch_64(&pDispatcher->cachedSize, pEntry->dataLen); atomic_sub_fetch_64(&pDispatcher->cachedSize, pEntry->dataLen);
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen); atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);

View File

@ -373,7 +373,6 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
pRsp->useconds = htobe64(pRsp->useconds); pRsp->useconds = htobe64(pRsp->useconds);
pRsp->numOfBlocks = htonl(pRsp->numOfBlocks); pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
ASSERT(pRsp != NULL);
qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks, qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks,
pRsp->numOfRows, pExchangeInfo); pRsp->numOfRows, pExchangeInfo);
} else { } else {

View File

@ -104,8 +104,6 @@ static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock,
void setOperatorCompleted(SOperatorInfo* pOperator) { void setOperatorCompleted(SOperatorInfo* pOperator) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
ASSERT(pOperator->pTaskInfo != NULL);
pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start) / 1000.0; pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start) / 1000.0;
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
} }
@ -524,7 +522,7 @@ bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
return true; return true;
} }
static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type, static int32_t doCreateConstantValColumnSMAInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type,
int32_t paramIndex, int32_t numOfRows) { int32_t paramIndex, int32_t numOfRows) {
if (pInput->pData[paramIndex] == NULL) { if (pInput->pData[paramIndex] == NULL) {
pInput->pData[paramIndex] = taosMemoryCalloc(1, sizeof(SColumnInfoData)); pInput->pData[paramIndex] = taosMemoryCalloc(1, sizeof(SColumnInfoData));
@ -548,8 +546,6 @@ static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SF
da = pInput->pColumnDataAgg[paramIndex]; da = pInput->pColumnDataAgg[paramIndex];
} }
ASSERT(!IS_VAR_DATA_TYPE(type));
if (type == TSDB_DATA_TYPE_BIGINT) { if (type == TSDB_DATA_TYPE_BIGINT) {
int64_t v = pFuncParam->param.i; int64_t v = pFuncParam->param.i;
*da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows}; *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
@ -570,7 +566,7 @@ static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SF
} else if (type == TSDB_DATA_TYPE_TIMESTAMP) { } else if (type == TSDB_DATA_TYPE_TIMESTAMP) {
// do nothing // do nothing
} else { } else {
ASSERT(0); qError("invalid constant type for sma info");
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -600,7 +596,7 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB
// the data in the corresponding SColumnInfoData will not be used. // the data in the corresponding SColumnInfoData will not be used.
pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId); pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
doCreateConstantValColumnAggInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows); doCreateConstantValColumnSMAInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows);
} }
} }
} else { } else {
@ -1577,8 +1573,7 @@ void destroyOperatorInfo(SOperatorInfo* pOperator) {
// each operator should be set their own function to return total cost buffer // each operator should be set their own function to return total cost buffer
int32_t optrDefaultBufFn(SOperatorInfo* pOperator) { int32_t optrDefaultBufFn(SOperatorInfo* pOperator) {
if (pOperator->blocking) { if (pOperator->blocking) {
ASSERT(0); return -1;
return 0;
} else { } else {
return 0; return 0;
} }
@ -2205,7 +2200,6 @@ static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanI
return extractTbscanInStreamOpTree(pOperator->pDownstream[0], ppInfo); return extractTbscanInStreamOpTree(pOperator->pDownstream[0], ppInfo);
} else { } else {
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
ASSERT(pInfo->pTableScanOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
*ppInfo = pInfo->pTableScanOp->info; *ppInfo = pInfo->pTableScanOp->info;
return 0; return 0;
} }
@ -2217,13 +2211,11 @@ int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
*ppNode = (STableScanPhysiNode*)pNode; *ppNode = (STableScanPhysiNode*)pNode;
return 0; return 0;
} else { } else {
ASSERT(0);
terrno = TSDB_CODE_APP_ERROR; terrno = TSDB_CODE_APP_ERROR;
return -1; return -1;
} }
} else { } else {
if (LIST_LENGTH(pNode->pChildren) != 1) { if (LIST_LENGTH(pNode->pChildren) != 1) {
ASSERT(0);
terrno = TSDB_CODE_APP_ERROR; terrno = TSDB_CODE_APP_ERROR;
return -1; return -1;
} }
@ -2233,32 +2225,6 @@ int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
return -1; return -1;
} }
#if 0
int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHandle, int64_t uid, int64_t ts) {
STableScanInfo* pTableScanInfo = NULL;
if (extractTbscanInStreamOpTree(pOperator, &pTableScanInfo) < 0) {
return -1;
}
STableScanPhysiNode* pNode = NULL;
if (extractTableScanNode(plan->pNode, &pNode) < 0) {
ASSERT(0);
}
tsdbReaderClose(pTableScanInfo->dataReader);
STableListInfo info = {0};
pTableScanInfo->dataReader = doCreateDataReader(pNode, pHandle, &info, NULL);
if (pTableScanInfo->dataReader == NULL) {
ASSERT(0);
qError("failed to create data reader");
return TSDB_CODE_APP_ERROR;
}
// TODO: set uid and ts to data reader
return 0;
}
#endif
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) { int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) {
SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo; SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo;

View File

@ -42,38 +42,40 @@ typedef struct SJoinOperatorInfo {
static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode); static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator); static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator);
static void destroyMergeJoinOperator(void* param); static void destroyMergeJoinOperator(void* param);
static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t numOfDownstream, static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t num,
SSortMergeJoinPhysiNode* pJoinNode); SSortMergeJoinPhysiNode* pJoinNode, const char* idStr);
static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t numOfDownstream, static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t num,
SSortMergeJoinPhysiNode* pJoinNode) { SSortMergeJoinPhysiNode* pJoinNode, const char* idStr) {
SNode* pMergeCondition = pJoinNode->pMergeCondition; SNode* pMergeCondition = pJoinNode->pMergeCondition;
if (nodeType(pMergeCondition) == QUERY_NODE_OPERATOR) { if (nodeType(pMergeCondition) != QUERY_NODE_OPERATOR) {
SOperatorNode* pNode = (SOperatorNode*)pMergeCondition; qError("not support this in join operator, %s", idStr);
SColumnNode* col1 = (SColumnNode*)pNode->pLeft; return; // do not handle this
SColumnNode* col2 = (SColumnNode*)pNode->pRight; }
SColumnNode* leftTsCol = NULL;
SColumnNode* rightTsCol = NULL; SOperatorNode* pNode = (SOperatorNode*)pMergeCondition;
if (col1->dataBlockId == col2->dataBlockId ) { SColumnNode* col1 = (SColumnNode*)pNode->pLeft;
SColumnNode* col2 = (SColumnNode*)pNode->pRight;
SColumnNode* leftTsCol = NULL;
SColumnNode* rightTsCol = NULL;
if (col1->dataBlockId == col2->dataBlockId) {
leftTsCol = col1;
rightTsCol = col2;
} else {
if (col1->dataBlockId == pDownstream[0]->resultDataBlockId) {
ASSERT(col2->dataBlockId == pDownstream[1]->resultDataBlockId);
leftTsCol = col1; leftTsCol = col1;
rightTsCol = col2; rightTsCol = col2;
} else { } else {
if (col1->dataBlockId == pDownstream[0]->resultDataBlockId) { ASSERT(col1->dataBlockId == pDownstream[1]->resultDataBlockId);
ASSERT(col2->dataBlockId == pDownstream[1]->resultDataBlockId); ASSERT(col2->dataBlockId == pDownstream[0]->resultDataBlockId);
leftTsCol = col1; leftTsCol = col2;
rightTsCol = col2; rightTsCol = col1;
} else {
ASSERT(col1->dataBlockId == pDownstream[1]->resultDataBlockId);
ASSERT(col2->dataBlockId == pDownstream[0]->resultDataBlockId);
leftTsCol = col2;
rightTsCol = col1;
}
} }
setJoinColumnInfo(&pInfo->leftCol, leftTsCol); }
setJoinColumnInfo(&pInfo->rightCol, rightTsCol); setJoinColumnInfo(&pInfo->leftCol, leftTsCol);
} else { setJoinColumnInfo(&pInfo->rightCol, rightTsCol);
ASSERT(false); }
}}
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
@ -97,7 +99,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
extractTimeCondition(pInfo, pDownstream, numOfDownstream, pJoinNode); extractTimeCondition(pInfo, pDownstream, numOfDownstream, pJoinNode, GET_TASKID(pTaskInfo));
if (pJoinNode->pOnConditions != NULL && pJoinNode->node.pConditions != NULL) { if (pJoinNode->pOnConditions != NULL && pJoinNode->node.pConditions != NULL) {
pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
@ -364,8 +366,6 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs
char* pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos); char* pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos);
*pRightTs = *(int64_t*)pRightVal; *pRightTs = *(int64_t*)pRightVal;
ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
ASSERT(pRightCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
return true; return true;
} }

View File

@ -232,30 +232,6 @@ static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock,
if (!allColumnsHaveAgg) { if (!allColumnsHaveAgg) {
return false; return false;
} }
#if 0
// if (allColumnsHaveAgg == true) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
// todo create this buffer during creating operator
if (pBlock->pBlockAgg == NULL) {
pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
if (pBlock->pBlockAgg == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
}
}
size_t num = taosArrayGetSize(pTableScanInfo->matchInfo.pList);
for (int32_t i = 0; i < num; ++i) {
SColMatchItem* pColMatchInfo = taosArrayGet(pTableScanInfo->matchInfo.pList, i);
if (!pColMatchInfo->needOutput) {
continue;
}
pBlock->pBlockAgg[pColMatchInfo->dstSlotId] = pColAgg[i];
}
#endif
return true; return true;
} }
@ -2206,6 +2182,7 @@ static void destroyStreamScanOperatorInfo(void* param) {
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
SArray* pColIds = NULL;
SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo)); SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
@ -2228,7 +2205,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
} }
int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList); int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
SArray* pColIds = taosArrayInit(numOfOutput, sizeof(int16_t)); pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i); SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i);
@ -2324,6 +2301,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->base.cond, sizeof(SQueryTableDataCond)); memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->base.cond, sizeof(SQueryTableDataCond));
} else { } else {
taosArrayDestroy(pColIds); taosArrayDestroy(pColIds);
pColIds = NULL;
} }
// create the pseduo columns info // create the pseduo columns info

View File

@ -46,13 +46,15 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc; SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc;
int32_t numOfCols = 0; int32_t numOfCols = 0;
SSDataBlock* pResBlock = createDataBlockFromDescNode(pDescNode); SExprInfo* pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols);
SExprInfo* pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols);
int32_t numOfOutputCols = 0; int32_t numOfOutputCols = 0;
int32_t code = int32_t code =
extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset);
initResultSizeInfo(&pOperator->resultInfo, 1024); initResultSizeInfo(&pOperator->resultInfo, 1024);
@ -61,7 +63,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
goto _error; goto _error;
} }
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys); pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo); initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo);
@ -86,7 +88,10 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
_error: _error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pInfo); if (pInfo != NULL) {
destroySortOperatorInfo(pInfo);
}
taosMemoryFree(pOperator); taosMemoryFree(pOperator);
return NULL; return NULL;
} }
@ -139,7 +144,6 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
int32_t numOfCols = taosArrayGetSize(pColMatchInfo); int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i); SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
// ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID);
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId); SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId); SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
@ -272,7 +276,6 @@ void destroySortOperatorInfo(void* param) {
} }
int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
ASSERT(pOptr != NULL);
SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo)); SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
SSortOperatorInfo* pOperatorInfo = (SSortOperatorInfo*)pOptr->info; SSortOperatorInfo* pOperatorInfo = (SSortOperatorInfo*)pOptr->info;
@ -329,7 +332,6 @@ SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo
int32_t numOfCols = taosArrayGetSize(pColMatchInfo); int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i); SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
// ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID);
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId); SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId); SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
@ -746,7 +748,6 @@ void destroyMultiwayMergeOperatorInfo(void* param) {
} }
int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
ASSERT(pOptr != NULL);
SSortExecInfo* pSortExecInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo)); SSortExecInfo* pSortExecInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)pOptr->info; SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)pOptr->info;

View File

@ -49,7 +49,9 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
} }
SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn) { SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn) {
ASSERT(fn != NULL); if (fn == NULL) {
return NULL;
}
if (capacity == 0) { if (capacity == 0) {
capacity = 4; capacity = 4;
@ -66,7 +68,6 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn) {
pHashObj->equalFp = memcmp; pHashObj->equalFp = memcmp;
pHashObj->hashFp = fn; pHashObj->hashFp = fn;
ASSERT((pHashObj->capacity & (pHashObj->capacity - 1)) == 0);
pHashObj->hashList = (SHNode **)taosMemoryCalloc(pHashObj->capacity, sizeof(void *)); pHashObj->hashList = (SHNode **)taosMemoryCalloc(pHashObj->capacity, sizeof(void *));
if (!pHashObj->hashList) { if (!pHashObj->hashList) {

View File

@ -800,6 +800,7 @@ STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
} }
} }
// all sources are completed.
if (pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) { if (pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) {
return NULL; return NULL;
} }