This commit is contained in:
54liuyao 2024-08-05 16:37:08 +08:00
commit 5c50970cca
15 changed files with 87 additions and 17 deletions

View File

@ -90,6 +90,7 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA
pOperator->exprSupp.hasWindowOrGroup = false;
SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
initBasicInfo(&pInfo->binfo, pResBlock);
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
@ -140,6 +141,9 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA
return code;
_error:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
if (pInfo != NULL) {
destroyAggOperatorInfo(pInfo);
}
@ -481,6 +485,10 @@ int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, ui
if (taosArrayGetSize(list) == 0) {
pData = getNewBufPage(pResultBuf, &pageId);
if (pData == NULL) {
qError("failed to get buffer, code:%s", tstrerror(terrno));
return terrno;
}
pData->num = sizeof(SFilePage);
} else {
SPageInfo* pi = getLastPageInfo(list);
@ -497,9 +505,11 @@ int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, ui
releaseBufPageInfo(pResultBuf, pi);
pData = getNewBufPage(pResultBuf, &pageId);
if (pData != NULL) {
pData->num = sizeof(SFilePage);
if (pData == NULL) {
qError("failed to get buffer, code:%s", tstrerror(terrno));
return terrno;
}
pData->num = sizeof(SFilePage);
}
}

View File

@ -117,6 +117,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl
SDataBlockDescNode* pDescNode = pScanNode->scan.node.pOutputDataBlockDesc;
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
code = extractColMatchInfo(pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
QUERY_CHECK_CODE(code, lino, _error);

View File

@ -287,6 +287,7 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
QUERY_CHECK_CODE(code, lino, _error);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pCountWindowNode->window.node.pOutputDataBlockDesc);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
code = blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
QUERY_CHECK_CODE(code, lino, _error);

View File

@ -2018,6 +2018,8 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
} else {
char* udfName = pExpr->pExpr->_function.pFunctNode->functionName;
pCtx->udfName = taosStrdup(udfName);
QUERY_CHECK_NULL(pCtx->udfName, code, lino, _end, terrno);
code = fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet);
QUERY_CHECK_CODE(code, lino, _end);
}

View File

@ -89,6 +89,10 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, i
int32_t pageId = -1;
if (*currentPageId == -1) {
pData = getNewBufPage(pResultBuf, &pageId);
if (pData == NULL) {
qError("failed to get buffer, code:%s", tstrerror(terrno));
return NULL;
}
pData->num = sizeof(SFilePage);
} else {
pData = getBufPage(pResultBuf, *currentPageId);
@ -104,9 +108,11 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, i
releaseBufPage(pResultBuf, pData);
pData = getNewBufPage(pResultBuf, &pageId);
if (pData != NULL) {
pData->num = sizeof(SFilePage);
if (pData == NULL) {
qError("failed to get buffer, code:%s", tstrerror(terrno));
return NULL;
}
pData->num = sizeof(SFilePage);
}
}

View File

@ -481,6 +481,7 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi
}
pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pExprInfo, &pInfo->numOfExpr);

View File

