fix: validate function return value

This commit is contained in:
dapan1121 2024-07-12 15:22:39 +08:00
parent 548613b9ae
commit e527f2fd0b
24 changed files with 184 additions and 99 deletions

View File

@ -205,7 +205,7 @@ void taosArrayClearEx(SArray* pArray, void (*fp)(void*));
void taosArrayClearP(SArray* pArray, void (*fp)(void*)); void taosArrayClearP(SArray* pArray, void (*fp)(void*));
void* taosArrayDestroy(SArray* pArray); void taosArrayDestroy(SArray* pArray);
void taosArrayDestroyP(SArray* pArray, FDelete fp); void taosArrayDestroyP(SArray* pArray, FDelete fp);

View File

@ -1398,7 +1398,8 @@ void hbMgrCleanUp() {
taosThreadMutexLock(&clientHbMgr.lock); taosThreadMutexLock(&clientHbMgr.lock);
appHbMgrCleanup(); appHbMgrCleanup();
clientHbMgr.appHbMgrs = taosArrayDestroy(clientHbMgr.appHbMgrs); taosArrayDestroy(clientHbMgr.appHbMgrs);
clientHbMgr.appHbMgrs = NULL;
taosThreadMutexUnlock(&clientHbMgr.lock); taosThreadMutexUnlock(&clientHbMgr.lock);
} }

View File

@ -1512,7 +1512,9 @@ void blockDataFreeRes(SSDataBlock* pBlock) {
colDataDestroy(pColInfoData); colDataDestroy(pColInfoData);
} }
pBlock->pDataBlock = taosArrayDestroy(pBlock->pDataBlock); taosArrayDestroy(pBlock->pDataBlock);
pBlock->pDataBlock = NULL;
taosMemoryFreeClear(pBlock->pBlockAgg); taosMemoryFreeClear(pBlock->pBlockAgg);
memset(&pBlock->info, 0, sizeof(SDataBlockInfo)); memset(&pBlock->info, 0, sizeof(SDataBlockInfo));
} }

View File

@ -9505,7 +9505,8 @@ int32_t tDecodeMqDataRsp(SDecoder *pDecoder, void *pRsp) {
static void tDeleteMqDataRspCommon(void *rsp) { static void tDeleteMqDataRspCommon(void *rsp) {
SMqDataRspCommon *pRsp = rsp; SMqDataRspCommon *pRsp = rsp;
pRsp->blockDataLen = taosArrayDestroy(pRsp->blockDataLen); taosArrayDestroy(pRsp->blockDataLen);
pRsp->blockDataLen = NULL;
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree); taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
pRsp->blockData = NULL; pRsp->blockData = NULL;
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSchemaWrapper); taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSchemaWrapper);
@ -9558,7 +9559,8 @@ void tDeleteSTaosxRsp(void *rsp) {
tDeleteMqDataRspCommon(rsp); tDeleteMqDataRspCommon(rsp);
STaosxRsp *pRsp = (STaosxRsp *)rsp; STaosxRsp *pRsp = (STaosxRsp *)rsp;
pRsp->createTableLen = taosArrayDestroy(pRsp->createTableLen); taosArrayDestroy(pRsp->createTableLen);
pRsp->createTableLen = NULL;
taosArrayDestroyP(pRsp->createTableReq, (FDelete)taosMemoryFree); taosArrayDestroyP(pRsp->createTableReq, (FDelete)taosMemoryFree);
pRsp->createTableReq = NULL; pRsp->createTableReq = NULL;
} }

View File

