diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 29fb18ef07..52d3920552 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1096,21 +1096,23 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) { * replication is finished */ int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) { + int32_t code = 0; STaskDbWrapper* pBackend = arg; - + SArray * chkpDel = NULL, *chkpDup = NULL; (void)taosThreadRwlockWrlock(&pBackend->chkpDirLock); - (void)taosArrayPush(pBackend->chkpSaved, &chkpId); - - SArray* chkpDel = taosArrayInit(8, sizeof(int64_t)); - if (chkpDel == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + if (taosArrayPush(pBackend->chkpSaved, &chkpId) == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); } - SArray* chkpDup = taosArrayInit(8, sizeof(int64_t)); + chkpDel = taosArrayInit(8, sizeof(int64_t)); + if (chkpDel == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); + } + + chkpDup = taosArrayInit(8, sizeof(int64_t)); if (chkpDup == NULL) { - taosArrayDestroy(chkpDel); - return TSDB_CODE_OUT_OF_MEMORY; + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); } int64_t firsId = 0; @@ -1120,9 +1122,13 @@ int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) { for (int i = 0; i < taosArrayGetSize(pBackend->chkpSaved); i++) { int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i); if (id >= firsId) { - (void)taosArrayPush(chkpDup, &id); + if (taosArrayPush(chkpDup, &id) == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); + } } else { - (void)taosArrayPush(chkpDel, &id); + if (taosArrayPush(chkpDel, &id) == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); + } } } } else { @@ -1131,13 +1137,18 @@ int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) { for (int i = 0; i < dsz; i++) { int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i); - (void)taosArrayPush(chkpDel, &id); + if (taosArrayPush(chkpDel, &id) == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); + } } for (int i = dsz < 0 ? 0 : dsz; i < sz; i++) { int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i); - (void)taosArrayPush(chkpDup, &id); + if (taosArrayPush(chkpDup, &id) == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); + } } } + taosArrayDestroy(pBackend->chkpSaved); pBackend->chkpSaved = chkpDup; @@ -1155,6 +1166,11 @@ int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) { } taosArrayDestroy(chkpDel); return 0; +_exception: + taosArrayDestroy(chkpDup); + taosArrayDestroy(chkpDel); + taosThreadRwlockUnlock(&pBackend->chkpDirLock); + return code; } #ifdef BUILD_NO_CALL @@ -1288,7 +1304,9 @@ int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) { int ret = sscanf(taosGetDirEntryName(de), "checkpoint%" PRId64 "", &checkpointId); if (ret == 1) { - (void)taosArrayPush(pBackend->chkpSaved, &checkpointId); + if (taosArrayPush(pBackend->chkpSaved, &checkpointId) == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); + } } } else { continue; @@ -1300,13 +1318,21 @@ int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) { (void)taosCloseDir(&pDir); return 0; +_exception: + taosMemoryFree(pChkpDir); + (void)taosCloseDir(&pDir); + return code; } int32_t chkpGetAllDbCfHandle2(STaskDbWrapper* pBackend, rocksdb_column_family_handle_t*** ppHandle) { + int32_t code = 0; SArray* pHandle = taosArrayInit(8, POINTER_BYTES); for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { if (pBackend->pCf[i]) { rocksdb_column_family_handle_t* p = pBackend->pCf[i]; - (void)taosArrayPush(pHandle, &p); + if (taosArrayPush(pHandle, &p) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exception; + } } } int32_t nCf = taosArrayGetSize(pHandle); @@ -1316,13 +1342,20 @@ int32_t chkpGetAllDbCfHandle2(STaskDbWrapper* pBackend, rocksdb_column_family_ha } rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*)); + if (ppCf == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); + } for (int i = 0; i < nCf; i++) { ppCf[i] = taosArrayGetP(pHandle, i); } + taosArrayDestroy(pHandle); *ppHandle = ppCf; return nCf; +_exception: + taosArrayDestroy(pHandle); + return code; } int32_t chkpDoDbCheckpoint(rocksdb_t* db, char* path) { @@ -2435,7 +2468,9 @@ void taskDbInitChkpOpt(STaskDbWrapper* pTaskDb) { void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) { (void)taosThreadRwlockWrlock(&pTaskDb->chkpDirLock); - (void)taosArrayPush(pTaskDb->chkpInUse, &chkp); + if (taosArrayPush(pTaskDb->chkpInUse, &chkp) == NULL) { + stError("failed to push chkp: %" PRIi64 " into inuse", chkp); + } taosArraySort(pTaskDb->chkpInUse, chkpIdComp); (void)taosThreadRwlockUnlock(&pTaskDb->chkpDirLock); } @@ -4271,7 +4306,10 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) { int64_t checkPoint = 0; if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) { - (void)taosArrayPush(result, &checkPoint); + if (taosArrayPush(result, &checkPoint) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } } } else { break; @@ -4487,7 +4525,10 @@ int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) { return TSDB_CODE_OUT_OF_MEMORY; } (void)strncpy(fname, name, len); - (void)taosArrayPush(diff, &fname); + if (taosArrayPush(diff, &fname) == NULL) { + taosMemoryFree(fname); + return TSDB_CODE_OUT_OF_MEMORY; + } } pIter = taosHashIterate(p2, pIter); } @@ -4646,7 +4687,11 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { } (void)strncpy(fname, name, len); - (void)taosArrayPush(p->pAdd, &fname); + if (taosArrayPush(p->pAdd, &fname) == NULL) { + taosMemoryFree(fname); + (void)taosThreadRwlockUnlock(&p->rwLock); + return TSDB_CODE_OUT_OF_MEMORY; + } } pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter); } @@ -4850,7 +4895,11 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { code = TSDB_CODE_OUT_OF_MEMORY; goto _ERROR; } - (void)taosArrayPush(list, &p); + if (taosArrayPush(list, &p) == NULL) { + taosMemoryFree(p); + code = TSDB_CODE_OUT_OF_MEMORY; + goto _ERROR; + } } // copy current file to dst dir