Merge pull request #26985 from taosdata/fix/TD-31212

fix res issue
This commit is contained in:
Haojun Liao 2024-08-06 10:42:30 +08:00 committed by GitHub
commit e9c11bf5b0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 490 additions and 52 deletions

View File

@ -218,6 +218,6 @@ uint64_t calcGroupId(char* pData, int32_t len);
SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys);
int32_t extractKeysLen(const SArray* keys);
int32_t extractKeysLen(const SArray* keys, int32_t* pLen);
#endif // TDENGINE_EXECUTIL_H

View File

@ -409,6 +409,7 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc
for (int32_t i = 0; i < blockDataGetNumOfCols(pBlock); ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
colDataSetNULL(pColInfoData, 0);
}
*ppBlock = pBlock;

View File

@ -58,13 +58,16 @@ static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchI
#define SCAN_ROW_TYPE(_t) ((_t) ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW)
static void setColIdForCacheReadBlock(SSDataBlock* pBlock, SLastRowScanPhysiNode* pScan) {
static int32_t setColIdForCacheReadBlock(SSDataBlock* pBlock, SLastRowScanPhysiNode* pScan) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SNode* pNode;
int32_t idx = 0;
FOREACH(pNode, pScan->pTargets) {
if (nodeType(pNode) == QUERY_NODE_COLUMN) {
SColumnNode* pCol = (SColumnNode*)pNode;
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, idx);
QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);
pColInfo->info.colId = pCol->colId;
}
idx++;
@ -72,6 +75,7 @@ static void setColIdForCacheReadBlock(SSDataBlock* pBlock, SLastRowScanPhysiNode
for (; idx < pBlock->pDataBlock->size; ++idx) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, idx);
QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);
if (pScan->scan.pScanPseudoCols) {
FOREACH(pNode, pScan->scan.pScanPseudoCols) {
STargetNode* pTarget = (STargetNode*)pNode;
@ -82,6 +86,12 @@ static void setColIdForCacheReadBlock(SSDataBlock* pBlock, SLastRowScanPhysiNode
}
}
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
@ -126,6 +136,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl
} else {
for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
SColMatchItem* pItem = taosArrayGet(pInfo->matchInfo.pList, i);
QUERY_CHECK_NULL(pItem, code, lino, _error, terrno);
if (pItem->isPk) {
pInfo->numOfPks += 1;
pInfo->pkCol.type = pItem->dataType.type; // only record one primary key
@ -148,10 +159,14 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl
for (int i = 0; i < TARRAY_SIZE(pInfo->matchInfo.pList); ++i) {
SColMatchItem* pColInfo = taosArrayGet(pInfo->matchInfo.pList, i);
QUERY_CHECK_NULL(pColInfo, code, lino, _error, terrno);
void* tmp = taosArrayPush(pCidList, &pColInfo->colId);
QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
if (pInfo->pFuncTypeList != NULL && taosArrayGetSize(pInfo->pFuncTypeList) > i) {
pColInfo->funcType = *(int32_t*)taosArrayGet(pInfo->pFuncTypeList, i);
void* pFuncType = taosArrayGet(pInfo->pFuncTypeList, i);
QUERY_CHECK_NULL(pFuncType, code, lino, _error, terrno);
pColInfo->funcType = *(int32_t*)pFuncType;
}
}
pInfo->pCidList = pCidList;
@ -187,13 +202,16 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl
code = createOneDataBlock(pInfo->pRes, false, &pInfo->pBufferedRes);
QUERY_CHECK_CODE(code, lino, _error);
setColIdForCacheReadBlock(pInfo->pBufferedRes, pScanNode);
code = setColIdForCacheReadBlock(pInfo->pBufferedRes, pScanNode);
QUERY_CHECK_CODE(code, lino, _error);
code = blockDataEnsureCapacity(pInfo->pBufferedRes, capacity);
QUERY_CHECK_CODE(code, lino, _error);
} else { // by tags
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | SCAN_ROW_TYPE(pScanNode->ignoreNull);
capacity = 1; // only one row output
setColIdForCacheReadBlock(pInfo->pRes, pScanNode);
code = setColIdForCacheReadBlock(pInfo->pRes, pScanNode);
QUERY_CHECK_CODE(code, lino, _error);
}
initResultSizeInfo(&pOperator->resultInfo, capacity);
@ -283,10 +301,13 @@ int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
if (pInfo->indexOfBufferedRes < pBufRes->info.rows) {
for (int32_t i = 0; i < taosArrayGetSize(pBufRes->pDataBlock); ++i) {
SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, i);
QUERY_CHECK_NULL(pCol, code, lino, _end, terrno);
int32_t slotId = pCol->info.slotId;
SColumnInfoData* pSrc = taosArrayGet(pBufRes->pDataBlock, slotId);
QUERY_CHECK_NULL(pSrc, code, lino, _end, terrno);
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, slotId);
QUERY_CHECK_NULL(pDst, code, lino, _end, terrno);
if (colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes)) {
colDataSetNULL(pDst, 0);
@ -299,7 +320,10 @@ int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
}
}
pRes->info.id.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes);
void* pUid = taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes);
QUERY_CHECK_NULL(pUid, code, lino, _end, terrno);
pRes->info.id.uid = *(tb_uid_t*)pUid;
pRes->info.rows = 1;
pRes->info.scanFlag = MAIN_SCAN;
@ -368,7 +392,9 @@ int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
pInfo->pRes->info.id.groupId = pKeyInfo->groupId;
if (taosArrayGetSize(pInfo->pUidList) > 0) {
pInfo->pRes->info.id.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0);
void* pUid = taosArrayGet(pInfo->pUidList, 0);
QUERY_CHECK_NULL(pUid, code, lino, _end, terrno);
pInfo->pRes->info.id.uid = *(tb_uid_t*)pUid;
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
pInfo->pRes->info.rows, pTaskInfo, NULL);
if (code != TSDB_CODE_SUCCESS) {
@ -450,6 +476,9 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask
for (int32_t i = 0; i < numOfCols; ++i) {
SColMatchItem* pColMatch = taosArrayGet(pColMatchInfo, i);
if (!pColMatch) {
return terrno;
}
bool found = false;
for (int32_t j = 0; j < pWrapper->nCols; ++j) {
/* if (pColMatch->colId == pWrapper->pSchema[j].colId && pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
@ -486,11 +515,16 @@ int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pC
for (int32_t i = 0; i < size; ++i) {
SColMatchItem* pColInfo = taosArrayGet(pColMatchInfo->pList, i);
if (!pColInfo) {
return terrno;
}
int32_t slotId = pColInfo->dstSlotId;
SNodeList* pList = pScanNode->scan.node.pOutputDataBlockDesc->pSlots;
SSlotDescNode* pDesc = (SSlotDescNode*)nodesListGetNode(pList, slotId);
QUERY_CHECK_NULL(pDesc, code, lino, _end, terrno);
if (pDesc->dataType.type != TSDB_DATA_TYPE_TIMESTAMP) {
void* tmp = taosArrayPush(pMatchInfo, pColInfo);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

View File

@ -65,6 +65,9 @@ static void clearWinStateBuff(SCountWindowResult* pBuff) { pBuff->winRows = 0; }
static SCountWindowResult* getCountWinStateInfo(SCountWindowSupp* pCountSup) {
SCountWindowResult* pBuffInfo = taosArrayGet(pCountSup->pWinStates, pCountSup->stateIndex);
if (!pBuffInfo) {
return NULL;
}
int32_t size = taosArrayGetSize(pCountSup->pWinStates);
// coverity scan
ASSERTS(size > 0, "WinStates is empty");
@ -99,7 +102,8 @@ void doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
SCountWindowOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->binfo.pRes;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
TSKEY* tsCols = (TSKEY*)pColInfoData->pData;
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
TSKEY* tsCols = (TSKEY*)pColInfoData->pData;
for (int32_t i = 0; i < pBlock->info.rows;) {
SCountWindowResult* pBuffInfo = NULL;
@ -140,6 +144,13 @@ void doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
i += step;
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
}
static void buildCountResult(SExprSupp* pExprSup, SCountWindowSupp* pCountSup, SExecTaskInfo* pTaskInfo,

View File

@ -284,6 +284,7 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p
SSDataBlock* pRes = pInfo->binfo.pRes;
int64_t gid = pBlock->info.id.groupId;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
QUERY_CHECK_NULL(pColInfoData, code, lino, _return, terrno);
TSKEY* tsList = (TSKEY*)pColInfoData->pData;
SWindowRowsSup* pRowSup = &pInfo->winSup;
SColumnInfoData *ps = NULL, *pe = NULL;

View File

@ -52,7 +52,7 @@ static void setAllSourcesCompleted(SOperatorInfo* pOperator);
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
static int32_t getCompletedSources(const SArray* pArray);
static int32_t getCompletedSources(const SArray* pArray, int32_t* pRes);
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
@ -65,8 +65,14 @@ static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeIn
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
SExecTaskInfo* pTaskInfo) {
int32_t code = 0;
int32_t lino = 0;
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
int32_t completed = getCompletedSources(pExchangeInfo->pSourceDataInfo);
int32_t completed = 0;
code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
if (completed == totalSources) {
setAllSourcesCompleted(pOperator);
return;
@ -84,6 +90,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
for (int32_t i = 0; i < totalSources; ++i) {
pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
QUERY_CHECK_NULL(pDataInfo, code, lino, _error, terrno);
if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
continue;
}
@ -100,6 +107,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
tmemory_barrier();
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
QUERY_CHECK_NULL(pSource, code, lino, _error, terrno);
// todo
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
@ -159,7 +167,12 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
return;
} // end loop
int32_t complete1 = getCompletedSources(pExchangeInfo->pSourceDataInfo);
int32_t complete1 = 0;
code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &complete1);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
if (complete1 == totalSources) {
qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
return;
@ -350,9 +363,13 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
for (int32_t i = 0; i < numOfSources; ++i) {
SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
void* tmp = taosArrayPush(pInfo->pSources, pNode);
if (!pNode) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return TSDB_CODE_OUT_OF_MEMORY;
}
void* tmp = taosArrayPush(pInfo->pSources, pNode);
if (!tmp) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return TSDB_CODE_OUT_OF_MEMORY;
}
SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1};
@ -391,6 +408,7 @@ int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNo
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno);
pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
@ -477,6 +495,9 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
int32_t index = pWrapper->sourceIndex;
SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
if (!pSourceDataInfo) {
return terrno;
}
if (code == TSDB_CODE_SUCCESS) {
pSourceDataInfo->pRsp = pMsg->pData;
@ -547,12 +568,20 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
if (!pDataInfo) {
return terrno;
}
if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
return TSDB_CODE_SUCCESS;
}
pDataInfo->status = EX_SOURCE_DATA_STARTED;
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
if (!pSource) {
return terrno;
}
pDataInfo->startTime = taosGetTimestampUs();
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
@ -727,18 +756,26 @@ void setAllSourcesCompleted(SOperatorInfo* pOperator) {
setOperatorCompleted(pOperator);
}
int32_t getCompletedSources(const SArray* pArray) {
size_t total = taosArrayGetSize(pArray);
int32_t getCompletedSources(const SArray* pArray, int32_t* pRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
size_t total = taosArrayGetSize(pArray);
int32_t completed = 0;
for (int32_t k = 0; k < total; ++k) {
SSourceDataInfo* p = taosArrayGet(pArray, k);
QUERY_CHECK_NULL(p, code, lino, _end, terrno);
if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
completed += 1;
}
}
return completed;
*pRes = completed;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
@ -855,6 +892,11 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
}
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
if (!pDataInfo) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
pTaskInfo->code = terrno;
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
@ -870,6 +912,11 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
}
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
if (!pSource) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
pTaskInfo->code = terrno;
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
if (pDataInfo->code != TSDB_CODE_SUCCESS) {
qError("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d error happens, code:%s", GET_TASKID(pTaskInfo),
@ -952,6 +999,9 @@ int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasic
pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
} else {
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
if (!pDataInfo) {
return terrno;
}
if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
}

View File

@ -233,6 +233,13 @@ SArray* createSortInfo(SNodeList* pNodeList) {
for (int32_t i = 0; i < numOfCols; ++i) {
SOrderByExprNode* pSortKey = (SOrderByExprNode*)nodesListGetNode(pNodeList, i);
if (!pSortKey) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
taosArrayDestroy(pList);
pList = NULL;
terrno = TSDB_CODE_OUT_OF_MEMORY;
break;
}
SBlockOrderInfo bi = {0};
bi.order = (pSortKey->order == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
bi.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST);
@ -267,6 +274,13 @@ SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) {
for (int32_t i = 0; i < numOfCols; ++i) {
SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i);
if (!pDescNode) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
blockDataDestroy(pBlock);
pBlock = NULL;
terrno = code;
break;
}
SColumnInfoData idata =
createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId);
idata.info.scale = pDescNode->dataType.scale;
@ -290,9 +304,17 @@ int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo)
for (int32_t i = 0; i < taosArrayGetSize(pMatchInfo->pList); ++i) {
SColMatchItem* pItem = taosArrayGet(pMatchInfo->pList, i);
if (!pItem) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
if (pItem->isPk) {
SColumnInfoData* pInfoData = taosArrayGet(pDataBlock->pDataBlock, pItem->dstSlotId);
if (!pInfoData) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
pBlockInfo->pks[0].type = pInfoData->info.type;
pBlockInfo->pks[1].type = pInfoData->info.type;
@ -652,6 +674,7 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf
for (int32_t i = 0; i < rows; ++i) {
STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
QUERY_CHECK_NULL(pkeyInfo, code, lino, end, terrno);
STUidTagInfo info = {.uid = pkeyInfo->uid};
void* tmp = taosArrayPush(pUidTagList, &info);
QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
@ -706,6 +729,7 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf
if (nodeType(pNode) == QUERY_NODE_COLUMN) {
SColumnNode* pSColumnNode = (SColumnNode*)pNode;
SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
QUERY_CHECK_NULL(pColInfo, code, lino, end, terrno);
code = colDataAssign(output.columnData, pColInfo, rows, NULL);
} else if (nodeType(pNode) == QUERY_NODE_VALUE) {
continue;
@ -749,6 +773,7 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf
for (int i = 0; i < rows; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
QUERY_CHECK_NULL(info, code, lino, end, terrno);
char* isNull = (char*)keyBuf;
char* pStart = (char*)keyBuf + sizeof(int8_t) * LIST_LENGTH(group);
@ -867,12 +892,15 @@ static SArray* getTableNameList(const SNodeListNode* pList) {
// remove the duplicates
SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*));
QUERY_CHECK_NULL(pNewList, code, lino, _end, terrno);
void* tmp = taosArrayPush(pNewList, taosArrayGet(pTbList, 0));
void* tmpTbl = taosArrayGet(pTbList, 0);
QUERY_CHECK_NULL(tmpTbl, code, lino, _end, terrno);
void* tmp = taosArrayPush(pNewList, tmpTbl);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
for (int32_t i = 1; i < numOfTables; ++i) {
char** name = taosArrayGetLast(pNewList);
char** nameInOldList = taosArrayGet(pTbList, i);
QUERY_CHECK_NULL(nameInOldList, code, lino, _end, terrno);
if (strcmp(*name, *nameInOldList) == 0) {
continue;
}
@ -999,6 +1027,10 @@ static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, S
for (int i = 0; i < numOfExisted; i++) {
STUidTagInfo* pTInfo = taosArrayGet(pExistedUidList, i);
if (!pTInfo) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
int32_t tempRes = taosHashPut(uHash, &pTInfo->uid, sizeof(uint64_t), &i, sizeof(i));
if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
@ -1050,7 +1082,9 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S
for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) {
SColumnInfoData colInfo = {0};
colInfo.info = *(SColumnInfo*)taosArrayGet(pColList, i);
void* tmp = taosArrayGet(pColList, i);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
colInfo.info = *(SColumnInfo*)tmp;
code = blockDataAppendColInfo(pResBlock, &colInfo);
QUERY_CHECK_CODE(code, lino, _end);
}
@ -1068,9 +1102,11 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S
for (int32_t i = 0; i < numOfTables; i++) {
STUidTagInfo* p1 = taosArrayGet(pUidTagList, i);
QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
for (int32_t j = 0; j < numOfCols; j++) {
SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j);
QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);
if (pColInfo->info.colId == -1) { // tbname
char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
@ -1144,7 +1180,12 @@ static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, co
int32_t numOfTables = taosArrayGetSize(pUidTagList);
for (int32_t i = 0; i < numOfTables; ++i) {
if (pResultList[i]) {
uint64_t uid = ((STUidTagInfo*)taosArrayGet(pUidTagList, i))->uid;
STUidTagInfo* tmpTag = (STUidTagInfo*)taosArrayGet(pUidTagList, i);
if (!tmpTag) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
uint64_t uid = tmpTag->uid;
qDebug("tagfilter get uid:%" PRId64 ", res:%d", uid, pResultList[i]);
info.uid = uid;
@ -1174,6 +1215,10 @@ static int32_t copyExistedUids(SArray* pUidTagList, const SArray* pUidList) {
for (int32_t i = 0; i < numOfExisted; ++i) {
uint64_t* uid = taosArrayGet(pUidList, i);
if (!uid) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
STUidTagInfo info = {.uid = *uid};
void* tmp = taosArrayPush(pUidTagList, &info);
if (!tmp) {
@ -1242,6 +1287,7 @@ static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SN
for (int32_t i = 0; i < numOfRows; ++i) {
STUidTagInfo* pInfo = taosArrayGet(pUidTagList, i);
QUERY_CHECK_NULL(pInfo, code, lino, end, terrno);
void* tmp = taosArrayPush(pUidList, &pInfo->uid);
QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
}
@ -1392,7 +1438,9 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
*(int32_t*)pPayload = numOfTables;
if (numOfTables > 0) {
memcpy(pPayload + sizeof(int32_t), taosArrayGet(pUidList, 0), numOfTables * sizeof(uint64_t));
void* tmp = taosArrayGet(pUidList, 0);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
memcpy(pPayload + sizeof(int32_t), tmp, numOfTables * sizeof(uint64_t));
}
code = pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid, context.digest, tListLen(context.digest),
@ -1408,7 +1456,9 @@ _end:
if (!listAdded) {
numOfTables = taosArrayGetSize(pUidList);
for (int i = 0; i < numOfTables; i++) {
STableKeyInfo info = {.uid = *(uint64_t*)taosArrayGet(pUidList, i), .groupId = 0};
void* tmp = taosArrayGet(pUidList, i);
QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
STableKeyInfo info = {.uid = *(uint64_t*)tmp, .groupId = 0};
void* p = taosArrayPush(pListInfo->pTableList, &info);
if (p == NULL) {
@ -1429,6 +1479,8 @@ _error:
}
int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray** tableList, void* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SSubplan* pSubplan = (SSubplan*)node;
SScanPhysiNode pNode = {0};
pNode.suid = suid;
@ -1436,17 +1488,20 @@ int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray** tableList
pNode.tableType = TSDB_SUPER_TABLE;
STableListInfo* pTableListInfo = tableListCreate();
if (pTableListInfo == NULL) {
return terrno;
}
QUERY_CHECK_NULL(pTableListInfo, code, lino, _end, terrno);
uint8_t digest[17] = {0};
int code =
code =
getTableList(pVnode, &pNode, pSubplan ? pSubplan->pTagCond : NULL, pSubplan ? pSubplan->pTagIndexCond : NULL,
pTableListInfo, digest, "qGetTableList", &((SExecTaskInfo*)pTaskInfo)->storageAPI);
QUERY_CHECK_CODE(code, lino, _end);
*tableList = pTableListInfo->pTableList;
pTableListInfo->pTableList = NULL;
tableListDestroy(pTableListInfo);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
@ -1554,6 +1609,11 @@ SArray* makeColumnArrayFromList(SNodeList* pNodeList) {
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnNode* pColNode = (SColumnNode*)nodesListGetNode(pNodeList, i);
if (!pColNode) {
taosArrayDestroy(pList);
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return NULL;
}
// todo extract method
SColumn c = {0};
@ -1591,6 +1651,7 @@ int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
for (int32_t i = 0; i < numOfCols; ++i) {
STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);
if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
@ -1610,6 +1671,7 @@ int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
int32_t num = LIST_LENGTH(pOutputNodeList->pSlots);
for (int32_t i = 0; i < num; ++i) {
SSlotDescNode* pNode = (SSlotDescNode*)nodesListGetNode(pOutputNodeList->pSlots, i);
QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);
// todo: add reserve flag check
// it is a column reserved for the arithmetic expression calculation
@ -1621,6 +1683,7 @@ int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
SColMatchItem* info = NULL;
for (int32_t j = 0; j < taosArrayGetSize(pList); ++j) {
info = taosArrayGet(pList, j);
QUERY_CHECK_NULL(info, code, lino, _end, terrno);
if (info->dstSlotId == pNode->slotId) {
break;
}
@ -1763,6 +1826,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
for (int32_t j = 0; j < numOfParam; ++j) {
SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
if (p1->type == QUERY_NODE_COLUMN) {
SColumnNode* pcn = (SColumnNode*)p1;
@ -1839,6 +1903,7 @@ SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) {
int32_t code = createExprFromOneNode(pExp, nodesListGetNode(pNodeList, i), i + UD_TAG_COLUMN_INDEX);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pExprs);
terrno = code;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
return NULL;
}
@ -1874,6 +1939,10 @@ int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo**
} else {
pTargetNode = (STargetNode*)nodesListGetNode(pGroupKeys, i - numOfFuncs);
}
if (!pTargetNode) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
SExprInfo* pExp = &pExprs[i];
code = createExprFromTargetNode(pExp, pTargetNode);
@ -2051,10 +2120,20 @@ int32_t relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SAr
int32_t i = 0, j = 0;
while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) {
SColumnInfoData* p = taosArrayGet(pCols, i);
SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, j);
if (!p) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, j);
if (!pmInfo) {
return terrno;
}
if (p->info.colId == pmInfo->colId) {
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, pmInfo->dstSlotId);
if (!pDst) {
return terrno;
}
code = colDataAssign(pDst, p, pBlock->info.rows, &pBlock->info);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
@ -2128,6 +2207,10 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
int32_t j = 0;
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
STargetNode* pNode = (STargetNode*)nodesListGetNode(pTableScanNode->scan.pScanCols, i);
if (!pNode) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
if (pColNode->colType == COLUMN_TYPE_TAG) {
continue;
@ -2318,6 +2401,10 @@ int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t st
for (int32_t i = startIndex; i < numOfTables; ++i) {
STableKeyInfo* p = taosArrayGet(pTableList->pTableList, i);
if (!p) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return -1;
}
if (p->uid == uid) {
return i;
}
@ -2489,22 +2576,30 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
}
STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0);
if (!pInfo) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
uint64_t gid = pInfo->groupId;
int32_t start = 0;
void* tmp = taosArrayPush(pList, &start);
if (!tmp) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return TSDB_CODE_OUT_OF_MEMORY;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
for (int32_t i = 1; i < size; ++i) {
pInfo = taosArrayGet(pTableListInfo->pTableList, i);
if (!pInfo) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
if (pInfo->groupId != gid) {
tmp = taosArrayPush(pList, &i);
if (!tmp) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return TSDB_CODE_OUT_OF_MEMORY;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
gid = pInfo->groupId;
}
@ -2542,6 +2637,10 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
for (int i = 0; i < numOfTables; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
if (!info) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
info->groupId = groupByTbname ? info->uid : 0;
int32_t tempRes = taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId),
@ -2554,6 +2653,10 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
} else {
for (int32_t i = 0; i < numOfTables; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
if (!info) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
info->groupId = groupByTbname ? info->uid : 0;
}
}
@ -2597,6 +2700,10 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
size_t size = taosArrayGetSize(pTableListInfo->pTableList);
for (int32_t i = 0; i < size; ++i) {
STableKeyInfo* p = taosArrayGet(pTableListInfo->pTableList, i);
if (!p) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
int32_t tempRes = taosHashPut(pTableListInfo->map, &p->uid, sizeof(uint64_t), &i, sizeof(int32_t));
if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
@ -2822,13 +2929,22 @@ SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) {
return ret;
}
int32_t extractKeysLen(const SArray* keys) {
int32_t extractKeysLen(const SArray* keys, int32_t* pLen) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t len = 0;
int32_t keyNum = taosArrayGetSize(keys);
for (int32_t i = 0; i < keyNum; ++i) {
SColumn* pCol = (SColumn*)taosArrayGet(keys, i);
QUERY_CHECK_NULL(pCol, code, lino, _end, terrno);
len += pCol->bytes;
}
len += sizeof(int8_t) * keyNum; // null flag
return len;
*pLen = len;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}

View File

@ -392,6 +392,7 @@ static int32_t filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
pAPI->metaReaderFn.initReader(&mr, pScanInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
for (int32_t i = 0; i < numOfUids; ++i) {
uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i);
QUERY_CHECK_NULL(id, code, lino, _end, terrno);
int32_t code = pAPI->metaReaderFn.getTableEntryByUid(&mr, *id);
if (code != TSDB_CODE_SUCCESS) {
@ -493,6 +494,12 @@ int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableI
for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
uint64_t* uid = taosArrayGet(qa, i);
if (!uid) {
taosMemoryFree(keyBuf);
taosArrayDestroy(qa);
taosWUnLockLatch(&pTaskInfo->lock);
return terrno;
}
STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};
if (bufLen > 0) {
@ -547,6 +554,10 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
}
SSchemaInfo* pSchemaInfo = taosArrayGet(pTaskInfo->schemaInfos, idx);
if (!pSchemaInfo) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
*sversion = pSchemaInfo->sw->version;
*tversion = pSchemaInfo->tversion;
@ -704,7 +715,9 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
p = p1;
} else {
p = *(SSDataBlock**)taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
void* tmp = taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
p = *(SSDataBlock**)tmp;
code = copyDataBlock(p, pRes);
QUERY_CHECK_CODE(code, lino, _end);
}
@ -752,7 +765,9 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) {
size_t num = taosArrayGetSize(pList);
for (int32_t i = 0; i < num; ++i) {
SSDataBlock** p = taosArrayGet(pTaskInfo->pResultBlockList, i);
blockDataDestroy(*p);
if (p) {
blockDataDestroy(*p);
}
}
taosArrayClear(pTaskInfo->pResultBlockList);
@ -866,6 +881,10 @@ void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
int32_t num = taosArrayGetSize(pTaskInfo->stopInfo.pStopInfo);
for (int32_t i = 0; i < num; ++i) {
SExchangeOpStopInfo* pStop = taosArrayGet(pTaskInfo->stopInfo.pStopInfo, i);
if (!pStop) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
continue;
}
SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pStop->refId);
if (pExchangeInfo) {
(void)tsem_post(&pExchangeInfo->ready);

View File

@ -583,8 +583,10 @@ int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* p
size_t size = taosArrayGetSize(pColMatchInfo->pList);
for (int32_t i = 0; i < size; ++i) {
SColMatchItem* pInfo = taosArrayGet(pColMatchInfo->pList, i);
QUERY_CHECK_NULL(pInfo, code, lino, _err, terrno);
if (pInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pInfo->dstSlotId);
QUERY_CHECK_NULL(pColData, code, lino, _err, terrno);
if (pColData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
code = blockDataUpdateTsWindow(pBlock, pInfo->dstSlotId);
QUERY_CHECK_CODE(code, lino, _err);
@ -673,6 +675,7 @@ void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultR
// expand the result into multiple rows. E.g., _wstart, top(k, 20)
// the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
code = colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
@ -1052,6 +1055,10 @@ bool groupbyTbname(SNodeList* pGroupList) {
bool bytbname = false;
if (LIST_LENGTH(pGroupList) == 1) {
SNode* p = nodesListGetNode(pGroupList, 0);
if (!p) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return false;
}
if (p->type == QUERY_NODE_FUNCTION) {
// partition by tbname/group by tbname
bytbname = (strcmp(((struct SFunctionNode*)p)->functionName, "tbname") == 0);

View File

@ -58,7 +58,7 @@ typedef struct SFillOperatorInfo {
static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t order);
static void destroyFillOperatorInfo(void* param);
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
static void fillResetPrevForNewGroup(SFillInfo* pFillInfo);
static int32_t fillResetPrevForNewGroup(SFillInfo* pFillInfo);
static void reviseFillStartAndEndKey(SFillOperatorInfo* pInfo, int32_t order);
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
@ -83,7 +83,11 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOp
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
if (pInfo->pFillInfo->type == TSDB_FILL_PREV || pInfo->pFillInfo->type == TSDB_FILL_LINEAR) {
fillResetPrevForNewGroup(pInfo->pFillInfo);
int32_t code = fillResetPrevForNewGroup(pInfo->pFillInfo);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
T_LONG_JMP(pTaskInfo->env, code);
}
}
int32_t numOfResultRows = pResultInfo->capacity - pResBlock->info.rows;
@ -146,13 +150,22 @@ _end:
}
}
static void fillResetPrevForNewGroup(SFillInfo* pFillInfo) {
static int32_t fillResetPrevForNewGroup(SFillInfo* pFillInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
for (int32_t colIdx = 0; colIdx < pFillInfo->numOfCols; ++colIdx) {
if (!pFillInfo->pFillCol[colIdx].notFillCol) {
SGroupKeys* key = taosArrayGet(pFillInfo->prev.pRowVal, colIdx);
QUERY_CHECK_NULL(key, code, lino, _end, terrno);
key->isNull = true;
}
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
// todo refactor: decide the start key according to the query time range.

View File

@ -100,6 +100,10 @@ static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char**
int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
for (int32_t i = 0; i < numOfGroupCols; ++i) {
SColumn* pCol = (SColumn*)taosArrayGet(pGroupColList, i);
if (!pCol) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
(*keyLen) += pCol->bytes; // actual data + null_flag
SGroupKeys key = {0};
@ -888,10 +892,10 @@ static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) {
int32_t size = taosArrayGetSize(pInfo->sortedGroupArray);
for (int32_t i = 0; i < size; i++) {
SDataGroupInfo* pGp = taosArrayGet(pInfo->sortedGroupArray, i);
if (pGp->blockForNotLoaded) {
if (pGp && pGp->blockForNotLoaded) {
for (int32_t i = 0; i < pGp->blockForNotLoaded->size; i++) {
SSDataBlock** pBlock = taosArrayGet(pGp->blockForNotLoaded, i);
blockDataDestroy(*pBlock);
if (pBlock) blockDataDestroy(*pBlock);
}
taosArrayClear(pGp->blockForNotLoaded);
pGp->offsetForNotLoaded = 0;
@ -916,6 +920,9 @@ static int compareDataGroupInfo(const void* group1, const void* group2) {
static SSDataBlock* buildPartitionResultForNotLoadBlock(SDataGroupInfo* pGroupInfo) {
if (pGroupInfo->blockForNotLoaded && pGroupInfo->offsetForNotLoaded < pGroupInfo->blockForNotLoaded->size) {
SSDataBlock** pBlock = taosArrayGet(pGroupInfo->blockForNotLoaded, pGroupInfo->offsetForNotLoaded);
if (!pBlock) {
return NULL;
}
pGroupInfo->offsetForNotLoaded++;
return *pBlock;
}
@ -946,10 +953,18 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
++pInfo->groupIndex;
pGroupInfo = taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex);
if (pGroupInfo == NULL) {
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, terrno);
}
pInfo->pageIndex = 0;
}
int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex);
if (pageId == NULL) {
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, terrno);
}
void* page = getBufPage(pInfo->pBuf, *pageId);
if (page == NULL) {
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
@ -1104,7 +1119,9 @@ static void destroyPartitionOperatorInfo(void* param) {
int32_t size = taosArrayGetSize(pInfo->sortedGroupArray);
for (int32_t i = 0; i < size; i++) {
SDataGroupInfo* pGp = taosArrayGet(pInfo->sortedGroupArray, i);
taosArrayDestroy(pGp->pPageList);
if (pGp) {
taosArrayDestroy(pGp->pPageList);
}
}
taosArrayDestroy(pInfo->sortedGroupArray);
@ -1276,7 +1293,9 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; j++) {
int32_t slotId = pOperator->exprSupp.pExprInfo[j].base.pParam[0].pCol->slotId;
SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, slotId);
QUERY_CHECK_NULL(pSrcCol, code, lino, _end, terrno);
SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, j);
QUERY_CHECK_NULL(pDestCol, code, lino, _end, terrno);
bool isNull = colDataIsNull(pSrcCol, pSrc->info.rows, rowIndex, NULL);
char* pSrcData = NULL;
if (!isNull) pSrcData = colDataGetData(pSrcCol, rowIndex);
@ -1342,6 +1361,7 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag
QUERY_CHECK_CODE(code, lino, _end);
SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
QUERY_CHECK_NULL(pTbCol, code, lino, _end, terrno);
memset(tbName, 0, TSDB_TABLE_NAME_LEN);
int32_t len = 0;
if (colDataIsNull_s(pTbCol, pDestBlock->info.rows - 1)) {
@ -1358,6 +1378,7 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag
pDestBlock->info.rows--;
} else {
void* pTbNameCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
QUERY_CHECK_NULL(pTbNameCol, code, lino, _end, terrno);
colDataSetNULL(pTbNameCol, pDestBlock->info.rows);
tbName[0] = 0;
}
@ -1371,6 +1392,7 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag
}
void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
QUERY_CHECK_NULL(pGpIdCol, code, lino, _end, terrno);
code = colDataSetVal(pGpIdCol, pDestBlock->info.rows, (const char*)&groupId, false);
QUERY_CHECK_CODE(code, lino, _end);
pDestBlock->info.rows++;
@ -1557,7 +1579,11 @@ static void destroyStreamPartitionOperatorInfo(void* param) {
taosArrayDestroy(pInfo->partitionSup.pGroupCols);
for (int i = 0; i < taosArrayGetSize(pInfo->partitionSup.pGroupColVals); i++) {
SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->partitionSup.pGroupColVals, i);
void* tmp = taosArrayGet(pInfo->partitionSup.pGroupColVals, i);
if (!tmp) {
continue;
}
SGroupKeys key = *(SGroupKeys*)tmp;
taosMemoryFree(key.pData);
}
taosArrayDestroy(pInfo->partitionSup.pGroupColVals);
@ -1777,6 +1803,7 @@ int32_t extractColumnInfo(SNodeList* pNodeList, SArray** pArrayRes) {
for (int32_t i = 0; i < numOfCols; ++i) {
STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);
if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

View File

@ -295,8 +295,9 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
}
STableListInfo* pTableListInfo = tableListCreate();
if (pTableListInfo == NULL) {
if (!pTableListInfo) {
pTaskInfo->code = terrno;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
@ -334,6 +335,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate();
if (!pTableListInfo) {
pTaskInfo->code = terrno;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle,
pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
@ -366,8 +372,9 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate();
if (pTableListInfo == NULL){
if (!pTableListInfo) {
pTaskInfo->code = terrno;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
@ -393,11 +400,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
STagScanPhysiNode* pTagScanPhyNode = (STagScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate();
if (pTableListInfo == NULL) {
if (!pTableListInfo) {
pTaskInfo->code = terrno;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
if (!pTagScanPhyNode->onlyMetaCtbIdx) {
code = createScanTableListInfo((SScanPhysiNode*)pTagScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
pTagIndexCond, pTaskInfo);
@ -411,8 +418,9 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
} else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate();
if (pTableListInfo == NULL) {
if (!pTableListInfo) {
pTaskInfo->code = terrno;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
@ -453,8 +461,9 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
} else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate();
if (pTableListInfo == NULL) {
if (!pTableListInfo) {
pTaskInfo->code = terrno;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}

View File

@ -485,6 +485,7 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
}
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->node.pOutputDataBlockDesc);
TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno);
// Make sure the size of SSDataBlock will never exceed the size of 2MB.
int32_t TWOMB = 2 * 1024 * 1024;

View File

@ -1071,6 +1071,10 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) {
int32_t tableIdx = 0;
for (int32_t i = 0; i < num; ++i) {
uint64_t* pUid = taosArrayGet(pParam->pUidList, i);
if (!pUid) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
if (taosHashPut(pListInfo->map, pUid, sizeof(uint64_t), &tableIdx, sizeof(int32_t))) {
if (TSDB_CODE_DUP_KEY == terrno) {
@ -3178,6 +3182,8 @@ FETCH_NEXT_BLOCK:
qDebug("process %d/%d input data blocks, %s", current, (int32_t)total, id);
SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
QUERY_CHECK_NULL(pPacked, code, lino, _end, terrno);
SSDataBlock* pBlock = pPacked->pDataBlock;
if (pBlock->info.parTbName[0]) {
code =
@ -3386,6 +3392,7 @@ FETCH_NEXT_BLOCK:
int32_t current = pInfo->validBlockIndex++;
SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);
QUERY_CHECK_NULL(pSubmit, code, lino, _end, terrno);
qDebug("set %d/%d as the input submit block, %s", current + 1, totalBlocks, id);
if (pAPI->tqReaderFn.tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) <
@ -3473,7 +3480,9 @@ FETCH_NEXT_BLOCK:
qDebug("process %d/%d input data blocks, %s", current, (int32_t)total, id);
SPackedData* pData = taosArrayGet(pInfo->pBlockLists, current);
QUERY_CHECK_NULL(pData, code, lino, _end, terrno);
SSDataBlock* pBlock = taosArrayGet(pData->pDataBlock, 0);
QUERY_CHECK_NULL(pBlock, code, lino, _end, terrno);
if (pBlock->info.type == STREAM_CHECKPOINT) {
streamScanOperatorSaveCheckpoint(pInfo);
@ -3685,15 +3694,18 @@ int32_t createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo
QRY_OPTR_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
lino = __LINE__;
goto _end;
}
pInfo->pTableListInfo = tableListCreate();
QUERY_CHECK_NULL(pInfo->pTableListInfo, code, lino, _end, terrno);
pInfo->vnode = pHandle->vnode;
pInfo->pAPI = &pTaskInfo->storageAPI;
@ -3707,6 +3719,9 @@ int32_t createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo
return code;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
taosMemoryFree(pInfo);
taosMemoryFree(pOperator);
pTaskInfo->code = code;
@ -3901,6 +3916,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
for (int32_t i = 0; i < numOfOutput; ++i) {
SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i);
QUERY_CHECK_NULL(id, code, lino, _error, terrno);
int16_t colId = id->colId;
void* tmp = taosArrayPush(pColIds, &colId);
@ -4359,8 +4375,10 @@ static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRe
for (int i = 0; i < szTables; ++i) {
int32_t idx = *(int32_t*)taosArrayGet(aFilterIdxs, i);
STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, idx);
QUERY_CHECK_NULL(pUidTagInfo, code, lino, _end, terrno);
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
QUERY_CHECK_NULL(pDst, code, lino, _end, terrno);
code = tagScanFillOneCellWithTag(pOperator, pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode);
}
}
@ -4368,8 +4386,10 @@ static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRe
size_t szTables = taosArrayGetSize(aUidTags);
for (int i = 0; i < szTables; ++i) {
STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, i);
QUERY_CHECK_NULL(pUidTagInfo, code, lino, _end, terrno);
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
QUERY_CHECK_NULL(pDst, code, lino, _end, terrno);
code = tagScanFillOneCellWithTag(pOperator, pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode);
}
}
@ -4815,6 +4835,10 @@ static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) {
pInput->pInputBlock = (pInput->type == SUB_TABLE_MEM_BLOCK) ? pInput->pReaderBlock : pInput->pPageBlock;
SColumnInfoData* col = taosArrayGet(pInput->pInputBlock->pDataBlock, pSubTblsInfo->pTsOrderInfo->slotId);
if (!col) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
pInput->aTs = (int64_t*)col->pData;
}
@ -4829,8 +4853,16 @@ static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pSubTblsInfo->pTsOrderInfo = taosArrayGet(pInfo->pSortInfo, 0);
if (!pSubTblsInfo->pTsOrderInfo) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
if (taosArrayGetSize(pInfo->pSortInfo) == 2) {
pSubTblsInfo->pPkOrderInfo = taosArrayGet(pInfo->pSortInfo, 1);
if (!pSubTblsInfo->pPkOrderInfo) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
} else {
pSubTblsInfo->pPkOrderInfo = NULL;
}
@ -4952,6 +4984,10 @@ static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTab
}
if (pInput->rowIdx != -1) {
SColumnInfoData* col = taosArrayGet(pInputBlock->pDataBlock, pSubTblsInfo->pTsOrderInfo->slotId);
if (!col) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
pInput->pInputBlock = pInputBlock;
pInput->aTs = (int64_t*)col->pData;
}
@ -4970,8 +5006,10 @@ static int32_t appendChosenRowToDataBlock(STmsSubTablesMergeInfo* pSubTblsInfo,
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);
SColumnInfoData* pSrcColInfo = taosArrayGet(pInputBlock->pDataBlock, i);
QUERY_CHECK_NULL(pSrcColInfo, code, lino, _end, terrno);
bool isNull = colDataIsNull(pSrcColInfo, pInputBlock->info.rows, pInput->rowIdx, NULL);
if (isNull) {
@ -5338,6 +5376,7 @@ int32_t generateSortByTsPkInfo(SArray* colMatchInfo, int32_t order, SArray** ppS
int32_t pkTargetSlotId = -1;
for (int32_t i = 0; i < taosArrayGetSize(colMatchInfo); ++i) {
SColMatchItem* colInfo = taosArrayGet(colMatchInfo, i);
QUERY_CHECK_NULL(colInfo, code, lino, _end, terrno);
if (colInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
tsTargetSlotId = colInfo->dstSlotId;
biTs.order = order;
@ -6010,6 +6049,7 @@ int32_t fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, ch
if (pSupp->dbNameSlotId != -1) {
ASSERT(strlen(dbName));
SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId);
QUERY_CHECK_NULL(colInfoData, code, lino, _end, terrno);
char varDbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(varDbName), dbName, TSDB_DB_NAME_LEN);
@ -6021,6 +6061,7 @@ int32_t fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, ch
if (pSupp->stbNameSlotId != -1) {
SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->stbNameSlotId);
QUERY_CHECK_NULL(colInfoData, code, lino, _end, terrno);
if (strlen(stbName) != 0) {
char varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
strncpy(varDataVal(varStbName), stbName, TSDB_TABLE_NAME_LEN);
@ -6034,6 +6075,7 @@ int32_t fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, ch
if (pSupp->tbCountSlotId != -1) {
SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->tbCountSlotId);
QUERY_CHECK_NULL(colInfoData, code, lino, _end, terrno);
code = colDataSetVal(colInfoData, 0, (char*)&count, false);
QUERY_CHECK_CODE(code, lino, _end);
}

View File

@ -101,6 +101,7 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN
}
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
QUERY_CHECK_NULL(pInfo->binfo.pRes , code, lino, _error, terrno);
pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
if (pSortNode->calcGroupId) {
@ -120,7 +121,8 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN
if (TSDB_CODE_SUCCESS == code) {
// PK ts col should always at last, see partColOptCreateSort
if (pSortNode->excludePkCol) taosArrayPop(pGroupIdCalc->pSortColsArr);
keyLen = extractKeysLen(pGroupIdCalc->pSortColsArr);
code = extractKeysLen(pGroupIdCalc->pSortColsArr, &keyLen);
QUERY_CHECK_CODE(code, lino, _error);
}
if (TSDB_CODE_SUCCESS == code) {
pGroupIdCalc->lastKeysLen = 0;
@ -788,6 +790,7 @@ int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNo
QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno);
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
QUERY_CHECK_NULL(pInfo->binfo.pRes , code, lino, _error, terrno);
code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
TSDB_CHECK_CODE(code, lino, _error);

View File

@ -899,7 +899,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
QUERY_CHECK_CODE(code, lino, _error);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);

View File

@ -595,6 +595,9 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
void* schema = taosHashGet(pInfo->pSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t));
if (schema == NULL) {
SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&pInfo->pCur->mr.me.stbEntry.schemaRow);
if (pInfo->pCur->mr.me.stbEntry.schemaRow.pSchema) {
QUERY_CHECK_NULL(schemaWrapper, code, lino, _end, terrno);
}
code = taosHashPut(pInfo->pSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES);
if (code == TSDB_CODE_DUP_KEY) {
code = TSDB_CODE_SUCCESS;
@ -626,6 +629,9 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
return NULL;
}
SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&smrSuperTable.me.stbEntry.schemaRow);
if (smrSuperTable.me.stbEntry.schemaRow.pSchema) {
QUERY_CHECK_NULL(schemaWrapper, code, lino, _end, terrno);
}
code = taosHashPut(pInfo->pSchema, &suid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES);
if (code == TSDB_CODE_DUP_KEY) {
code = TSDB_CODE_SUCCESS;
@ -985,16 +991,19 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo,
// table name
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 0);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, tableName, false);
QUERY_CHECK_CODE(code, lino, _end);
// database name
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 1);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, dbname, false);
QUERY_CHECK_CODE(code, lino, _end);
// super table name
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 2);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, stableName, false);
QUERY_CHECK_CODE(code, lino, _end);
@ -1002,12 +1011,14 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo,
char tagName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(tagName, (*smrSuperTable).me.stbEntry.schemaTag.pSchema[i].name);
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 3);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, tagName, false);
QUERY_CHECK_CODE(code, lino, _end);
// tag type
int8_t tagType = (*smrSuperTable).me.stbEntry.schemaTag.pSchema[i].type;
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 4);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
char tagTypeStr[VARSTR_HEADER_SIZE + 32];
int tagTypeLen = sprintf(varDataVal(tagTypeStr), "%s", tDataTypes[tagType].name);
if (tagType == TSDB_DATA_TYPE_NCHAR) {
@ -1073,6 +1084,7 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo,
}
}
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 5);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, tagVarChar,
(tagData == NULL) || (tagType == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(tagData)));
QUERY_CHECK_CODE(code, lino, _end);
@ -1108,15 +1120,18 @@ static int32_t sysTableUserColsFillOneTableCols(const SSysTableScanInfo* pInfo,
// table name
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 0);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, tName, false);
QUERY_CHECK_CODE(code, lino, _end);
// database name
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 1);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, dbname, false);
QUERY_CHECK_CODE(code, lino, _end);
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 2);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, tableType, false);
QUERY_CHECK_CODE(code, lino, _end);
@ -1124,12 +1139,14 @@ static int32_t sysTableUserColsFillOneTableCols(const SSysTableScanInfo* pInfo,
char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(colName, schemaRow->pSchema[i].name);
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 3);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, colName, false);
QUERY_CHECK_CODE(code, lino, _end);
// col type
int8_t colType = schemaRow->pSchema[i].type;
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 4);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
char colTypeStr[VARSTR_HEADER_SIZE + 32];
int colTypeLen = sprintf(varDataVal(colTypeStr), "%s", tDataTypes[colType].name);
if (colType == TSDB_DATA_TYPE_VARCHAR) {
@ -1144,11 +1161,13 @@ static int32_t sysTableUserColsFillOneTableCols(const SSysTableScanInfo* pInfo,
QUERY_CHECK_CODE(code, lino, _end);
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 5);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (const char*)&schemaRow->pSchema[i].bytes, false);
QUERY_CHECK_CODE(code, lino, _end);
for (int32_t j = 6; j <= 8; ++j) {
pColInfoData = taosArrayGet(dataBlock->pDataBlock, j);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
colDataSetNULL(pColInfoData, numOfRows);
}
++numOfRows;
@ -1217,6 +1236,7 @@ int32_t buildDbTableInfoBlock(bool sysInfo, const SSDataBlock* p, const SSysTabl
}
SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
STR_TO_VARSTR(n, pm->name);
code = colDataSetVal(pColInfoData, numOfRows, n, false);
@ -1225,26 +1245,31 @@ int32_t buildDbTableInfoBlock(bool sysInfo, const SSDataBlock* p, const SSysTabl
// database name
STR_TO_VARSTR(n, dbName);
pColInfoData = taosArrayGet(p->pDataBlock, 1);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, n, false);
QUERY_CHECK_CODE(code, lino, _end);
// create time
pColInfoData = taosArrayGet(p->pDataBlock, 2);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
colDataSetNULL(pColInfoData, numOfRows);
// number of columns
pColInfoData = taosArrayGet(p->pDataBlock, 3);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&pm->colNum, false);
QUERY_CHECK_CODE(code, lino, _end);
for (int32_t j = 4; j <= 8; ++j) {
pColInfoData = taosArrayGet(p->pDataBlock, j);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
colDataSetNULL(pColInfoData, numOfRows);
}
STR_TO_VARSTR(n, "SYSTEM_TABLE");
pColInfoData = taosArrayGet(p->pDataBlock, 9);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, n, false);
QUERY_CHECK_CODE(code, lino, _end);
@ -1327,6 +1352,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
int32_t i = pIdx->lastIdx;
for (; i < taosArrayGetSize(pIdx->uids); i++) {
tb_uid_t* uid = taosArrayGet(pIdx->uids, i);
QUERY_CHECK_NULL(uid, code, lino, _end, terrno);
SMetaReader mr = {0};
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
@ -1339,16 +1365,20 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
// table name
SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, n, false);
QUERY_CHECK_CODE(code, lino, _end);
// database name
pColInfoData = taosArrayGet(p->pDataBlock, 1);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, dbname, false);
QUERY_CHECK_CODE(code, lino, _end);
// vgId
pColInfoData = taosArrayGet(p->pDataBlock, 6);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&vgId, false);
QUERY_CHECK_CODE(code, lino, _end);
@ -1357,6 +1387,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
// create time
int64_t ts = mr.me.ctbEntry.btime;
pColInfoData = taosArrayGet(p->pDataBlock, 2);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&ts, false);
QUERY_CHECK_CODE(code, lino, _end);
@ -1373,18 +1404,21 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
pColInfoData = taosArrayGet(p->pDataBlock, 3);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&mr1.me.stbEntry.schemaRow.nCols, false);
QUERY_CHECK_CODE(code, lino, _end);
// super table name
STR_TO_VARSTR(n, mr1.me.name);
pColInfoData = taosArrayGet(p->pDataBlock, 4);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, n, false);
QUERY_CHECK_CODE(code, lino, _end);
pAPI->metaReaderFn.clearReader(&mr1);
// table comment
pColInfoData = taosArrayGet(p->pDataBlock, 8);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
if (mr.me.ctbEntry.commentLen > 0) {
char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(comment, mr.me.ctbEntry.comment);
@ -1401,11 +1435,13 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
// uid
pColInfoData = taosArrayGet(p->pDataBlock, 5);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&mr.me.uid, false);
QUERY_CHECK_CODE(code, lino, _end);
// ttl
pColInfoData = taosArrayGet(p->pDataBlock, 7);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&mr.me.ctbEntry.ttlDays, false);
QUERY_CHECK_CODE(code, lino, _end);
@ -1414,20 +1450,24 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
} else if (tableType == TSDB_NORMAL_TABLE) {
// create time
pColInfoData = taosArrayGet(p->pDataBlock, 2);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.btime, false);
QUERY_CHECK_CODE(code, lino, _end);
// number of columns
pColInfoData = taosArrayGet(p->pDataBlock, 3);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.schemaRow.nCols, false);
QUERY_CHECK_CODE(code, lino, _end);
// super table name
pColInfoData = taosArrayGet(p->pDataBlock, 4);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
colDataSetNULL(pColInfoData, numOfRows);
// table comment
pColInfoData = taosArrayGet(p->pDataBlock, 8);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
if (mr.me.ntbEntry.commentLen > 0) {
char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(comment, mr.me.ntbEntry.comment);
@ -1444,11 +1484,13 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
// uid
pColInfoData = taosArrayGet(p->pDataBlock, 5);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&mr.me.uid, false);
QUERY_CHECK_CODE(code, lino, _end);
// ttl
pColInfoData = taosArrayGet(p->pDataBlock, 7);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&mr.me.ntbEntry.ttlDays, false);
QUERY_CHECK_CODE(code, lino, _end);
@ -1459,6 +1501,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
pAPI->metaReaderFn.clearReader(&mr);
pColInfoData = taosArrayGet(p->pDataBlock, 9);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, n, false);
QUERY_CHECK_CODE(code, lino, _end);
@ -1561,16 +1604,19 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
// table name
SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, n, false);
QUERY_CHECK_CODE(code, lino, _end);
// database name
pColInfoData = taosArrayGet(p->pDataBlock, 1);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, dbname, false);
QUERY_CHECK_CODE(code, lino, _end);
// vgId
pColInfoData = taosArrayGet(p->pDataBlock, 6);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&vgId, false);
QUERY_CHECK_CODE(code, lino, _end);
@ -1579,6 +1625,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
// create time
int64_t ts = pInfo->pCur->mr.me.ctbEntry.btime;
pColInfoData = taosArrayGet(p->pDataBlock, 2);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&ts, false);
QUERY_CHECK_CODE(code, lino, _end);
@ -1603,18 +1650,21 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
// number of columns
pColInfoData = taosArrayGet(p->pDataBlock, 3);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&mr.me.stbEntry.schemaRow.nCols, false);
QUERY_CHECK_CODE(code, lino, _end);
// super table name
STR_TO_VARSTR(n, mr.me.name);
pColInfoData = taosArrayGet(p->pDataBlock, 4);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, n, false);
QUERY_CHECK_CODE(code, lino, _end);
pAPI->metaReaderFn.clearReader(&mr);
// table comment
pColInfoData = taosArrayGet(p->pDataBlock, 8);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
if (pInfo->pCur->mr.me.ctbEntry.commentLen > 0) {
char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(comment, pInfo->pCur->mr.me.ctbEntry.comment);
@ -1631,11 +1681,13 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
// uid
pColInfoData = taosArrayGet(p->pDataBlock, 5);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.uid, false);
QUERY_CHECK_CODE(code, lino, _end);
// ttl
pColInfoData = taosArrayGet(p->pDataBlock, 7);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ctbEntry.ttlDays, false);
QUERY_CHECK_CODE(code, lino, _end);
@ -1643,20 +1695,24 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
} else if (tableType == TSDB_NORMAL_TABLE) {
// create time
pColInfoData = taosArrayGet(p->pDataBlock, 2);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.btime, false);
QUERY_CHECK_CODE(code, lino, _end);
// number of columns
pColInfoData = taosArrayGet(p->pDataBlock, 3);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.schemaRow.nCols, false);
QUERY_CHECK_CODE(code, lino, _end);
// super table name
pColInfoData = taosArrayGet(p->pDataBlock, 4);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
colDataSetNULL(pColInfoData, numOfRows);
// table comment
pColInfoData = taosArrayGet(p->pDataBlock, 8);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
if (pInfo->pCur->mr.me.ntbEntry.commentLen > 0) {
char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(comment, pInfo->pCur->mr.me.ntbEntry.comment);
@ -1673,11 +1729,13 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
// uid
pColInfoData = taosArrayGet(p->pDataBlock, 5);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.uid, false);
QUERY_CHECK_CODE(code, lino, _end);
// ttl
pColInfoData = taosArrayGet(p->pDataBlock, 7);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.ttlDays, false);
QUERY_CHECK_CODE(code, lino, _end);
@ -1685,6 +1743,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
}
pColInfoData = taosArrayGet(p->pDataBlock, 9);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, n, false);
QUERY_CHECK_CODE(code, lino, _end);
@ -1961,6 +2020,7 @@ static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScan
if (pInfo->tbnameSlotId != -1) {
SColumnInfoData* pColumnInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, pInfo->tbnameSlotId);
QUERY_CHECK_NULL(pColumnInfoData, code, lino, _end, terrno);
char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(varTbName, name);
@ -2617,6 +2677,8 @@ static int32_t doBlockInfoScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes
int32_t slotId = pOperator->exprSupp.pExprInfo->base.resSchema.slotId;
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, slotId);
QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);
int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo);
char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE);
@ -2637,6 +2699,7 @@ static int32_t doBlockInfoScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes
// make the valgrind happy that all memory buffer has been initialized already.
if (slotId != 0) {
SColumnInfoData* p1 = taosArrayGet(pBlock->pDataBlock, 0);
QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
int64_t v = 0;
colDataSetInt64(p1, 0, &v);
}

View File

@ -47,6 +47,10 @@ static void setNotFillColumn(SFillInfo* pFillInfo, SColumnInfoData* pDstColInfo,
}
SGroupKeys* pKey = taosArrayGet(p->pRowVal, colIdx);
if (!pKey) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
T_LONG_JMP(pFillInfo->pTaskInfo->env, terrno);
}
int32_t code = doSetVal(pDstColInfo, rowIndex, pKey);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
@ -426,6 +430,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
if (pFillInfo->type == TSDB_FILL_PREV) {
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev.pRowVal : pFillInfo->next.pRowVal;
SGroupKeys* pKey = taosArrayGet(p, i);
QUERY_CHECK_NULL(pKey, code, lino, _end, terrno);
code = doSetVal(pDst, index, pKey);
QUERY_CHECK_CODE(code, lino, _end);
} else if (pFillInfo->type == TSDB_FILL_LINEAR) {
@ -440,6 +445,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
} else if (pFillInfo->type == TSDB_FILL_NEXT) {
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next.pRowVal : pFillInfo->prev.pRowVal;
SGroupKeys* pKey = taosArrayGet(p, i);
QUERY_CHECK_NULL(pKey, code, lino, _end, terrno);
code = doSetVal(pDst, index, pKey);
QUERY_CHECK_CODE(code, lino, _end);
} else {
@ -582,12 +588,12 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
}
for (int32_t i = 0; i < taosArrayGetSize(pFillInfo->prev.pRowVal); ++i) {
SGroupKeys* pKey = taosArrayGet(pFillInfo->prev.pRowVal, i);
taosMemoryFree(pKey->pData);
if (pKey) taosMemoryFree(pKey->pData);
}
taosArrayDestroy(pFillInfo->prev.pRowVal);
for (int32_t i = 0; i < taosArrayGetSize(pFillInfo->next.pRowVal); ++i) {
SGroupKeys* pKey = taosArrayGet(pFillInfo->next.pRowVal, i);
taosMemoryFree(pKey->pData);
if (pKey) taosMemoryFree(pKey->pData);
}
taosArrayDestroy(pFillInfo->next.pRowVal);
@ -732,6 +738,8 @@ int64_t getFillInfoStart(struct SFillInfo* pFillInfo) { return pFillInfo->start;
SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprInfo* pNotFillExpr,
int32_t numOfNoFillExpr, const struct SNodeListNode* pValNode) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SFillColInfo* pFillCol = taosMemoryCalloc(numOfFillExpr + numOfNoFillExpr, sizeof(SFillColInfo));
if (pFillCol == NULL) {
return NULL;
@ -749,6 +757,7 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn
int32_t index = (i >= len) ? (len - 1) : i;
SValueNode* pv = (SValueNode*)nodesListGetNode(pValNode->pNodeList, index);
QUERY_CHECK_NULL(pv, code, lino, _end, terrno);
nodesValueNodeToVariant(pv, &pFillCol[i].fillVal);
}
}
@ -761,4 +770,14 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn
}
return pFillCol;
_end:
for (int32_t i = 0; i < numOfFillExpr; ++i) {
taosVariantDestroy(&pFillCol[i].fillVal);
}
taosMemoryFree(pFillCol);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return NULL;
}

View File

@ -614,6 +614,11 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
ASSERT(!isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
if (!pTsKey) {
pTaskInfo->code = terrno;
T_LONG_JMP(pTaskInfo->env, terrno);
}
int64_t prevTs = *(int64_t*)pTsKey->pData;
if (groupId == pBlock->info.id.groupId) {
doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, tsCols[startPos], startPos, w.ekey,
@ -846,6 +851,11 @@ int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo
if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
if (!pColDataInfo) {
pTaskInfo->code = terrno;
T_LONG_JMP(pTaskInfo->env, terrno);
}
tsCols = (int64_t*)pColDataInfo->pData;
ASSERT(tsCols[0] != 0);
@ -924,6 +934,10 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
SExprSupp* pSup = &pOperator->exprSupp;
SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
if (!pStateColInfoData) {
pTaskInfo->code = terrno;
T_LONG_JMP(pTaskInfo->env, terrno);
}
int64_t gid = pBlock->info.id.groupId;
bool masterScan = true;
@ -931,6 +945,10 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
int32_t bytes = pStateColInfoData->info.bytes;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
if (!pColInfoData) {
pTaskInfo->code = terrno;
T_LONG_JMP(pTaskInfo->env, terrno);
}
TSKEY* tsList = (TSKEY*)pColInfoData->pData;
SWindowRowsSup* pRowSup = &pInfo->winSup;
@ -1395,6 +1413,10 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
SExprSupp* pSup = &pOperator->exprSupp;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
if (!pColInfoData) {
pTaskInfo->code = terrno;
T_LONG_JMP(pTaskInfo->env, terrno);
}
bool masterScan = true;
int32_t numOfOutput = pOperator->exprSupp.numOfExprs;