@ -195,7 +195,9 @@ void *freeStreamTasks(SArray *pTaskLevel) {
taosArrayDestroy(pLevel); taosArrayDestroy(pLevel);
} }
return taosArrayDestroy(pTaskLevel); taosArrayDestroy(pTaskLevel);
return NULL;
} }
void tFreeStreamObj(SStreamObj *pStream) { void tFreeStreamObj(SStreamObj *pStream) {

View File

@ -978,7 +978,8 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo
} }
void mndClearConsensusRspEntry(SCheckpointConsensusInfo* pInfo) { void mndClearConsensusRspEntry(SCheckpointConsensusInfo* pInfo) {
pInfo->pTaskList = taosArrayDestroy(pInfo->pTaskList); taosArrayDestroy(pInfo->pTaskList);
pInfo->pTaskList = NULL;
} }
int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) { int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) {

View File

@ -249,7 +249,8 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
} }
tTagNew(tagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag); tTagNew(tagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag);
tagArray = taosArrayDestroy(tagArray); taosArrayDestroy(tagArray);
tagArray = NULL;
if (pCreateTbReq->ctb.pTag == NULL) { if (pCreateTbReq->ctb.pTag == NULL) {
tdDestroySVCreateTbReq(pCreateTbReq); tdDestroySVCreateTbReq(pCreateTbReq);
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
@ -505,7 +506,8 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
pTableData->aRowP = taosArrayInit(numOfRows, sizeof(SRow*)); pTableData->aRowP = taosArrayInit(numOfRows, sizeof(SRow*));
if (pTableData->aRowP == NULL || pVals == NULL) { if (pTableData->aRowP == NULL || pVals == NULL) {
pTableData->aRowP = taosArrayDestroy(pTableData->aRowP); taosArrayDestroy(pTableData->aRowP);
pTableData->aRowP = NULL;
taosArrayDestroy(pVals); taosArrayDestroy(pVals);
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
tqError("s-task:%s failed to prepare write stream res blocks, code:%s", id, tstrerror(code)); tqError("s-task:%s failed to prepare write stream res blocks, code:%s", id, tstrerror(code));
@ -530,7 +532,8 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
if (ts < earlyTs) { if (ts < earlyTs) {
tqError("s-task:%s ts:%" PRId64 " of generated results out of valid time range %" PRId64 " , discarded", id, tqError("s-task:%s ts:%" PRId64 " of generated results out of valid time range %" PRId64 " , discarded", id,
ts, earlyTs); ts, earlyTs);
pTableData->aRowP = taosArrayDestroy(pTableData->aRowP); taosArrayDestroy(pTableData->aRowP);
pTableData->aRowP = NULL;
taosArrayDestroy(pVals); taosArrayDestroy(pVals);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -54,19 +54,23 @@ static int32_t tqInitTaosxRsp(SMqDataRspCommon* pRsp, STqOffsetVal pOffset) {
if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL || pRsp->blockTbName == NULL || pRsp->blockSchema == NULL) { if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL || pRsp->blockTbName == NULL || pRsp->blockSchema == NULL) {
if (pRsp->blockData != NULL) { if (pRsp->blockData != NULL) {
pRsp->blockData = taosArrayDestroy(pRsp->blockData); taosArrayDestroy(pRsp->blockData);
pRsp->blockData = NULL;
} }
if (pRsp->blockDataLen != NULL) { if (pRsp->blockDataLen != NULL) {
pRsp->blockDataLen = taosArrayDestroy(pRsp->blockDataLen); taosArrayDestroy(pRsp->blockDataLen);
pRsp->blockDataLen = NULL;
} }
if (pRsp->blockTbName != NULL) { if (pRsp->blockTbName != NULL) {
pRsp->blockTbName = taosArrayDestroy(pRsp->blockTbName); taosArrayDestroy(pRsp->blockTbName);
pRsp->blockTbName = NULL;
} }
if (pRsp->blockSchema != NULL) { if (pRsp->blockSchema != NULL) {
pRsp->blockSchema = taosArrayDestroy(pRsp->blockSchema); taosArrayDestroy(pRsp->blockSchema);
pRsp->blockSchema = NULL;
} }
return -1; return -1;
} }

View File

@ -3187,7 +3187,8 @@ static void clearLastFileSet(SFSNextRowIter *state) {
int32_t iter = 0; int32_t iter = 0;
while ((pe = tSimpleHashIterate(state->pr->pTableMap, pe, &iter)) != NULL) { while ((pe = tSimpleHashIterate(state->pr->pTableMap, pe, &iter)) != NULL) {
STableLoadInfo *pInfo = *(STableLoadInfo **)pe; STableLoadInfo *pInfo = *(STableLoadInfo **)pe;
pInfo->pTombData = taosArrayDestroy(pInfo->pTombData); taosArrayDestroy(pInfo->pTombData);
pInfo->pTombData = NULL;
} }
} }
} }

View File

@ -311,7 +311,8 @@ void* tsdbCacherowsReaderClose(void* pReader) {
int32_t iter = 0; int32_t iter = 0;
while ((pe = tSimpleHashIterate(p->pTableMap, pe, &iter)) != NULL) { while ((pe = tSimpleHashIterate(p->pTableMap, pe, &iter)) != NULL) {
STableLoadInfo* pInfo = *(STableLoadInfo**)pe; STableLoadInfo* pInfo = *(STableLoadInfo**)pe;
pInfo->pTombData = taosArrayDestroy(pInfo->pTombData); taosArrayDestroy(pInfo->pTombData);
pInfo->pTombData = NULL;
} }
tSimpleHashCleanup(p->pTableMap); tSimpleHashCleanup(p->pTableMap);

View File

@ -308,7 +308,8 @@ void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) {
pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter); pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
} }
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline); taosArrayDestroy(pInfo->delSkyline);
pInfo->delSkyline = NULL;
pInfo->lastProcKey.ts = ts; pInfo->lastProcKey.ts = ts;
// todo check the nextProcKey info // todo check the nextProcKey info
pInfo->sttKeyInfo.nextProcKey.ts = ts + step; pInfo->sttKeyInfo.nextProcKey.ts = ts + step;
@ -329,11 +330,16 @@ void clearBlockScanInfo(STableBlockScanInfo* p) {
p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter); p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
} }
p->delSkyline = taosArrayDestroy(p->delSkyline); taosArrayDestroy(p->delSkyline);
p->pBlockList = taosArrayDestroy(p->pBlockList); p->delSkyline = NULL;
p->pBlockIdxList = taosArrayDestroy(p->pBlockIdxList); taosArrayDestroy(p->pBlockList);
p->pMemDelData = taosArrayDestroy(p->pMemDelData); p->pBlockList = NULL;
p->pFileDelData = taosArrayDestroy(p->pFileDelData); taosArrayDestroy(p->pBlockIdxList);
p->pBlockIdxList = NULL;
taosArrayDestroy(p->pMemDelData);
p->pMemDelData = NULL;
taosArrayDestroy(p->pFileDelData);
p->pFileDelData = NULL;
clearRowKey(&p->lastProcKey); clearRowKey(&p->lastProcKey);
clearRowKey(&p->sttRange.skey); clearRowKey(&p->sttRange.skey);
@ -579,7 +585,8 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
} }
taosArrayAddAll(pBlockIter->blockList, pTableScanInfo->pBlockList); taosArrayAddAll(pBlockIter->blockList, pTableScanInfo->pBlockList);
pTableScanInfo->pBlockList = taosArrayDestroy(pTableScanInfo->pBlockList); taosArrayDestroy(pTableScanInfo->pBlockList);
pTableScanInfo->pBlockList = NULL;
int64_t et = taosGetTimestampUs(); int64_t et = taosGetTimestampUs();
tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s", tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
@ -624,7 +631,8 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
for (int32_t i = 0; i < numOfTables; ++i) { for (int32_t i = 0; i < numOfTables; ++i) {
STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, i); STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, i);
pTableScanInfo->pBlockList = taosArrayDestroy(pTableScanInfo->pBlockList); taosArrayDestroy(pTableScanInfo->pBlockList);
pTableScanInfo->pBlockList = NULL;
} }
int64_t et = taosGetTimestampUs(); int64_t et = taosGetTimestampUs();

