fix coverity san issues
This commit is contained in:
parent
c8cc0d2feb
commit
71f4226e15
|
@ -337,6 +337,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
|
||||||
tagArray = taosArrayInit(1, sizeof(STagVal));
|
tagArray = taosArrayInit(1, sizeof(STagVal));
|
||||||
if (!tagArray) {
|
if (!tagArray) {
|
||||||
tdDestroySVCreateTbReq(pCreateTbReq);
|
tdDestroySVCreateTbReq(pCreateTbReq);
|
||||||
|
taosMemoryFreeClear(pCreateTbReq);
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
STagVal tagVal = {
|
STagVal tagVal = {
|
||||||
|
@ -352,6 +353,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
|
||||||
tagArray = taosArrayDestroy(tagArray);
|
tagArray = taosArrayDestroy(tagArray);
|
||||||
if (pTag == NULL) {
|
if (pTag == NULL) {
|
||||||
tdDestroySVCreateTbReq(pCreateTbReq);
|
tdDestroySVCreateTbReq(pCreateTbReq);
|
||||||
|
taosMemoryFreeClear(pCreateTbReq);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,7 +74,6 @@ void streamSchedByTimer(void* param, void* tmrId) {
|
||||||
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE);
|
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE);
|
||||||
|
|
||||||
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)trigger) < 0) {
|
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)trigger) < 0) {
|
||||||
taosFreeQitem(trigger);
|
|
||||||
taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
|
taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -355,7 +355,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
|
||||||
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
|
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
|
||||||
ASSERT(pPos->pRowBuff && pFileState->rowSize > 0);
|
ASSERT(pPos->pRowBuff && pFileState->rowSize > 0);
|
||||||
if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
|
if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
|
||||||
code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
|
streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
|
||||||
streamStateClearBatch(batch);
|
streamStateClearBatch(batch);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -364,7 +364,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
|
||||||
qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code);
|
qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code);
|
||||||
}
|
}
|
||||||
if (streamStateGetBatchSize(batch) > 0) {
|
if (streamStateGetBatchSize(batch) > 0) {
|
||||||
code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
|
streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
|
||||||
}
|
}
|
||||||
streamStateClearBatch(batch);
|
streamStateClearBatch(batch);
|
||||||
|
|
||||||
|
@ -376,7 +376,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
sprintf(keyBuf, "%s:%" PRId64 "", taskKey, ((SStreamState*)pFileState->pFileStore)->checkPointId);
|
sprintf(keyBuf, "%s:%" PRId64 "", taskKey, ((SStreamState*)pFileState->pFileStore)->checkPointId);
|
||||||
streamFileStateEncode(&pFileState->flushMark, &valBuf, &len);
|
streamFileStateEncode(&pFileState->flushMark, &valBuf, &len);
|
||||||
code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0);
|
streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0);
|
||||||
taosMemoryFree(valBuf);
|
taosMemoryFree(valBuf);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
|
@ -480,7 +480,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
memcpy(pNewPos->pRowBuff, pVal, pVLen);
|
memcpy(pNewPos->pRowBuff, pVal, pVLen);
|
||||||
code = tSimpleHashPut(pFileState->rowBuffMap, pNewPos->pKey, pFileState->rowSize, &pNewPos, POINTER_BYTES);
|
code = tSimpleHashPut(pFileState->rowBuffMap, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
destroyRowBuffPos(pNewPos);
|
destroyRowBuffPos(pNewPos);
|
||||||
break;
|
break;
|
||||||
|
|
Loading…
Reference in New Issue