diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 9a21563abe..d8c26025bd 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4730,7 +4730,7 @@ int32_t tDeserializeSAlterVnodeReplicaReq(void *buf, int32_t bufLen, SAlterVnode if (tDecodeSReplica(&decoder, pReplica) < 0) return -1; } } - if (!tDecodeIsEnd(&decoder)){ + if (!tDecodeIsEnd(&decoder)) { if (tDecodeI32(&decoder, &pReq->changeVersion) < 0) return -1; } @@ -5601,17 +5601,17 @@ void tFreeSSubQueryMsg(SSubQueryMsg *pReq) { taosMemoryFreeClear(pReq->msg); } -int32_t tSerializeSOperatorParam(SEncoder* pEncoder, SOperatorParam* pOpParam) { +int32_t tSerializeSOperatorParam(SEncoder *pEncoder, SOperatorParam *pOpParam) { if (tEncodeI32(pEncoder, pOpParam->opType) < 0) return -1; if (tEncodeI32(pEncoder, pOpParam->downstreamIdx) < 0) return -1; switch (pOpParam->opType) { case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: { - STableScanOperatorParam* pScan = (STableScanOperatorParam*)pOpParam->value; + STableScanOperatorParam *pScan = (STableScanOperatorParam *)pOpParam->value; if (tEncodeI8(pEncoder, pScan->tableSeq) < 0) return -1; int32_t uidNum = taosArrayGetSize(pScan->pUidList); if (tEncodeI32(pEncoder, uidNum) < 0) return -1; for (int32_t m = 0; m < uidNum; ++m) { - int64_t* pUid = taosArrayGet(pScan->pUidList, m); + int64_t *pUid = taosArrayGet(pScan->pUidList, m); if (tEncodeI64(pEncoder, *pUid) < 0) return -1; } break; @@ -5619,31 +5619,32 @@ int32_t tSerializeSOperatorParam(SEncoder* pEncoder, SOperatorParam* pOpParam) { default: return TSDB_CODE_INVALID_PARA; } - + int32_t n = taosArrayGetSize(pOpParam->pChildren); if (tEncodeI32(pEncoder, n) < 0) return -1; for (int32_t i = 0; i < n; ++i) { - SOperatorParam* pChild = *(SOperatorParam**)taosArrayGet(pOpParam->pChildren, i); + SOperatorParam *pChild = *(SOperatorParam **)taosArrayGet(pOpParam->pChildren, i); if (tSerializeSOperatorParam(pEncoder, pChild) < 0) return -1; } return 0; } -int32_t tDeserializeSOperatorParam(SDecoder *pDecoder, SOperatorParam* pOpParam) { +int32_t tDeserializeSOperatorParam(SDecoder *pDecoder, SOperatorParam *pOpParam) { if (tDecodeI32(pDecoder, &pOpParam->opType) < 0) return -1; if (tDecodeI32(pDecoder, &pOpParam->downstreamIdx) < 0) return -1; switch (pOpParam->opType) { case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: { - STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam)); + STableScanOperatorParam *pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam)); if (NULL == pScan) return -1; - if (tDecodeI8(pDecoder, (int8_t*)&pScan->tableSeq) < 0) return -1; + if (tDecodeI8(pDecoder, (int8_t *)&pScan->tableSeq) < 0) return -1; int32_t uidNum = 0; int64_t uid = 0; if (tDecodeI32(pDecoder, &uidNum) < 0) return -1; if (uidNum > 0) { pScan->pUidList = taosArrayInit(uidNum, sizeof(int64_t)); if (NULL == pScan->pUidList) return -1; + for (int32_t m = 0; m < uidNum; ++m) { if (tDecodeI64(pDecoder, &uid) < 0) return -1; taosArrayPush(pScan->pUidList, &uid); @@ -5660,11 +5661,12 @@ int32_t tDeserializeSOperatorParam(SDecoder *pDecoder, SOperatorParam* pOpParam) int32_t childrenNum = 0; if (tDecodeI32(pDecoder, &childrenNum) < 0) return -1; + if (childrenNum > 0) { pOpParam->pChildren = taosArrayInit(childrenNum, POINTER_BYTES); if (NULL == pOpParam->pChildren) return -1; for (int32_t i = 0; i < childrenNum; ++i) { - SOperatorParam* pChild = taosMemoryCalloc(1, sizeof(SOperatorParam)); + SOperatorParam *pChild = taosMemoryCalloc(1, sizeof(SOperatorParam)); if (NULL == pChild) return -1; if (tDeserializeSOperatorParam(pDecoder, pChild) < 0) return -1; taosArrayPush(pOpParam->pChildren, &pChild); @@ -5676,7 +5678,6 @@ int32_t tDeserializeSOperatorParam(SDecoder *pDecoder, SOperatorParam* pOpParam) return 0; } - int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) { int32_t headLen = sizeof(SMsgHead); if (buf != NULL) { @@ -5737,7 +5738,7 @@ int32_t tDeserializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) if (NULL == pReq->pOpParam) return -1; if (tDeserializeSOperatorParam(&decoder, pReq->pOpParam) < 0) return -1; } - + tEndDecode(&decoder); tDecoderClear(&decoder); @@ -5925,7 +5926,7 @@ int32_t tDeserializeSTaskNotifyReq(void *buf, int32_t bufLen, STaskNotifyReq *pR if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1; if (tDecodeI64(&decoder, &pReq->refId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->execId) < 0) return -1; - if (tDecodeI32(&decoder, (int32_t*)&pReq->type) < 0) return -1; + if (tDecodeI32(&decoder, (int32_t *)&pReq->type) < 0) return -1; tEndDecode(&decoder); @@ -5933,7 +5934,6 @@ int32_t tDeserializeSTaskNotifyReq(void *buf, int32_t bufLen, STaskNotifyReq *pR return 0; } - int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -5951,7 +5951,7 @@ int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp if (tEncodeI32(&encoder, pVer->tversion) < 0) return -1; } } - + tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -5979,7 +5979,7 @@ int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pR if (tDecodeI32(&decoder, &tbVer.tversion) < 0) return -1; if (NULL == taosArrayPush(pRsp->tbVerInfo, &tbVer)) return -1; } - + tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 4a1b3961cd..33b055c3b1 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -91,7 +91,7 @@ int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) { uint8_t* rowData = NULL; int64_t len; code = streamSnapRead(pReader->pReaderImpl, &rowData, &len); - if (rowData == NULL || len == 0) { + if (code != 0 || rowData == NULL || len == 0) { return code; } *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + len); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 8a80d74c63..eeef1f577c 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -178,6 +178,10 @@ void bkdMgtDestroy(SBackendManager* bm) { taosHashCleanup(bm->pSstTbl[0]); taosHashCleanup(bm->pSstTbl[1]); + + taosMemoryFree(bm->pCurrent); + taosMemoryFree(bm->pManifest); + taosMemoryFree(bm); } @@ -239,7 +243,7 @@ int32_t bkdMgtGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list) { continue; } if (strlen(name) >= sstLen && strncmp(name + strlen(name) - 4, pSST, sstLen) == 0) { - char* p = taosStrdup(name); + // char* p = taosStrdup(name); taosHashPut(bm->pSstTbl[1 - bm->idx], name, strlen(name), &dummy, sizeof(dummy)); continue; } @@ -267,7 +271,7 @@ int32_t bkdMgtGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list) { taosArrayClearP(bm->pDel, taosMemoryFree); taosHashClear(bm->pSstTbl[1 - bm->idx]); bm->update = 0; - + taosCloseDir(&pDir); return code; } @@ -280,6 +284,8 @@ int32_t bkdMgtGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list) { taosHashClear(bm->pSstTbl[bm->idx]); bm->idx = 1 - bm->idx; + taosCloseDir(&pDir); + return 0; } @@ -287,8 +293,8 @@ int32_t bkdMgtDumpTo(SBackendManager* bm, char* dname) { int32_t code = 0; int32_t len = bm->len + 128; - char* dstBuf = taosMemoryCalloc(1, len); char* srcBuf = taosMemoryCalloc(1, len); + char* dstBuf = taosMemoryCalloc(1, len); char* srcDir = taosMemoryCalloc(1, len); char* dstDir = taosMemoryCalloc(1, len); @@ -297,12 +303,16 @@ int32_t bkdMgtDumpTo(SBackendManager* bm, char* dname) { sprintf(dstDir, "%s%s%s", bm->path, TD_DIRSEP, dname); if (!taosDirExist(srcDir)) { - return 0; + qError("failed to dump srcDir %s, reason: not exist such dir", srcDir); + code = -1; + goto _ERROR; } code = taosMkDir(dstDir); if (code != 0) { - return code; + terrno = TAOS_SYSTEM_ERROR(errno); + qError("failed to mkdir srcDir %s, reason: %s", dstDir, terrstr()); + goto _ERROR; } // clear current file @@ -353,6 +363,7 @@ int32_t bkdMgtDumpTo(SBackendManager* bm, char* dname) { taosArrayClearP(bm->pAdd, taosMemoryFree); taosArrayClearP(bm->pDel, taosMemoryFree); +_ERROR: taosMemoryFree(srcBuf); taosMemoryFree(dstBuf); taosMemoryFree(srcDir); @@ -388,7 +399,11 @@ int32_t copyFiles(const char* src, const char* dst) { char* dstName = taosMemoryCalloc(1, dLen + 64); TdDirPtr pDir = taosOpenDir(src); - if (pDir == NULL) return 0; + if (pDir == NULL) { + taosMemoryFree(srcName); + taosMemoryFree(dstName); + return -1; + } TdDirEntryPtr de = NULL; while ((de = taosReadDir(pDir)) != NULL) { @@ -1483,7 +1498,11 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpts[i], tableOpt); params[i].tableOpt = tableOpt; - int idx = streamStateGetCfIdx(NULL, funcname); + int idx = streamStateGetCfIdx(NULL, funcname); + if (idx < 0 || idx >= sizeof(ginitDict) / sizeof(ginitDict[0])) { + qError("failed to open cf"); + return -1; + } SCfInit* cfPara = &ginitDict[idx]; rocksdb_comparator_t* compare = @@ -2315,6 +2334,7 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* char* val = NULL; int32_t len = decodeValueFunc((void*)vval, vLen, NULL, &val); if (len < 0) { + taosMemoryFree(val); return -1; } @@ -2798,6 +2818,7 @@ char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) { const char* val = rocksdb_iter_value(pCur->iter, (size_t*)&vlen); *len = decodeValueFunc((void*)val, vlen, NULL, &ret); if (*len < 0) { + taosMemoryFree(ret); return NULL; } diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 8a4500dd86..feb127e313 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -343,6 +343,7 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); if (nread == -1) { + taosMemoryFree(buf); code = TAOS_SYSTEM_ERROR(terrno); qError("%s snap failed to read snap, file name:%s, type:%d,reason:%s", STREAM_STATE_TRANSFER, item->name, item->type, tstrerror(code));