@ -1193,6 +1193,7 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo
uint32_t defaultBufsz = 0;
pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->node.pOutputDataBlockDesc);
QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
code = getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz);
if (code != TSDB_CODE_SUCCESS) {
goto _error;

View File

@ -476,17 +476,27 @@ static void freeTableCachedVal(void* param) {
}
static STableCachedVal* createTableCacheVal(const SMetaReader* pMetaReader) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal));
QUERY_CHECK_NULL(pVal, code, lino, _end, terrno);
pVal->pName = taosStrdup(pMetaReader->me.name);
QUERY_CHECK_NULL(pVal->pName, code, lino, _end, terrno);
pVal->pTags = NULL;
// only child table has tag value
if (pMetaReader->me.type == TSDB_CHILD_TABLE) {
STag* pTag = (STag*)pMetaReader->me.ctbEntry.pTags;
pVal->pTags = taosMemoryMalloc(pTag->len);
QUERY_CHECK_NULL(pVal->pTags, code, lino, _end, terrno);
memcpy(pVal->pTags, pTag, pTag->len);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return NULL;
}
return pVal;
}
@ -581,6 +591,9 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
pHandle->api.metaReaderFn.readerReleaseLock(&mr);
STableCachedVal* pVal = createTableCacheVal(&mr);
if(!pVal) {
return terrno;
}
val = *pVal;
freeReader = true;
@ -1297,7 +1310,9 @@ static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptr
static void destroyTableScanBase(STableScanBase* pBase, TsdReader* pAPI) {
cleanupQueryTableDataCond(&pBase->cond);
pAPI->tsdReaderClose(pBase->dataReader);
if (pAPI->tsdReaderClose) {
pAPI->tsdReaderClose(pBase->dataReader);
}
pBase->dataReader = NULL;
if (pBase->matchInfo.pList != NULL) {
@ -1367,6 +1382,8 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa
pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader;
initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
QUERY_CHECK_NULL(pInfo->pResBlock, code, lino, _error, terrno);
code = prepareDataBlockBuf(pInfo->pResBlock, &pInfo->base.matchInfo);
QUERY_CHECK_CODE(code, lino, _error);
@ -2496,6 +2513,7 @@ static int32_t doBlockDataPrimaryKeyFilter(SSDataBlock* pBlock, STqOffsetVal* of
void* data = colDataGetData(pColPk, i);
if (IS_VAR_DATA_TYPE(pColPk->info.type)) {
void* tmq = taosMemoryMalloc(offset->primaryKey.nData + VARSTR_HEADER_SIZE);
QUERY_CHECK_NULL(tmq, code, lino, _end, terrno);
memcpy(varDataVal(tmq), offset->primaryKey.pData, offset->primaryKey.nData);
varDataLen(tmq) = offset->primaryKey.nData;
p[i] = (*ts > offset->ts) || (func(data, tmq) > 0);
@ -2720,6 +2738,7 @@ _end:
static int32_t processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOffsetVal* offset) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SValue val = {0};
if (hasPrimaryKey) {
code = doBlockDataPrimaryKeyFilter(pBlock, offset);
@ -2736,6 +2755,7 @@ static int32_t processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOff
val.type = pColPk->info.type;
if (IS_VAR_DATA_TYPE(pColPk->info.type)) {
val.pData = taosMemoryMalloc(varDataLen(tmp));
QUERY_CHECK_NULL(val.pData, code, lino, _end, terrno);
val.nData = varDataLen(tmp);
memcpy(val.pData, varDataVal(tmp), varDataLen(tmp));
} else {
@ -2743,6 +2763,11 @@ static int32_t processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOff
}
}
tqOffsetResetToData(offset, pBlock->info.id.uid, pBlock->info.window.ekey, val);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
@ -4012,6 +4037,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
code = createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateRes);
QUERY_CHECK_CODE(code, lino, _error);
@ -4410,6 +4436,8 @@ static int32_t doTagScanFromCtbIdxNext(SOperatorInfo* pOperator, SSDataBlock** p
}
STUidTagInfo info = {.uid = uid, .pTagVal = pCur->pVal};
info.pTagVal = taosMemoryMalloc(pCur->vLen);
QUERY_CHECK_NULL(info.pTagVal, code, lino, _end, terrno);
memcpy(info.pTagVal, pCur->pVal, pCur->vLen);
void* tmp = taosArrayPush(aUidTags, &info);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

View File

@ -839,6 +839,7 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
QUERY_CHECK_CODE(code, lino, _error);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);

View File

@ -899,6 +899,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
QUERY_CHECK_CODE(code, lino, _error);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);

View File

@ -1376,7 +1376,9 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
QUERY_CHECK_NULL(pInfo->pSrcBlock, code, lino, _error, terrno);
code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
QUERY_CHECK_CODE(code, lino, _error);

View File