View File

@ -96,7 +96,8 @@ void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
pGroupResInfo->freeItem = false; pGroupResInfo->freeItem = false;
pGroupResInfo->pRows = NULL; pGroupResInfo->pRows = NULL;
} else { } else {
pGroupResInfo->pRows = taosArrayDestroy(pGroupResInfo->pRows); taosArrayDestroy(pGroupResInfo->pRows);
pGroupResInfo->pRows = NULL;
} }
pGroupResInfo->index = 0; pGroupResInfo->index = 0;
} }
@ -2102,7 +2103,8 @@ void* tableListDestroy(STableListInfo* pTableListInfo) {
return NULL; return NULL;
} }
pTableListInfo->pTableList = taosArrayDestroy(pTableListInfo->pTableList); taosArrayDestroy(pTableListInfo->pTableList);
pTableListInfo->pTableList = NULL;
taosMemoryFreeClear(pTableListInfo->groupOffset); taosMemoryFreeClear(pTableListInfo->groupOffset);
taosHashCleanup(pTableListInfo->map); taosHashCleanup(pTableListInfo->map);

View File

@ -129,7 +129,8 @@ static void destroyStreamFillOperatorInfo(void* param) {
pInfo->pRes = blockDataDestroy(pInfo->pRes); pInfo->pRes = blockDataDestroy(pInfo->pRes);
pInfo->pSrcBlock = blockDataDestroy(pInfo->pSrcBlock); pInfo->pSrcBlock = blockDataDestroy(pInfo->pSrcBlock);
pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes); pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes);
pInfo->matchInfo.pList = taosArrayDestroy(pInfo->matchInfo.pList); taosArrayDestroy(pInfo->matchInfo.pList);
pInfo->matchInfo.pList = NULL;
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
} }

View File

@ -406,7 +406,8 @@ void clearGroupResInfo(SGroupResInfo* pGroupResInfo) {
} }
pGroupResInfo->freeItem = false; pGroupResInfo->freeItem = false;
} }
pGroupResInfo->pRows = taosArrayDestroy(pGroupResInfo->pRows); taosArrayDestroy(pGroupResInfo->pRows);
pGroupResInfo->pRows = NULL;
pGroupResInfo->index = 0; pGroupResInfo->index = 0;
} }

View File

@ -1136,7 +1136,8 @@ void destroyIntervalOperatorInfo(void* param) {
tdListFree(pInfo->binfo.resultRowInfo.openWindow); tdListFree(pInfo->binfo.resultRowInfo.openWindow);
pInfo->pInterpCols = taosArrayDestroy(pInfo->pInterpCols); taosArrayDestroy(pInfo->pInterpCols);
pInfo->pInterpCols = NULL;
taosArrayDestroyEx(pInfo->pPrevValues, freeItem); taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
pInfo->pPrevValues = NULL; pInfo->pPrevValues = NULL;

View File

@ -391,7 +391,7 @@ void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx);
int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode); int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode);
int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status, bool dynamicTask); int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status, bool dynamicTask);
int32_t qwDropTask(QW_FPARAMS_DEF); int32_t qwDropTask(QW_FPARAMS_DEF);
void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx); int32_t qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx);
int32_t qwOpenRef(void); int32_t qwOpenRef(void);
void qwSetHbParam(int64_t refId, SQWHbParam **pParam); void qwSetHbParam(int64_t refId, SQWHbParam **pParam);
int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type); int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type);

View File

