fix(stream): fix coverity scan issues.
This commit is contained in:
parent
a5b93aaf97
commit
f602aa965f
|
@ -1922,6 +1922,7 @@ int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans) {
|
||||||
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
|
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
|
||||||
if (pCommitRaw == NULL) {
|
if (pCommitRaw == NULL) {
|
||||||
mError("failed to encode stream since %s", terrstr());
|
mError("failed to encode stream since %s", terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1988,6 +1989,7 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
taosMemoryFree(pBuf);
|
taosMemoryFree(pBuf);
|
||||||
taosWUnLockLatch(&pStream->lock);
|
taosWUnLockLatch(&pStream->lock);
|
||||||
|
mndTransDrop(pTrans);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1998,7 +2000,6 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr
|
||||||
int32_t code = mndPersistTransLog(pStream, pTrans);
|
int32_t code = mndPersistTransLog(pStream, pTrans);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
mndTransDrop(pTrans);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -437,6 +437,8 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream
|
||||||
taosArrayDestroy(pRes->uidList);
|
taosArrayDestroy(pRes->uidList);
|
||||||
*pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
|
*pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
|
||||||
if (*pRefBlock == NULL) {
|
if (*pRefBlock == NULL) {
|
||||||
|
blockDataCleanup(pDelBlock);
|
||||||
|
taosMemoryFree(pDelBlock);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -273,7 +273,12 @@ SBrinRecord* getNextBrinRecord(SBrinRecordIter* pIter) {
|
||||||
pIter->pCurrentBlk = taosArrayGet(pIter->pBrinBlockList, pIter->blockIndex);
|
pIter->pCurrentBlk = taosArrayGet(pIter->pBrinBlockList, pIter->blockIndex);
|
||||||
|
|
||||||
tBrinBlockClear(&pIter->block);
|
tBrinBlockClear(&pIter->block);
|
||||||
tsdbDataFileReadBrinBlock(pIter->pReader, pIter->pCurrentBlk, &pIter->block);
|
int32_t code = tsdbDataFileReadBrinBlock(pIter->pReader, pIter->pCurrentBlk, &pIter->block);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tsdbError("failed to read brinBlock from file, code:%s", tstrerror(code));
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
pIter->recordIndex = -1;
|
pIter->recordIndex = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7007,7 +7007,6 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta
|
||||||
|
|
||||||
int32_t code = nodesListStrictAppend(pParamterList, (SNode*)col);
|
int32_t code = nodesListStrictAppend(pParamterList, (SNode*)col);
|
||||||
if (code) {
|
if (code) {
|
||||||
nodesDestroyNode((SNode*)col);
|
|
||||||
nodesDestroyList(pParamterList);
|
nodesDestroyList(pParamterList);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -7025,7 +7024,6 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta
|
||||||
}
|
}
|
||||||
code = nodesListStrictAppend(pProjectionList, pFunc);
|
code = nodesListStrictAppend(pProjectionList, pFunc);
|
||||||
if (code) {
|
if (code) {
|
||||||
nodesDestroyNode(pFunc);
|
|
||||||
nodesDestroyList(pProjectionList);
|
nodesDestroyList(pProjectionList);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,7 +83,6 @@ static void streamSchedByTimer(void* param, void* tmrId) {
|
||||||
atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE);
|
atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE);
|
||||||
pTrigger->pBlock->info.type = STREAM_GET_ALL;
|
pTrigger->pBlock->info.type = STREAM_GET_ALL;
|
||||||
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger) < 0) {
|
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger) < 0) {
|
||||||
taosFreeQitem(pTrigger);
|
|
||||||
taosTmrReset(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer, &pTask->schedInfo.pTimer);
|
taosTmrReset(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer, &pTask->schedInfo.pTimer);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -125,7 +125,6 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint
|
||||||
|
|
||||||
taosMemoryFree(pBlock);
|
taosMemoryFree(pBlock);
|
||||||
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pChkpoint) < 0) {
|
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pChkpoint) < 0) {
|
||||||
taosFreeQitem(pChkpoint);
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -271,7 +270,12 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
|
||||||
keys[0] = pId->streamId;
|
keys[0] = pId->streamId;
|
||||||
keys[1] = pId->taskId;
|
keys[1] = pId->taskId;
|
||||||
|
|
||||||
SStreamTask* p = *(SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
|
SStreamTask** ppTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
|
||||||
|
if (ppTask == NULL) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
SStreamTask* p = *ppTask;
|
||||||
if (p->info.fillHistory == 1) {
|
if (p->info.fillHistory == 1) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -418,6 +418,10 @@ int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) {
|
||||||
int64_t keys[2] = {pId->streamId, pId->taskId};
|
int64_t keys[2] = {pId->streamId, pId->taskId};
|
||||||
|
|
||||||
SStreamTask** p = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
|
SStreamTask** p = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
|
||||||
|
if (p == NULL) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if ((*p)->info.fillHistory == 0) {
|
if ((*p)->info.fillHistory == 0) {
|
||||||
num += 1;
|
num += 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -312,12 +312,20 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
|
||||||
}
|
}
|
||||||
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
|
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
|
||||||
type == STREAM_INPUT__TRANS_STATE) {
|
type == STREAM_INPUT__TRANS_STATE) {
|
||||||
taosWriteQitem(pQueue, pItem);
|
int32_t code = taosWriteQitem(pQueue, pItem);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
taosFreeQitem(pItem);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
qDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
qDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
||||||
pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size);
|
pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size);
|
||||||
} else if (type == STREAM_INPUT__GET_RES) {
|
} else if (type == STREAM_INPUT__GET_RES) {
|
||||||
// use the default memory limit, refactor later.
|
// use the default memory limit, refactor later.
|
||||||
taosWriteQitem(pQueue, pItem);
|
int32_t code = taosWriteQitem(pQueue, pItem);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
taosFreeQitem(pItem);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
|
qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
|
Loading…
Reference in New Issue