@ -1899,6 +1899,7 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
QUERY_CHECK_CODE(code, lino, _error);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
initBasicInfo(&pInfo->binfo, pResBlock);
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
@ -3735,6 +3736,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
QUERY_CHECK_CODE(code, lino, _error);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@ -4904,6 +4906,7 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
QUERY_CHECK_CODE(code, lino, _error);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@ -5189,6 +5192,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
QUERY_CHECK_CODE(code, lino, _error);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
pInfo->interval = (SInterval){
.interval = pIntervalPhyNode->interval,
.sliding = pIntervalPhyNode->sliding,

View File

@ -2142,6 +2142,7 @@ int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNo
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
lino = __LINE__;
goto _error;
}
@ -2150,9 +2151,7 @@ int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNo
int32_t num = 0;
code = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
QUERY_CHECK_CODE(code, lino, _error);
extractTbnameSlotId(pInfo, pScanNode);
@ -2160,9 +2159,11 @@ int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNo
pInfo->accountId = pScanPhyNode->accountId;
pInfo->pUser = taosStrdup((void*)pUser);
QUERY_CHECK_NULL(pInfo->pUser, code, lino, _error, terrno);
pInfo->sysInfo = pScanPhyNode->sysInfo;
pInfo->showRewrite = pScanPhyNode->showRewrite;
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
pInfo->pCondition = pScanNode->node.pConditions;
code = filterInitFromNode(pScanNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
QUERY_CHECK_CODE(code, lino, _error);
@ -2202,7 +2203,7 @@ _error:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
taosMemoryFreeClear(pOperator);
destroyOperator(pOperator);
pTaskInfo->code = code;
return code;
}
@ -2232,15 +2233,19 @@ void destroySysScanOperator(void* param) {
(void)tsem_destroy(&pInfo->ready);
blockDataDestroy(pInfo->pRes);
const char* name = tNameGetTableName(&pInfo->name);
if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 ||
strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 ||
strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
if (pInfo->pAPI->metaFn.closeTableMetaCursor != NULL) {
pInfo->pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
}
if (pInfo->name.type == TSDB_TABLE_NAME_T) {
const char* name = tNameGetTableName(&pInfo->name);
if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 ||
strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 ||
strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
if (pInfo->pAPI->metaFn.closeTableMetaCursor != NULL) {
pInfo->pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
}
pInfo->pCur = NULL;
pInfo->pCur = NULL;
}
} else {
qError("pInfo->name is not initialized");
}
if (pInfo->pIdx) {
@ -2762,6 +2767,7 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP
}
pInfo->pResBlock = createDataBlockFromDescNode(pBlockScanNode->node.pOutputDataBlockDesc);
QUERY_CHECK_NULL(pInfo->pResBlock, code, lino, _error, terrno);
code = blockDataEnsureCapacity(pInfo->pResBlock, 1);
QUERY_CHECK_CODE(code, lino, _error);

View File

@ -1159,6 +1159,7 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN
pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, NULL, 0, (SNodeListNode*)pInterpPhyNode->pFillValues);
pInfo->pLinearInfo = NULL;
pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
pInfo->win = pInterpPhyNode->timeRange;
pInfo->interval.interval = pInterpPhyNode->interval;
pInfo->current = pInfo->win.skey;

View File

@ -1303,6 +1303,7 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode
}
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
initBasicInfo(&pInfo->binfo, pResBlock);
SExprSupp* pSup = &pOperator->exprSupp;
@ -1647,6 +1648,7 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy
}
SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
initBasicInfo(&pInfo->binfo, pResBlock);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
@ -1721,6 +1723,7 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh
QUERY_CHECK_CODE(code, lino, _error);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
initBasicInfo(&pInfo->binfo, pResBlock);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
@ -2061,6 +2064,7 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
QUERY_CHECK_CODE(code, lino, _error);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
initBasicInfo(&iaInfo->binfo, pResBlock);
code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
QUERY_CHECK_CODE(code, lino, _error);
@ -2388,6 +2392,7 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva
}
SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
initBasicInfo(&pIntervalInfo->binfo, pResBlock);
code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
QUERY_CHECK_CODE(code, lino, _error);