Merge pull request #17926 from taosdata/feature/3_liaohj
enh(query): add cache for table meta entry in table scan.
This commit is contained in:
commit
9d830206db
|
@ -153,7 +153,7 @@ bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid) {
|
||||||
|
|
||||||
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
|
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
|
||||||
SMeta *pMeta = pReader->pMeta;
|
SMeta *pMeta = pReader->pMeta;
|
||||||
int64_t version;
|
int64_t version1;
|
||||||
|
|
||||||
// query uid.idx
|
// query uid.idx
|
||||||
if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pReader->pBuf, &pReader->szBuf) < 0) {
|
if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pReader->pBuf, &pReader->szBuf) < 0) {
|
||||||
|
@ -161,8 +161,8 @@ int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
version = ((SUidIdxVal *)pReader->pBuf)[0].version;
|
version1 = ((SUidIdxVal *)pReader->pBuf)[0].version;
|
||||||
return metaGetTableEntryByVersion(pReader, version, uid);
|
return metaGetTableEntryByVersion(pReader, version1, uid);
|
||||||
}
|
}
|
||||||
|
|
||||||
int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
|
int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
|
||||||
|
|
|
@ -298,6 +298,12 @@ typedef struct {
|
||||||
SExprSupp* pExprSup; // expr supporter of aggregate operator
|
SExprSupp* pExprSup; // expr supporter of aggregate operator
|
||||||
} SAggOptrPushDownInfo;
|
} SAggOptrPushDownInfo;
|
||||||
|
|
||||||
|
typedef struct STableMetaCacheInfo {
|
||||||
|
SLRUCache* pTableMetaEntryCache; // 100 by default
|
||||||
|
uint64_t metaFetch;
|
||||||
|
uint64_t cacheHit;
|
||||||
|
} STableMetaCacheInfo;
|
||||||
|
|
||||||
typedef struct STableScanInfo {
|
typedef struct STableScanInfo {
|
||||||
STsdbReader* dataReader;
|
STsdbReader* dataReader;
|
||||||
SReadHandle readHandle;
|
SReadHandle readHandle;
|
||||||
|
@ -317,6 +323,7 @@ typedef struct STableScanInfo {
|
||||||
int8_t scanMode;
|
int8_t scanMode;
|
||||||
SAggOptrPushDownInfo pdInfo;
|
SAggOptrPushDownInfo pdInfo;
|
||||||
int8_t assignBlockUid;
|
int8_t assignBlockUid;
|
||||||
|
STableMetaCacheInfo metaCache;
|
||||||
} STableScanInfo;
|
} STableScanInfo;
|
||||||
|
|
||||||
typedef struct STableMergeScanInfo {
|
typedef struct STableMergeScanInfo {
|
||||||
|
@ -325,7 +332,6 @@ typedef struct STableMergeScanInfo {
|
||||||
int32_t tableEndIndex;
|
int32_t tableEndIndex;
|
||||||
bool hasGroupId;
|
bool hasGroupId;
|
||||||
uint64_t groupId;
|
uint64_t groupId;
|
||||||
SArray* dataReaders; // array of tsdbReaderT*
|
|
||||||
SArray* queryConds; // array of queryTableDataCond
|
SArray* queryConds; // array of queryTableDataCond
|
||||||
STsdbReader* pReader;
|
STsdbReader* pReader;
|
||||||
SReadHandle readHandle;
|
SReadHandle readHandle;
|
||||||
|
@ -877,8 +883,8 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul
|
||||||
|
|
||||||
void doSetOperatorCompleted(SOperatorInfo* pOperator);
|
void doSetOperatorCompleted(SOperatorInfo* pOperator);
|
||||||
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo);
|
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo);
|
||||||
int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
|
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr,
|
||||||
SSDataBlock* pBlock, int32_t rows, const char* idStr);
|
SSDataBlock* pBlock, int32_t rows, const char* idStr, STableMetaCacheInfo * pCache);
|
||||||
|
|
||||||
void cleanupAggSup(SAggSupporter* pAggSup);
|
void cleanupAggSup(SAggSupporter* pAggSup);
|
||||||
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
|
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
|
||||||
|
|
|
@ -172,7 +172,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
SExprSupp* pSup = &pInfo->pseudoExprSup;
|
SExprSupp* pSup = &pInfo->pseudoExprSup;
|
||||||
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes,
|
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes,
|
||||||
pRes->info.rows, GET_TASKID(pTaskInfo));
|
pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -221,7 +221,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0);
|
pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0);
|
||||||
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, pInfo->pRes->info.rows,
|
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, pInfo->pRes->info.rows,
|
||||||
GET_TASKID(pTaskInfo));
|
GET_TASKID(pTaskInfo), NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include <vnode.h>
|
||||||
#include "executorimpl.h"
|
#include "executorimpl.h"
|
||||||
#include "filter.h"
|
#include "filter.h"
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
|
@ -335,7 +336,7 @@ static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlo
|
||||||
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
|
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
|
||||||
|
|
||||||
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, rows,
|
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, rows,
|
||||||
GET_TASKID(pTaskInfo));
|
GET_TASKID(pTaskInfo), &pTableScanInfo->metaCache);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
@ -491,51 +492,128 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
|
||||||
SET_REVERSE_SCAN_FLAG(pTableScanInfo);
|
SET_REVERSE_SCAN_FLAG(pTableScanInfo);
|
||||||
|
|
||||||
switchCtxOrder(pCtx, numOfOutput);
|
switchCtxOrder(pCtx, numOfOutput);
|
||||||
// setupQueryRangeForReverseScan(pTableScanInfo);
|
|
||||||
|
|
||||||
pTableScanInfo->cond.order = TSDB_ORDER_DESC;
|
pTableScanInfo->cond.order = TSDB_ORDER_DESC;
|
||||||
STimeWindow* pTWindow = &pTableScanInfo->cond.twindows;
|
STimeWindow* pTWindow = &pTableScanInfo->cond.twindows;
|
||||||
TSWAP(pTWindow->skey, pTWindow->ekey);
|
TSWAP(pTWindow->skey, pTWindow->ekey);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
|
typedef struct STableCachedVal {
|
||||||
SSDataBlock* pBlock, int32_t rows, const char* idStr) {
|
const char* pName;
|
||||||
|
STag* pTags;
|
||||||
|
} STableCachedVal;
|
||||||
|
|
||||||
|
static void freeTableCachedVal(void* param) {
|
||||||
|
if (param == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
STableCachedVal* pVal = param;
|
||||||
|
taosMemoryFree((void*)pVal->pName);
|
||||||
|
taosMemoryFree(pVal->pTags);
|
||||||
|
taosMemoryFree(pVal);
|
||||||
|
}
|
||||||
|
|
||||||
|
//const void *key, size_t keyLen, void *value
|
||||||
|
static void freeCachedMetaItem(const void *key, size_t keyLen, void *value) {
|
||||||
|
freeTableCachedVal(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr,
|
||||||
|
SSDataBlock* pBlock, int32_t rows, const char* idStr, STableMetaCacheInfo* pCache) {
|
||||||
// currently only the tbname pseudo column
|
// currently only the tbname pseudo column
|
||||||
if (numOfPseudoExpr <= 0) {
|
if (numOfExpr <= 0) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
// backup the rows
|
// backup the rows
|
||||||
int32_t backupRows = pBlock->info.rows;
|
int32_t backupRows = pBlock->info.rows;
|
||||||
pBlock->info.rows = rows;
|
pBlock->info.rows = rows;
|
||||||
|
|
||||||
SMetaReader mr = {0};
|
bool freeReader = false;
|
||||||
metaReaderInit(&mr, pHandle->meta, 0);
|
STableCachedVal val = {0};
|
||||||
int32_t code = metaGetTableEntryByUid(&mr, pBlock->info.uid);
|
|
||||||
metaReaderReleaseLock(&mr);
|
|
||||||
|
|
||||||
|
SMetaReader mr = {0};
|
||||||
|
LRUHandle* h = NULL;
|
||||||
|
|
||||||
|
// 1. check if it is existed in meta cache
|
||||||
|
if (pCache == NULL) {
|
||||||
|
metaReaderInit(&mr, pHandle->meta, 0);
|
||||||
|
code = metaGetTableEntryByUid(&mr, pBlock->info.uid);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr);
|
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr);
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t j = 0; j < numOfPseudoExpr; ++j) {
|
metaReaderReleaseLock(&mr);
|
||||||
SExprInfo* pExpr = &pPseudoExpr[j];
|
|
||||||
int32_t dstSlotId = pExpr->base.resSchema.slotId;
|
val.pName = mr.me.name;
|
||||||
|
val.pTags = (STag*)mr.me.ctbEntry.pTags;
|
||||||
|
|
||||||
|
freeReader = true;
|
||||||
|
} else {
|
||||||
|
pCache->metaFetch += 1;
|
||||||
|
|
||||||
|
h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.uid, sizeof(pBlock->info.uid));
|
||||||
|
if (h == NULL) {
|
||||||
|
metaReaderInit(&mr, pHandle->meta, 0);
|
||||||
|
code = metaGetTableEntryByUid(&mr, pBlock->info.uid);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr);
|
||||||
|
metaReaderClear(&mr);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
metaReaderReleaseLock(&mr);
|
||||||
|
|
||||||
|
STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal));
|
||||||
|
pVal->pName = strdup(mr.me.name);
|
||||||
|
pVal->pTags = NULL;
|
||||||
|
|
||||||
|
// only child table has tag value
|
||||||
|
if (mr.me.type == TSDB_CHILD_TABLE) {
|
||||||
|
STag* pTag = (STag*)mr.me.ctbEntry.pTags;
|
||||||
|
pVal->pTags = taosMemoryMalloc(pTag->len);
|
||||||
|
memcpy(pVal->pTags, mr.me.ctbEntry.pTags, pTag->len);
|
||||||
|
}
|
||||||
|
|
||||||
|
val = *pVal;
|
||||||
|
freeReader = true;
|
||||||
|
|
||||||
|
int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.uid, sizeof(uint64_t), pVal, sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW);
|
||||||
|
if (ret != TAOS_LRU_STATUS_OK) {
|
||||||
|
qError("failed to put meta into lru cache, code:%d, %s", ret, idStr);
|
||||||
|
freeTableCachedVal(pVal);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
pCache->cacheHit += 1;
|
||||||
|
STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h);
|
||||||
|
val = *pVal;
|
||||||
|
taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug("retrieve table meta from cache:%"PRIu64", hit:%"PRIu64 " miss:%"PRIu64", %s", pCache->metaFetch, pCache->cacheHit,
|
||||||
|
(pCache->metaFetch - pCache->cacheHit), idStr);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < numOfExpr; ++j) {
|
||||||
|
const SExprInfo* pExpr1 = &pExpr[j];
|
||||||
|
int32_t dstSlotId = pExpr1->base.resSchema.slotId;
|
||||||
|
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
||||||
colInfoDataCleanup(pColInfoData, pBlock->info.rows);
|
colInfoDataCleanup(pColInfoData, pBlock->info.rows);
|
||||||
|
|
||||||
int32_t functionId = pExpr->pExpr->_function.functionId;
|
int32_t functionId = pExpr1->pExpr->_function.functionId;
|
||||||
|
|
||||||
// this is to handle the tbname
|
// this is to handle the tbname
|
||||||
if (fmIsScanPseudoColumnFunc(functionId)) {
|
if (fmIsScanPseudoColumnFunc(functionId)) {
|
||||||
setTbNameColData(pBlock, pColInfoData, functionId, mr.me.name);
|
setTbNameColData(pBlock, pColInfoData, functionId, val.pName);
|
||||||
} else { // these are tags
|
} else { // these are tags
|
||||||
STagVal tagVal = {0};
|
STagVal tagVal = {0};
|
||||||
tagVal.cid = pExpr->base.pParam[0].pCol->colId;
|
tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
|
||||||
const char* p = metaGetTableTagVal(mr.me.ctbEntry.pTags, pColInfoData->info.type, &tagVal);
|
const char* p = metaGetTableTagVal(val.pTags, pColInfoData->info.type, &tagVal);
|
||||||
|
|
||||||
char* data = NULL;
|
char* data = NULL;
|
||||||
if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
|
if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
|
||||||
|
@ -560,10 +638,12 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
metaReaderClear(&mr);
|
|
||||||
|
|
||||||
// restore the rows
|
// restore the rows
|
||||||
pBlock->info.rows = backupRows;
|
pBlock->info.rows = backupRows;
|
||||||
|
if (freeReader) {
|
||||||
|
metaReaderClear(&mr);
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -811,6 +891,7 @@ static void destroyTableScanOperatorInfo(void* param) {
|
||||||
taosArrayDestroy(pTableScanInfo->matchInfo.pList);
|
taosArrayDestroy(pTableScanInfo->matchInfo.pList);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosLRUCacheCleanup(pTableScanInfo->metaCache.pTableMetaEntryCache);
|
||||||
cleanupExprSupp(&pTableScanInfo->pseudoSup);
|
cleanupExprSupp(&pTableScanInfo->pseudoSup);
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
@ -874,6 +955,9 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
||||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
|
pInfo->metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024*128, -1, .5);
|
||||||
|
taosLRUCacheSetStrictCapacity(pInfo->metaCache.pTableMetaEntryCache, false);
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo,
|
||||||
getTableScannerExecInfo);
|
getTableScannerExecInfo);
|
||||||
|
|
||||||
|
@ -1624,7 +1708,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
// currently only the tbname pseudo column
|
// currently only the tbname pseudo column
|
||||||
if (pInfo->numOfPseudoExpr > 0) {
|
if (pInfo->numOfPseudoExpr > 0) {
|
||||||
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
|
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
|
||||||
pInfo->pRes->info.rows, GET_TASKID(pTaskInfo));
|
pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
blockDataFreeRes((SSDataBlock*)pBlock);
|
blockDataFreeRes((SSDataBlock*)pBlock);
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
@ -4362,7 +4446,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
|
||||||
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
|
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
|
||||||
|
|
||||||
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock,
|
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock,
|
||||||
pBlock->info.rows, GET_TASKID(pTaskInfo));
|
pBlock->info.rows, GET_TASKID(pTaskInfo), NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
|
|
@ -482,24 +482,31 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort
|
||||||
SExecTaskInfo* pTaskInfo) {
|
SExecTaskInfo* pTaskInfo) {
|
||||||
SGroupSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupSortOperatorInfo));
|
SGroupSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupSortOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL /* || rowSize > 100 * 1024 * 1024*/) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
SDataBlockDescNode* pDescNode = pSortPhyNode->node.pOutputDataBlockDesc;
|
SDataBlockDescNode* pDescNode = pSortPhyNode->node.pOutputDataBlockDesc;
|
||||||
|
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
|
||||||
SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols);
|
SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols);
|
||||||
|
|
||||||
|
pSup->pExprInfo = pExprInfo;
|
||||||
|
pSup->numOfExprs = numOfCols;
|
||||||
|
|
||||||
|
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
||||||
|
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset);
|
||||||
|
|
||||||
|
pInfo->binfo.pRes = createResDataBlock(pDescNode);
|
||||||
|
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
int32_t numOfOutputCols = 0;
|
int32_t numOfOutputCols = 0;
|
||||||
int32_t code = extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID,
|
int32_t code = extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID,
|
||||||
&pInfo->matchInfo);
|
&pInfo->matchInfo);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset);
|
goto _error;
|
||||||
pInfo->binfo.pRes = pResBlock;
|
}
|
||||||
|
|
||||||
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
|
||||||
|
|
||||||
pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);
|
pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);
|
||||||
|
|
||||||
|
@ -508,8 +515,6 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
|
||||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyGroupSortOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyGroupSortOperatorInfo,
|
||||||
|
@ -523,8 +528,10 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
pTaskInfo->code = code;
|
||||||
taosMemoryFree(pInfo);
|
if (pInfo != NULL) {
|
||||||
|
destroyGroupSortOperatorInfo(pInfo);
|
||||||
|
}
|
||||||
taosMemoryFree(pOperator);
|
taosMemoryFree(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue