Merge pull request #27003 from taosdata/fix/syntax
fix(query):check return code
This commit is contained in:
commit
1bb5d787ba
|
@ -1261,13 +1261,16 @@ static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataB
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) {
|
||||
static int32_t createHelpColInfoData(const SSDataBlock* pDataBlock, SColumnInfoData** ppCols) {
|
||||
*ppCols = NULL;
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t rows = pDataBlock->info.capacity;
|
||||
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||
|
||||
SColumnInfoData* pCols = taosMemoryCalloc(numOfCols, sizeof(SColumnInfoData));
|
||||
if (pCols == NULL) {
|
||||
return NULL;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
|
@ -1280,8 +1283,11 @@ static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) {
|
|||
if (IS_VAR_DATA_TYPE(pCols[i].info.type)) {
|
||||
pCols[i].varmeta.offset = taosMemoryCalloc(rows, sizeof(int32_t));
|
||||
pCols[i].pData = taosMemoryCalloc(1, pColInfoData->varmeta.length);
|
||||
if (pCols[i].varmeta.offset == NULL) {
|
||||
return NULL;
|
||||
if (pCols[i].varmeta.offset == NULL || pCols[i].pData == NULL) {
|
||||
code = terrno;
|
||||
taosMemoryFree(pCols[i].varmeta.offset);
|
||||
taosMemoryFree(pCols[i].pData);
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pCols[i].varmeta.length = pColInfoData->varmeta.length;
|
||||
|
@ -1290,12 +1296,20 @@ static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) {
|
|||
pCols[i].nullbitmap = taosMemoryCalloc(1, BitmapLen(rows));
|
||||
pCols[i].pData = taosMemoryCalloc(rows, pCols[i].info.bytes);
|
||||
if (pCols[i].nullbitmap == NULL || pCols[i].pData == NULL) {
|
||||
return NULL;
|
||||
code = terrno;
|
||||
taosMemoryFree(pCols[i].nullbitmap);
|
||||
taosMemoryFree(pCols[i].pData);
|
||||
goto _error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return pCols;
|
||||
*ppCols = pCols;
|
||||
return code;
|
||||
|
||||
_error:
|
||||
taosMemoryFree(pCols);
|
||||
return code;
|
||||
}
|
||||
|
||||
static void copyBackToBlock(SSDataBlock* pDataBlock, SColumnInfoData* pCols) {
|
||||
|
@ -1423,16 +1437,15 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
|
|||
|
||||
int64_t p1 = taosGetTimestampUs();
|
||||
|
||||
SColumnInfoData* pCols = createHelpColInfoData(pDataBlock);
|
||||
if (pCols == NULL) {
|
||||
SColumnInfoData* pCols = NULL;
|
||||
int32_t code = createHelpColInfoData(pDataBlock, &pCols);
|
||||
if (code != 0) {
|
||||
destroyTupleIndex(index);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
return code;
|
||||
}
|
||||
|
||||
int64_t p2 = taosGetTimestampUs();
|
||||
|
||||
int32_t code = blockDataAssign(pCols, pDataBlock, index);
|
||||
code = blockDataAssign(pCols, pDataBlock, index);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -349,6 +349,9 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
|||
streamMetaWLock(pStreamMeta);
|
||||
pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL);
|
||||
streamMetaWUnLock(pStreamMeta);
|
||||
if (pTaskList == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);
|
||||
|
||||
|
|
|
@ -3414,6 +3414,10 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader, int64_t en
|
|||
}
|
||||
|
||||
STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter;
|
||||
if (pBlockScanInfo == NULL || *pBlockScanInfo == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pReader->pIgnoreTables &&
|
||||
taosHashGet(*pReader->pIgnoreTables, &(*pBlockScanInfo)->uid, sizeof((*pBlockScanInfo)->uid))) {
|
||||
bool hasNexTable = moveToNextTable(pUidList, pStatus);
|
||||
|
|
|
@ -989,6 +989,10 @@ int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasic
|
|||
dataInfo.taskId = pExchangeInfo->pTaskId;
|
||||
dataInfo.index = pIdx->srcIdx;
|
||||
dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
|
||||
if (dataInfo.pSrcUidList == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
dataInfo.srcOpType = pBasicParam->srcOpType;
|
||||
dataInfo.tableSeq = pBasicParam->tableSeq;
|
||||
|
||||
|
@ -1007,6 +1011,10 @@ int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasic
|
|||
pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
|
||||
}
|
||||
pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
|
||||
if (pDataInfo->pSrcUidList == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
pDataInfo->srcOpType = pBasicParam->srcOpType;
|
||||
pDataInfo->tableSeq = pBasicParam->tableSeq;
|
||||
}
|
||||
|
|
|
@ -826,6 +826,8 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf
|
|||
|
||||
if (tsTagFilterCache) {
|
||||
tableList = taosArrayDup(pTableListInfo->pTableList, NULL);
|
||||
QUERY_CHECK_NULL(tableList, code, lino, end, terrno);
|
||||
|
||||
code = pAPI->metaFn.metaPutTbGroupToCache(pVnode, pTableListInfo->idInfo.suid, context.digest,
|
||||
tListLen(context.digest), tableList,
|
||||
taosArrayGetSize(tableList) * sizeof(STableKeyInfo));
|
||||
|
|
|
@ -1520,7 +1520,7 @@ _end:
|
|||
return pUidList;
|
||||
}
|
||||
|
||||
static void extractTableList(SArray* pList, const SOperatorInfo* pOperator) {
|
||||
static int32_t extractTableList(SArray* pList, const SOperatorInfo* pOperator) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
@ -1528,23 +1528,25 @@ static void extractTableList(SArray* pList, const SOperatorInfo* pOperator) {
|
|||
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
SStreamScanInfo* pScanInfo = pOperator->info;
|
||||
STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;
|
||||
|
||||
void* tmp = taosArrayPush(pList, &pTableScanInfo->base.pTableListInfo);
|
||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
||||
STableScanInfo* pScanInfo = pOperator->info;
|
||||
|
||||
void* tmp = taosArrayPush(pList, &pScanInfo->base.pTableListInfo);
|
||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||
} else {
|
||||
if (pOperator->pDownstream != NULL && pOperator->pDownstream[0] != NULL) {
|
||||
extractTableList(pList, pOperator->pDownstream[0]);
|
||||
code = extractTableList(pList, pOperator->pDownstream[0]);
|
||||
}
|
||||
}
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t getTableListInfo(const SExecTaskInfo* pTaskInfo, SArray** pList) {
|
||||
|
@ -1552,16 +1554,17 @@ int32_t getTableListInfo(const SExecTaskInfo* pTaskInfo, SArray** pList) {
|
|||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
*pList = NULL;
|
||||
SArray* pArray = taosArrayInit(0, POINTER_BYTES);
|
||||
if (pArray == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
SOperatorInfo* pOperator = pTaskInfo->pRoot;
|
||||
extractTableList(pArray, pOperator);
|
||||
|
||||
int32_t code = extractTableList(pArray, pTaskInfo->pRoot);
|
||||
if (code == 0) {
|
||||
*pList = pArray;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo) {
|
||||
|
|
|
@ -285,6 +285,10 @@ int32_t tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize,
|
|||
pSortHandle->pageSize = pageSize;
|
||||
pSortHandle->numOfPages = numOfPages;
|
||||
pSortHandle->pSortInfo = taosArrayDup(pSortInfo, NULL);
|
||||
if (pSortHandle->pSortInfo == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
pSortHandle->loops = 0;
|
||||
|
||||
pSortHandle->pqMaxTupleLength = pqMaxTupleLength;
|
||||
|
@ -1708,6 +1712,9 @@ int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsMemSize) {
|
|||
pHandle->extRowBytes = blockDataGetRowSize(pHandle->pDataBlock) + taosArrayGetSize(pHandle->pDataBlock->pDataBlock) + sizeof(int32_t);
|
||||
pHandle->extRowsMemSize = extRowsMemSize;
|
||||
pHandle->aExtRowsOrders = taosArrayDup(pHandle->pSortInfo, NULL);
|
||||
if (pHandle->aExtRowsOrders == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
int32_t code = initRowIdSort(pHandle);
|
||||
if (code) {
|
||||
|
|
|
@ -819,10 +819,21 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) {
|
|||
|
||||
uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20;
|
||||
SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper));
|
||||
if (pHandle == NULL) {
|
||||
goto _EXIT;
|
||||
}
|
||||
|
||||
pHandle->list = tdListNew(sizeof(SCfComparator));
|
||||
if (pHandle->list == NULL) {
|
||||
goto _EXIT;
|
||||
}
|
||||
|
||||
(void)taosThreadMutexInit(&pHandle->mutex, NULL);
|
||||
(void)taosThreadMutexInit(&pHandle->cfMutex, NULL);
|
||||
pHandle->cfInst = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
if (pHandle->cfInst == NULL) {
|
||||
goto _EXIT;
|
||||
}
|
||||
|
||||
rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create();
|
||||
|
||||
|
|
|
@ -1277,10 +1277,13 @@ static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64
|
|||
if (pMeta->closeFlag) {
|
||||
streamMetaWUnLock(pMeta);
|
||||
stError("vgId:%d vnode is closed, not start check task(s) downstream status", pMeta->vgId);
|
||||
return -1;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
*pList = taosArrayDup(pMeta->pTaskList, NULL);
|
||||
if (*pList == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
taosHashClear(pMeta->startInfo.pReadyTaskSet);
|
||||
taosHashClear(pMeta->startInfo.pFailedTaskSet);
|
||||
|
|
|
@ -367,6 +367,9 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
|
|||
pPBuf->fileSize = 0;
|
||||
pPBuf->pFree = taosArrayInit(4, sizeof(SFreeListItem));
|
||||
pPBuf->freePgList = tdListNew(POINTER_BYTES);
|
||||
if (pPBuf->pFree == NULL || pPBuf->freePgList == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
// at least more than 2 pages must be in memory
|
||||
if (inMemBufSize < pagesize * 2) {
|
||||
|
|
Loading…
Reference in New Issue