fix issue for function res
This commit is contained in:
parent
37fc4f5674
commit
1b5cf65ab9
|
@ -73,7 +73,8 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA
|
|||
SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
@ -84,6 +85,7 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA
|
|||
pOperator->exprSupp.hasWindowOrGroup = false;
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
|
@ -91,6 +93,7 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA
|
|||
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
|
||||
|
||||
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
|
||||
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -140,6 +143,9 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA
|
|||
return code;
|
||||
|
||||
_error:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
if (pInfo != NULL) {
|
||||
destroyAggOperatorInfo(pInfo);
|
||||
}
|
||||
|
@ -480,6 +486,10 @@ int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, ui
|
|||
|
||||
if (taosArrayGetSize(list) == 0) {
|
||||
pData = getNewBufPage(pResultBuf, &pageId);
|
||||
if (pData == NULL) {
|
||||
qError("failed to get buffer, code:%s", tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
pData->num = sizeof(SFilePage);
|
||||
} else {
|
||||
SPageInfo* pi = getLastPageInfo(list);
|
||||
|
@ -496,9 +506,11 @@ int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, ui
|
|||
releaseBufPageInfo(pResultBuf, pi);
|
||||
|
||||
pData = getNewBufPage(pResultBuf, &pageId);
|
||||
if (pData != NULL) {
|
||||
pData->num = sizeof(SFilePage);
|
||||
if (pData == NULL) {
|
||||
qError("failed to get buffer, code:%s", tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
pData->num = sizeof(SFilePage);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -107,6 +107,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl
|
|||
|
||||
SDataBlockDescNode* pDescNode = pScanNode->scan.node.pOutputDataBlockDesc;
|
||||
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
|
||||
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
|
||||
|
||||
code = extractColMatchInfo(pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
|
|
@ -271,6 +271,7 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
|
|||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pCountWindowNode->window.node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
code = blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
|
|
|
@ -1835,6 +1835,9 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
|||
}
|
||||
|
||||
SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
|
||||
if (!pExprs) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < (*numOfExprs); ++i) {
|
||||
STargetNode* pTargetNode = NULL;
|
||||
|
@ -1937,6 +1940,8 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
|
|||
} else {
|
||||
char* udfName = pExpr->pExpr->_function.pFunctNode->functionName;
|
||||
pCtx->udfName = taosStrdup(udfName);
|
||||
QUERY_CHECK_NULL(pCtx->udfName, code, lino, _end, terrno);
|
||||
|
||||
code = fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
|
|
@ -89,6 +89,10 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, i
|
|||
int32_t pageId = -1;
|
||||
if (*currentPageId == -1) {
|
||||
pData = getNewBufPage(pResultBuf, &pageId);
|
||||
if (pData == NULL) {
|
||||
qError("failed to get buffer, code:%s", tstrerror(terrno));
|
||||
return NULL;
|
||||
}
|
||||
pData->num = sizeof(SFilePage);
|
||||
} else {
|
||||
pData = getBufPage(pResultBuf, *currentPageId);
|
||||
|
@ -104,9 +108,11 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, i
|
|||
releaseBufPage(pResultBuf, pData);
|
||||
|
||||
pData = getNewBufPage(pResultBuf, &pageId);
|
||||
if (pData != NULL) {
|
||||
pData->num = sizeof(SFilePage);
|
||||
if (pData == NULL) {
|
||||
qError("failed to get buffer, code:%s", tstrerror(terrno));
|
||||
return NULL;
|
||||
}
|
||||
pData->num = sizeof(SFilePage);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -454,7 +454,8 @@ static int32_t createPrimaryTsExprIfNeeded(SFillOperatorInfo* pInfo, SFillPhysiN
|
|||
int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode,
|
||||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
int32_t code = 0;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
||||
SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
|
@ -464,6 +465,7 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi
|
|||
}
|
||||
|
||||
pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
|
||||
SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr);
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
|
||||
|
|
|
@ -1168,6 +1168,7 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo
|
|||
uint32_t defaultBufsz = 0;
|
||||
|
||||
pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
|
||||
code = getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
|
|
|
@ -476,17 +476,27 @@ static void freeTableCachedVal(void* param) {
|
|||
}
|
||||
|
||||
static STableCachedVal* createTableCacheVal(const SMetaReader* pMetaReader) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal));
|
||||
QUERY_CHECK_NULL(pVal, code, lino, _end, terrno);
|
||||
pVal->pName = taosStrdup(pMetaReader->me.name);
|
||||
QUERY_CHECK_NULL(pVal->pName, code, lino, _end, terrno);
|
||||
pVal->pTags = NULL;
|
||||
|
||||
// only child table has tag value
|
||||
if (pMetaReader->me.type == TSDB_CHILD_TABLE) {
|
||||
STag* pTag = (STag*)pMetaReader->me.ctbEntry.pTags;
|
||||
pVal->pTags = taosMemoryMalloc(pTag->len);
|
||||
QUERY_CHECK_NULL(pVal->pTags, code, lino, _end, terrno);
|
||||
memcpy(pVal->pTags, pTag, pTag->len);
|
||||
}
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
return NULL;
|
||||
}
|
||||
return pVal;
|
||||
}
|
||||
|
||||
|
@ -581,6 +591,9 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
|
|||
pHandle->api.metaReaderFn.readerReleaseLock(&mr);
|
||||
|
||||
STableCachedVal* pVal = createTableCacheVal(&mr);
|
||||
if(!pVal) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
val = *pVal;
|
||||
freeReader = true;
|
||||
|
@ -1356,6 +1369,8 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa
|
|||
pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader;
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
|
||||
QUERY_CHECK_NULL(pInfo->pResBlock, code, lino, _error, terrno);
|
||||
|
||||
code = prepareDataBlockBuf(pInfo->pResBlock, &pInfo->base.matchInfo);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
|
@ -2485,6 +2500,7 @@ static int32_t doBlockDataPrimaryKeyFilter(SSDataBlock* pBlock, STqOffsetVal* of
|
|||
void* data = colDataGetData(pColPk, i);
|
||||
if (IS_VAR_DATA_TYPE(pColPk->info.type)) {
|
||||
void* tmq = taosMemoryMalloc(offset->primaryKey.nData + VARSTR_HEADER_SIZE);
|
||||
QUERY_CHECK_NULL(tmq, code, lino, _end, terrno);
|
||||
memcpy(varDataVal(tmq), offset->primaryKey.pData, offset->primaryKey.nData);
|
||||
varDataLen(tmq) = offset->primaryKey.nData;
|
||||
p[i] = (*ts > offset->ts) || (func(data, tmq) > 0);
|
||||
|
@ -2709,6 +2725,7 @@ _end:
|
|||
|
||||
static int32_t processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOffsetVal* offset) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SValue val = {0};
|
||||
if (hasPrimaryKey) {
|
||||
code = doBlockDataPrimaryKeyFilter(pBlock, offset);
|
||||
|
@ -2725,6 +2742,7 @@ static int32_t processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOff
|
|||
val.type = pColPk->info.type;
|
||||
if (IS_VAR_DATA_TYPE(pColPk->info.type)) {
|
||||
val.pData = taosMemoryMalloc(varDataLen(tmp));
|
||||
QUERY_CHECK_NULL(val.pData, code, lino, _end, terrno);
|
||||
val.nData = varDataLen(tmp);
|
||||
memcpy(val.pData, varDataVal(tmp), varDataLen(tmp));
|
||||
} else {
|
||||
|
@ -2732,6 +2750,11 @@ static int32_t processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOff
|
|||
}
|
||||
}
|
||||
tqOffsetResetToData(offset, pBlock->info.id.uid, pBlock->info.window.ekey, val);
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -3990,6 +4013,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
|
|||
}
|
||||
|
||||
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
|
||||
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
|
||||
code = createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateRes);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
|
@ -4384,6 +4408,8 @@ static int32_t doTagScanFromCtbIdxNext(SOperatorInfo* pOperator, SSDataBlock** p
|
|||
}
|
||||
STUidTagInfo info = {.uid = uid, .pTagVal = pCur->pVal};
|
||||
info.pTagVal = taosMemoryMalloc(pCur->vLen);
|
||||
QUERY_CHECK_NULL(info.pTagVal, code, lino, _end, terrno);
|
||||
|
||||
memcpy(info.pTagVal, pCur->pVal, pCur->vLen);
|
||||
void* tmp = taosArrayPush(aUidTags, &info);
|
||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||
|
|
|
@ -831,6 +831,7 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
|
||||
SExprInfo* pExprInfo = createExprInfo(pCountNode->window.pFuncs, NULL, &numOfCols);
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
|
|
|
@ -886,6 +886,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pEventNode->window.pFuncs, NULL, &numOfCols);
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
|
|
|
@ -1352,7 +1352,9 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
|
|||
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
|
||||
pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pInfo->pSrcBlock, code, lino, _error, terrno);
|
||||
code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
|
|
|
@ -1888,6 +1888,7 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
|
|||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
||||
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||
|
@ -3700,6 +3701,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
|||
|
||||
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
@ -4851,6 +4853,7 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &numOfCols);
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
@ -5129,6 +5132,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
pInfo->interval = (SInterval){
|
||||
.interval = pIntervalPhyNode->interval,
|
||||
.sliding = pIntervalPhyNode->sliding,
|
||||
|
|
|
@ -2083,6 +2083,7 @@ int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNo
|
|||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
lino = __LINE__;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
|
@ -2091,9 +2092,7 @@ int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNo
|
|||
|
||||
int32_t num = 0;
|
||||
code = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
extractTbnameSlotId(pInfo, pScanNode);
|
||||
|
||||
|
@ -2101,9 +2100,11 @@ int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNo
|
|||
|
||||
pInfo->accountId = pScanPhyNode->accountId;
|
||||
pInfo->pUser = taosStrdup((void*)pUser);
|
||||
QUERY_CHECK_NULL(pInfo->pUser, code, lino, _error, terrno);
|
||||
pInfo->sysInfo = pScanPhyNode->sysInfo;
|
||||
pInfo->showRewrite = pScanPhyNode->showRewrite;
|
||||
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
|
||||
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
|
||||
pInfo->pCondition = pScanNode->node.pConditions;
|
||||
code = filterInitFromNode(pScanNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
@ -2143,7 +2144,7 @@ _error:
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
taosMemoryFreeClear(pOperator);
|
||||
destroyOperator(pOperator);
|
||||
pTaskInfo->code = code;
|
||||
return code;
|
||||
}
|
||||
|
@ -2173,15 +2174,19 @@ void destroySysScanOperator(void* param) {
|
|||
(void)tsem_destroy(&pInfo->ready);
|
||||
blockDataDestroy(pInfo->pRes);
|
||||
|
||||
const char* name = tNameGetTableName(&pInfo->name);
|
||||
if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 ||
|
||||
strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 ||
|
||||
strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
|
||||
if (pInfo->pAPI->metaFn.closeTableMetaCursor != NULL) {
|
||||
pInfo->pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
|
||||
}
|
||||
if (pInfo->name.type == TSDB_TABLE_NAME_T) {
|
||||
const char* name = tNameGetTableName(&pInfo->name);
|
||||
if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 ||
|
||||
strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 ||
|
||||
strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
|
||||
if (pInfo->pAPI->metaFn.closeTableMetaCursor != NULL) {
|
||||
pInfo->pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
|
||||
}
|
||||
|
||||
pInfo->pCur = NULL;
|
||||
pInfo->pCur = NULL;
|
||||
}
|
||||
} else {
|
||||
qError("pInfo->name is not initialized");
|
||||
}
|
||||
|
||||
if (pInfo->pIdx) {
|
||||
|
@ -2694,6 +2699,7 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP
|
|||
}
|
||||
|
||||
pInfo->pResBlock = createDataBlockFromDescNode(pBlockScanNode->node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pInfo->pResBlock, code, lino, _error, terrno);
|
||||
code = blockDataEnsureCapacity(pInfo->pResBlock, 1);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
|
|
|
@ -1150,6 +1150,7 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN
|
|||
pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, NULL, 0, (SNodeListNode*)pInterpPhyNode->pFillValues);
|
||||
pInfo->pLinearInfo = NULL;
|
||||
pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
|
||||
pInfo->win = pInterpPhyNode->timeRange;
|
||||
pInfo->interval.interval = pInterpPhyNode->interval;
|
||||
pInfo->current = pInfo->win.skey;
|
||||
|
|
|
@ -1285,6 +1285,7 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode
|
|||
}
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
|
@ -1613,6 +1614,7 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy
|
|||
}
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
||||
|
||||
|
@ -1684,6 +1686,7 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh
|
|||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
||||
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
|
||||
|
@ -2019,6 +2022,7 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
|
|||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
initBasicInfo(&iaInfo->binfo, pResBlock);
|
||||
code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
@ -2344,6 +2348,7 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva
|
|||
}
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
initBasicInfo(&pIntervalInfo->binfo, pResBlock);
|
||||
code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
|
Loading…
Reference in New Issue