@ -495,9 +495,9 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->info.handle); QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->info.handle);
QW_ERR_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg)); code = qwProcessCQuery(QW_FPARAMS(), &qwMsg);
QW_SCH_TASK_DLOG("processCQuery end, node:%p", node); QW_SCH_TASK_DLOG("processCQuery end, node:%p, code:0x%x", node, code);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -503,14 +503,16 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
*pParam = &gQwMgmt.param[paramIdx]; *pParam = &gQwMgmt.param[paramIdx];
} }
void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) { int32_t qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
char tbName[TSDB_TABLE_NAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN];
STbVerInfo tbInfo; STbVerInfo tbInfo;
int32_t i = 0; int32_t i = 0;
int32_t code = TSDB_CODE_SUCCESS;
while (true) { while (true) {
if (qGetQueryTableSchemaVersion(pTaskInfo, dbFName, tbName, &tbInfo.sversion, &tbInfo.tversion, i) < 0) { code = qGetQueryTableSchemaVersion(pTaskInfo, dbFName, tbName, &tbInfo.sversion, &tbInfo.tversion, i);
if (TSDB_CODE_SUCCESS != code) {
break; break;
} }
@ -522,12 +524,19 @@ void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
if (NULL == ctx->tbInfo) { if (NULL == ctx->tbInfo) {
ctx->tbInfo = taosArrayInit(1, sizeof(tbInfo)); ctx->tbInfo = taosArrayInit(1, sizeof(tbInfo));
if (NULL == ctx->tbInfo) {
QW_ERR_RET(terrno);
}
} }
taosArrayPush(ctx->tbInfo, &tbInfo); if (NULL == taosArrayPush(ctx->tbInfo, &tbInfo)) {
QW_ERR_RET(terrno);
}
i++; i++;
} }
QW_RET(code);
} }
void qwCloseRef(void) { void qwCloseRef(void) {

View File

@ -18,10 +18,11 @@ SQWorkerMgmt gQwMgmt = {
.qwNum = 0, .qwNum = 0,
}; };
int32_t qwStopAllTasks(SQWorker *mgmt) { void qwStopAllTasks(SQWorker *mgmt) {
uint64_t qId, tId, sId; uint64_t qId, tId, sId;
int32_t eId; int32_t eId;
int64_t rId = 0; int64_t rId = 0;
int32_t code = TSDB_CODE_SUCCESS;
void *pIter = taosHashIterate(mgmt->ctxHash, NULL); void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
while (pIter) { while (pIter) {
@ -44,22 +45,29 @@ int32_t qwStopAllTasks(SQWorker *mgmt) {
} }
if (QW_QUERY_RUNNING(ctx)) { if (QW_QUERY_RUNNING(ctx)) {
qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED); code = qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED);
QW_TASK_DLOG_E("task running, async killed"); if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("task running, async kill failed, error: %x", code);
} else {
QW_TASK_DLOG_E("task running, async killed");
}
} else if (QW_FETCH_RUNNING(ctx)) { } else if (QW_FETCH_RUNNING(ctx)) {
QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED); QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED);
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
QW_TASK_DLOG_E("task fetching, update drop received"); QW_TASK_DLOG_E("task fetching, update drop received");
} else { } else {
qwDropTask(QW_FPARAMS()); code = qwDropTask(QW_FPARAMS());
if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("task drop failed, error: %x", code);
} else {
QW_TASK_DLOG_E("task dropped");
}
} }
QW_UNLOCK(QW_WRITE, &ctx->lock); QW_UNLOCK(QW_WRITE, &ctx->lock);
pIter = taosHashIterate(mgmt->ctxHash, pIter); pIter = taosHashIterate(mgmt->ctxHash, pIter);
} }
return TSDB_CODE_SUCCESS;
} }
int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
@ -111,7 +119,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
int32_t qwSendQueryRsp(QW_FPARAMS_DEF, int32_t msgType, SQWTaskCtx *ctx, int32_t rspCode, bool quickRsp) { int32_t qwSendQueryRsp(QW_FPARAMS_DEF, int32_t msgType, SQWTaskCtx *ctx, int32_t rspCode, bool quickRsp) {
if ((!quickRsp) || QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy) { if ((!quickRsp) || QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy) {
if (!ctx->localExec) { if (!ctx->localExec) {
qwBuildAndSendQueryRsp(msgType, &ctx->ctrlConnInfo, rspCode, ctx); QW_ERR_RET(qwBuildAndSendQueryRsp(msgType, &ctx->ctrlConnInfo, rspCode, ctx));
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, rspCode, tstrerror(rspCode)); QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, rspCode, tstrerror(rspCode));
} }
@ -140,6 +148,10 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
} }
SArray *pResList = taosArrayInit(4, POINTER_BYTES); SArray *pResList = taosArrayInit(4, POINTER_BYTES);
if (NULL == pResList) {
QW_ERR_RET(terrno);
}
while (true) { while (true) {
QW_TASK_DLOG("start to execTask, loopIdx:%d", i++); QW_TASK_DLOG("start to execTask, loopIdx:%d", i++);
@ -165,6 +177,9 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
size_t numOfResBlock = taosArrayGetSize(pResList); size_t numOfResBlock = taosArrayGetSize(pResList);
for (int32_t j = 0; j < numOfResBlock; ++j) { for (int32_t j = 0; j < numOfResBlock; ++j) {
SSDataBlock *pRes = taosArrayGetP(pResList, j); SSDataBlock *pRes = taosArrayGetP(pResList, j);
if (NULL == pRes) {
QW_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
SInputData inputData = {.pData = pRes}; SInputData inputData = {.pData = pRes};
code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue); code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
@ -226,7 +241,9 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
} }
_return: _return:
taosArrayDestroy(pResList); taosArrayDestroy(pResList);
QW_RET(code); QW_RET(code);
} }
@ -241,6 +258,7 @@ bool qwTaskNotInExec(SQWTaskCtx *ctx) {
int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) { int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) {
int32_t taskNum = 0; int32_t taskNum = 0;
int32_t code = TSDB_CODE_SUCCESS;
hbInfo->connInfo = sch->hbConnInfo; hbInfo->connInfo = sch->hbConnInfo;
hbInfo->rsp.epId = sch->hbEpId; hbInfo->rsp.epId = sch->hbEpId;
@ -272,7 +290,11 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo)
status.status = taskStatus->status; status.status = taskStatus->status;
status.refId = taskStatus->refId; status.refId = taskStatus->refId;
taosArrayPush(hbInfo->rsp.taskStatus, &status); if (NULL == taosArrayPush(hbInfo->rsp.taskStatus, &status)) {
taosHashCancelIterate(sch->tasksHash, pIter);
code = terrno;
break;
}
++i; ++i;
pIter = taosHashIterate(sch->tasksHash, pIter); pIter = taosHashIterate(sch->tasksHash, pIter);
@ -280,7 +302,7 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo)
QW_UNLOCK(QW_READ, &sch->tasksLock); QW_UNLOCK(QW_READ, &sch->tasksLock);
return TSDB_CODE_SUCCESS; return code;
} }
int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, int32_t *pRawDataLen, void **rspMsg, int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, int32_t *pRawDataLen, void **rspMsg,
@ -320,7 +342,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
pOutput->numOfRows); pOutput->numOfRows);
if (!ctx->dynamicTask) { if (!ctx->dynamicTask) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask); QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask));
} }
if (NULL == pRsp) { if (NULL == pRsp) {
@ -375,7 +397,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) { if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks, QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks,
pOutput->numOfRows); pOutput->numOfRows);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask); QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask));
break; break;
} }
@ -464,7 +486,7 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg, int32
qwMsg->connInfo = ctx->dataConnInfo; qwMsg->connInfo = ctx->dataConnInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code); QW_ERR_RET(qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code));
rsp = NULL; rsp = NULL;
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
@ -650,7 +672,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
_return: _return:
if (TSDB_CODE_SUCCESS == code && QW_PHASE_POST_QUERY == phase) { if (TSDB_CODE_SUCCESS == code && QW_PHASE_POST_QUERY == phase) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC, ctx->dynamicTask); code = qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC, ctx->dynamicTask);
ctx->queryGotData = true; ctx->queryGotData = true;
} }
@ -660,7 +682,7 @@ _return:
qwDbgSimulateRedirect(&qwMsg, ctx, &rsped); qwDbgSimulateRedirect(&qwMsg, ctx, &rsped);
qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped); qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped);
if (!rsped) { if (!rsped) {
qwSendQueryRsp(QW_FPARAMS(), input->msgType + 1, ctx, code, false); code = qwSendQueryRsp(QW_FPARAMS(), input->msgType + 1, ctx, code, false);
} }
} }
@ -672,7 +694,7 @@ _return:
} }
if (code) { if (code) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); code = qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask);
} }
QW_UNLOCK(QW_WRITE, &ctx->lock); QW_UNLOCK(QW_WRITE, &ctx->lock);
@ -687,11 +709,11 @@ _return:
int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) { int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) {
QW_ERR_RET(qwDropTask(QW_FPARAMS())); QW_ERR_RET(qwDropTask(QW_FPARAMS()));
QW_RET(TSDB_CODE_SUCCESS); return TSDB_CODE_SUCCESS;
} }
int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
int32_t code = 0; int32_t code = TSDB_CODE_SUCCESS;
SQWTaskCtx *ctx = NULL; SQWTaskCtx *ctx = NULL;
QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo)); QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo));
@ -706,7 +728,7 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT)); QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));
qwSendQueryRsp(QW_FPARAMS(), qwMsg->msgType + 1, ctx, code, true); QW_ERR_JRET(qwSendQueryRsp(QW_FPARAMS(), qwMsg->msgType + 1, ctx, code, true));
_return: _return:
@ -715,7 +737,7 @@ _return:
qwReleaseTaskCtx(mgmt, ctx); qwReleaseTaskCtx(mgmt, ctx);
} }
QW_RET(TSDB_CODE_SUCCESS); return TSDB_CODE_SUCCESS;
} }
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
@ -761,7 +783,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
atomic_store_ptr(&ctx->taskHandle, pTaskInfo); atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
atomic_store_ptr(&ctx->sinkHandle, sinkHandle); atomic_store_ptr(&ctx->sinkHandle, sinkHandle);
qwSaveTbVersionInfo(pTaskInfo, ctx); QW_ERR_JRET(qwSaveTbVersionInfo(pTaskInfo, ctx));
if (!ctx->dynamicTask) { if (!ctx->dynamicTask) {
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL)); QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));
@ -778,7 +800,7 @@ _return:
input.msgType = qwMsg->msgType; input.msgType = qwMsg->msgType;
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
qwQuickRspFetchReq(QW_FPARAMS(), ctx, qwMsg, code); QW_ERR_RET(qwQuickRspFetchReq(QW_FPARAMS(), ctx, qwMsg, code));
QW_RET(TSDB_CODE_SUCCESS); QW_RET(TSDB_CODE_SUCCESS);
} }
@ -829,7 +851,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
qwMsg->connInfo = ctx->dataConnInfo; qwMsg->connInfo = ctx->dataConnInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code); QW_ERR_JRET(qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code));
rsp = NULL; rsp = NULL;
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
@ -851,9 +873,14 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
rsp = NULL; rsp = NULL;
qwMsg->connInfo = ctx->dataConnInfo; qwMsg->connInfo = ctx->dataConnInfo;
qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, NULL, 0, code); code = qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, NULL, 0, code);
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), if (TSDB_CODE_SUCCESS != code) {
0); QW_TASK_ELOG("fetch rsp send fail, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
0);
} else {
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
0);
}
} }
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
@ -869,7 +896,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
} while (true); } while (true);
input.code = code; input.code = code;
qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL); QW_ERR_RET(qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL));
QW_RET(TSDB_CODE_SUCCESS); QW_RET(TSDB_CODE_SUCCESS);
} }
@ -922,7 +949,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
} else if (QW_QUERY_RUNNING(ctx)) { } else if (QW_QUERY_RUNNING(ctx)) {
atomic_store_8((int8_t *)&ctx->queryContinue, 1); atomic_store_8((int8_t *)&ctx->queryContinue, 1);
} else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) { } else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC, ctx->dynamicTask); QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC, ctx->dynamicTask));
atomic_store_8((int8_t *)&ctx->queryInQueue, 1); atomic_store_8((int8_t *)&ctx->queryInQueue, 1);
QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo)); QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo));
@ -952,9 +979,14 @@ _return:
} }
if (!rsped) { if (!rsped) {
qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); code = qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
QW_TASK_DLOG("fetch rsp send, msgType:%s, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), if (TSDB_CODE_SUCCESS != code) {
qwMsg->connInfo.handle, code, tstrerror(code), dataLen); QW_TASK_ELOG("fetch rsp send fail, msgType:%s, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1),
qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
} else {
QW_TASK_DLOG("fetch rsp send, msgType:%s, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1),
qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
}
} else { } else {
qwFreeFetchRsp(rsp); qwFreeFetchRsp(rsp);
rsp = NULL; rsp = NULL;
@ -985,7 +1017,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
if (QW_QUERY_RUNNING(ctx)) { if (QW_QUERY_RUNNING(ctx)) {
QW_ERR_JRET(qwKillTaskHandle(ctx, TSDB_CODE_TSC_QUERY_CANCELLED)); QW_ERR_JRET(qwKillTaskHandle(ctx, TSDB_CODE_TSC_QUERY_CANCELLED));
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP, ctx->dynamicTask); QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP, ctx->dynamicTask));
} else { } else {
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
dropped = true; dropped = true;
@ -1001,7 +1033,7 @@ _return:
if (code) { if (code) {
if (ctx) { if (ctx) {
QW_UPDATE_RSP_CODE(ctx, code); QW_UPDATE_RSP_CODE(ctx, code);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); (void)qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); // task already failed, no more error handling
} else { } else {
tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER); tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER);
} }
@ -1035,7 +1067,7 @@ int32_t qwProcessNotify(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
if (QW_QUERY_RUNNING(ctx)) { if (QW_QUERY_RUNNING(ctx)) {
QW_ERR_JRET(qwKillTaskHandle(ctx, TSDB_CODE_TSC_QUERY_CANCELLED)); QW_ERR_JRET(qwKillTaskHandle(ctx, TSDB_CODE_TSC_QUERY_CANCELLED));
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask); QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask));
} }
switch (qwMsg->msgType) { switch (qwMsg->msgType) {
@ -1055,7 +1087,7 @@ _return:
if (code) { if (code) {
if (ctx) { if (ctx) {
QW_UPDATE_RSP_CODE(ctx, code); QW_UPDATE_RSP_CODE(ctx, code);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); (void)qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); // task already failed, no more error handling
} }
} }
@ -1104,7 +1136,7 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
_return: _return:
memcpy(&rsp.epId, &req->epId, sizeof(req->epId)); memcpy(&rsp.epId, &req->epId, sizeof(req->epId));
qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code); code = qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);
if (code) { if (code) {
tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER); tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER);
@ -1125,7 +1157,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
int64_t refId = hbParam->refId; int64_t refId = hbParam->refId;
SQWorker *mgmt = qwAcquire(refId); SQWorker *mgmt = qwAcquire(refId);
if (NULL == mgmt) { if (NULL == mgmt) {
QW_DLOG("qwAcquire %" PRIx64 "failed", refId); QW_DLOG("qwAcquire %" PRIx64 "failed, code:0x%x", refId, terrno);
return; return;
} }
@ -1137,7 +1169,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
qwDbgDumpMgmtInfo(mgmt); qwDbgDumpMgmtInfo(mgmt);
if (gQWDebug.forceStop) { if (gQWDebug.forceStop) {
(void)qwStopAllTasks(mgmt); qwStopAllTasks(mgmt);
} }
QW_LOCK(QW_READ, &mgmt->schLock); QW_LOCK(QW_READ, &mgmt->schLock);
@ -1145,8 +1177,8 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
int32_t schNum = taosHashGetSize(mgmt->schHash); int32_t schNum = taosHashGetSize(mgmt->schHash);
if (schNum <= 0) { if (schNum <= 0) {
QW_UNLOCK(QW_READ, &mgmt->schLock); QW_UNLOCK(QW_READ, &mgmt->schLock);
taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); (void)taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); // ignore error
qwRelease(refId); (void)qwRelease(refId); // ignore error
return; return;
} }
@ -1156,9 +1188,9 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
QW_UNLOCK(QW_READ, &mgmt->schLock); QW_UNLOCK(QW_READ, &mgmt->schLock);
taosMemoryFree(rspList); taosMemoryFree(rspList);
taosArrayDestroy(pExpiredSch); taosArrayDestroy(pExpiredSch);
QW_ELOG("calloc %d SQWHbInfo failed", schNum); QW_ELOG("calloc %d SQWHbInfo failed, code:%x", schNum, terrno);
taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); (void)taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); // ignore error
qwRelease(refId); (void)qwRelease(refId); // ignore error
return; return;
} }
@ -1174,7 +1206,11 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
if (sch1->hbBrokenTs > 0 && ((currentMs - sch1->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) && if (sch1->hbBrokenTs > 0 && ((currentMs - sch1->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) &&
taosHashGetSize(sch1->tasksHash) <= 0) { taosHashGetSize(sch1->tasksHash) <= 0) {
taosArrayPush(pExpiredSch, sId); if (NULL == taosArrayPush(pExpiredSch, sId)) {
QW_ELOG("add sId 0x%" PRIx64 " to expiredSch failed, code:%x", *sId, terrno);
taosHashCancelIterate(mgmt->schHash, pIter);
break;
}
} }
pIter = taosHashIterate(mgmt->schHash, pIter); pIter = taosHashIterate(mgmt->schHash, pIter);
@ -1196,7 +1232,7 @@ _return:
QW_UNLOCK(QW_READ, &mgmt->schLock); QW_UNLOCK(QW_READ, &mgmt->schLock);
for (int32_t j = 0; j < i; ++j) { for (int32_t j = 0; j < i; ++j) {
qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code); (void)qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code); // ignore error
/*QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),*/ /*QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),*/
/*(rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));*/ /*(rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));*/
tFreeSSchedulerHbRsp(&rspList[j].rsp); tFreeSSchedulerHbRsp(&rspList[j].rsp);
@ -1209,8 +1245,8 @@ _return:
taosMemoryFreeClear(rspList); taosMemoryFreeClear(rspList);
taosArrayDestroy(pExpiredSch); taosArrayDestroy(pExpiredSch);
taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); (void)taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); // ignore error
qwRelease(refId); (void)qwRelease(refId); // ignore error
} }
int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) { int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) {
@ -1333,7 +1369,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S
_return: _return:
if (mgmt->refId >= 0) { if (mgmt->refId >= 0) {
qwRelease(mgmt->refId); qwRelease(mgmt->refId); // ignore error
} else { } else {
taosHashCleanup(mgmt->schHash); taosHashCleanup(mgmt->schHash);
taosHashCleanup(mgmt->ctxHash); taosHashCleanup(mgmt->ctxHash);
@ -1353,7 +1389,7 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
atomic_store_8(&mgmt->nodeStopped, 1); atomic_store_8(&mgmt->nodeStopped, 1);
(void)qwStopAllTasks(mgmt); qwStopAllTasks(mgmt);
} }
void qWorkerDestroy(void **qWorkerMgmt) { void qWorkerDestroy(void **qWorkerMgmt) {
@ -1383,7 +1419,7 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt
SQWorker *mgmt = (SQWorker *)qWorkerMgmt; SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
SDataSinkStat sinkStat = {0}; SDataSinkStat sinkStat = {0};
dsDataSinkGetCacheSize(&sinkStat); QW_ERR_RET(dsDataSinkGetCacheSize(&sinkStat));
pStat->cacheDataSize = sinkStat.cachedSize; pStat->cacheDataSize = sinkStat.cachedSize;
pStat->queryProcessed = QW_STAT_GET(mgmt->stat.msgStat.queryProcessed); pStat->queryProcessed = QW_STAT_GET(mgmt->stat.msgStat.queryProcessed);
@ -1427,6 +1463,10 @@ int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64
ctx->explainRes = explainRes; ctx->explainRes = explainRes;
rHandle.pMsgCb = taosMemoryCalloc(1, sizeof(SMsgCb)); rHandle.pMsgCb = taosMemoryCalloc(1, sizeof(SMsgCb));
if (NULL == rHandle.pMsgCb) {
QW_ERR_JRET(terrno);
}
rHandle.pMsgCb->clientRpc = qwMsg->connInfo.handle; rHandle.pMsgCb->clientRpc = qwMsg->connInfo.handle;
code = qCreateExecTask(&rHandle, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, 0, NULL, OPTR_EXEC_MODEL_BATCH); code = qCreateExecTask(&rHandle, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, 0, NULL, OPTR_EXEC_MODEL_BATCH);

View File

@ -308,7 +308,9 @@ int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) {
void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) { void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) {
ASSERT(pInfo->inCheckProcess == 0); ASSERT(pInfo->inCheckProcess == 0);
pInfo->pList = taosArrayDestroy(pInfo->pList); taosArrayDestroy(pInfo->pList);
pInfo->pList = NULL;
if (pInfo->checkRspTmr != NULL) { if (pInfo->checkRspTmr != NULL) {
/*bool ret = */ taosTmrStop(pInfo->checkRspTmr); /*bool ret = */ taosTmrStop(pInfo->checkRspTmr);
pInfo->checkRspTmr = NULL; pInfo->checkRspTmr = NULL;

View File

@ -438,11 +438,13 @@ void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) {
} }
if (pMsg->pUpdateNodes != NULL) { if (pMsg->pUpdateNodes != NULL) {
pMsg->pUpdateNodes = taosArrayDestroy(pMsg->pUpdateNodes); taosArrayDestroy(pMsg->pUpdateNodes);
pMsg->pUpdateNodes = NULL;
} }
if (pMsg->pTaskStatus != NULL) { if (pMsg->pTaskStatus != NULL) {
pMsg->pTaskStatus = taosArrayDestroy(pMsg->pTaskStatus); taosArrayDestroy(pMsg->pTaskStatus);
pMsg->pTaskStatus = NULL;
} }
pMsg->msgId = -1; pMsg->msgId = -1;

View File

@ -280,10 +280,12 @@ void tFreeStreamTask(SStreamTask* pTask) {
taosMemoryFree(pTask->outputInfo.pTokenBucket); taosMemoryFree(pTask->outputInfo.pTokenBucket);
taosThreadMutexDestroy(&pTask->lock); taosThreadMutexDestroy(&pTask->lock);
pTask->msgInfo.pSendInfo = taosArrayDestroy(pTask->msgInfo.pSendInfo); taosArrayDestroy(pTask->msgInfo.pSendInfo);
pTask->msgInfo.pSendInfo = NULL;
taosThreadMutexDestroy(&pTask->msgInfo.lock); taosThreadMutexDestroy(&pTask->msgInfo.lock);
pTask->outputInfo.pNodeEpsetUpdateList = taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList); taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList);
pTask->outputInfo.pNodeEpsetUpdateList = NULL;
if ((pTask->status.removeBackendFiles) && (pTask->pMeta != NULL)) { if ((pTask->status.removeBackendFiles) && (pTask->pMeta != NULL)) {
char* path = taosMemoryCalloc(1, strlen(pTask->pMeta->path) + 128); char* path = taosMemoryCalloc(1, strlen(pTask->pMeta->path) + 128);
@ -1055,9 +1057,12 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
} }
taosThreadMutexDestroy(&pInfo->lock); taosThreadMutexDestroy(&pInfo->lock);
pInfo->pDispatchTriggerList = taosArrayDestroy(pInfo->pDispatchTriggerList); taosArrayDestroy(pInfo->pDispatchTriggerList);
pInfo->pReadyMsgList = taosArrayDestroy(pInfo->pReadyMsgList); pInfo->pDispatchTriggerList = NULL;
pInfo->pCheckpointReadyRecvList = taosArrayDestroy(pInfo->pCheckpointReadyRecvList); taosArrayDestroy(pInfo->pReadyMsgList);
pInfo->pReadyMsgList = NULL;
taosArrayDestroy(pInfo->pCheckpointReadyRecvList);
pInfo->pCheckpointReadyRecvList = NULL;
if (pInfo->pChkptTriggerTmr != NULL) { if (pInfo->pChkptTriggerTmr != NULL) {
taosTmrStop(pInfo->pChkptTriggerTmr); taosTmrStop(pInfo->pChkptTriggerTmr);

View File

@ -34,14 +34,12 @@ SArray* taosArrayInit(size_t size, size_t elemSize) {
SArray* pArray = taosMemoryMalloc(sizeof(SArray)); SArray* pArray = taosMemoryMalloc(sizeof(SArray));
if (pArray == NULL) { if (pArray == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pArray->size = 0; pArray->size = 0;
pArray->pData = taosMemoryCalloc(size, elemSize); pArray->pData = taosMemoryCalloc(size, elemSize);
if (pArray->pData == NULL) { if (pArray->pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pArray); taosMemoryFree(pArray);
return NULL; return NULL;
} }
@ -387,13 +385,11 @@ void taosArrayClearP(SArray* pArray, void (*fp)(void*)) {
taosArrayClear(pArray); taosArrayClear(pArray);
} }
void* taosArrayDestroy(SArray* pArray) { void taosArrayDestroy(SArray* pArray) {
if (pArray) { if (pArray) {
taosMemoryFree(pArray->pData); taosMemoryFree(pArray->pData);
taosMemoryFree(pArray); taosMemoryFree(pArray);
} }
return NULL;
} }
void taosArrayDestroyP(SArray* pArray, FDelete fp) { void taosArrayDestroyP(SArray* pArray, FDelete fp) {

View File

@ -96,7 +96,8 @@ void cfgItemFreeVal(SConfigItem *pItem) {
} }
if (pItem->array) { if (pItem->array) {
pItem->array = taosArrayDestroy(pItem->array); taosArrayDestroy(pItem->array);
pItem->array = NULL;
} }
} }