refactor: check return value.

This commit is contained in:
Haojun Liao 2024-09-18 15:47:15 +08:00
parent b4277e0e65
commit 586826c41f
6 changed files with 68 additions and 26 deletions

View File

@ -2195,8 +2195,9 @@ static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeLi
SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1}; SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
epsetAssign(&entry.epset, &pTask->info.epSet); epsetAssign(&entry.epset, &pTask->info.epSet);
if (taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry)) != 0) { int32_t ret = taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry));
mError("failed to put entry into hash map, nodeId:%d", entry.nodeId); if (ret != 0 && ret != TSDB_CODE_DUP_KEY) {
mError("failed to put entry into hash map, nodeId:%d, code:%s", entry.nodeId, tstrerror(code));
} }
} }

View File

@ -135,7 +135,7 @@ typedef struct {
#define META_ON_S3_FORMATE "%s_%" PRId64 "\n%s_%" PRId64 "\n%s_%" PRId64 "" #define META_ON_S3_FORMATE "%s_%" PRId64 "\n%s_%" PRId64 "\n%s_%" PRId64 ""
bool streamBackendDataIsExist(const char* path, int64_t chkpId); bool streamBackendDataIsExist(const char* path, int64_t chkpId);
void* streamBackendInit(const char* path, int64_t chkpId, int32_t vgId); int32_t streamBackendInit(const char* path, int64_t chkpId, int32_t vgId, SBackendWrapper** pBackend);
void streamBackendCleanup(void* arg); void streamBackendCleanup(void* arg);
void streamBackendHandleCleanup(void* arg); void streamBackendHandleCleanup(void* arg);
int32_t streamBackendLoadCheckpointInfo(void* pMeta); int32_t streamBackendLoadCheckpointInfo(void* pMeta);

View File

@ -812,29 +812,32 @@ bool streamBackendDataIsExist(const char* path, int64_t chkpId) {
return exist; return exist;
} }
void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) { int32_t streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId, SBackendWrapper** pBackend) {
char* backendPath = NULL; char* backendPath = NULL;
int32_t code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath); int32_t code = 0;
int32_t lino = 0;
char* err = NULL;
size_t nCf = 0;
*pBackend = NULL;
code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath);
TSDB_CHECK_CODE(code, lino, _EXIT);
stDebug("start to init stream backend:%s, checkpointId:%" PRId64 " vgId:%d", backendPath, chkpId, vgId); stDebug("start to init stream backend:%s, checkpointId:%" PRId64 " vgId:%d", backendPath, chkpId, vgId);
uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20; uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20;
SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper)); SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper));
if (pHandle == NULL) { TSDB_CHECK_NULL(pHandle, code, lino, _EXIT, terrno);
goto _EXIT;
}
pHandle->list = tdListNew(sizeof(SCfComparator)); pHandle->list = tdListNew(sizeof(SCfComparator));
if (pHandle->list == NULL) { TSDB_CHECK_NULL(pHandle->list, code, lino, _EXIT, terrno);
goto _EXIT;
}
(void)taosThreadMutexInit(&pHandle->mutex, NULL); (void)taosThreadMutexInit(&pHandle->mutex, NULL);
(void)taosThreadMutexInit(&pHandle->cfMutex, NULL); (void)taosThreadMutexInit(&pHandle->cfMutex, NULL);
pHandle->cfInst = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); pHandle->cfInst = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (pHandle->cfInst == NULL) { TSDB_CHECK_NULL(pHandle->cfInst, code, lino, _EXIT, terrno);
goto _EXIT;
}
rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create(); rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create();
@ -863,9 +866,6 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) {
NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName); NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName);
rocksdb_options_set_compaction_filter_factory(pHandle->dbOpt, pHandle->filterFactory); rocksdb_options_set_compaction_filter_factory(pHandle->dbOpt, pHandle->filterFactory);
char* err = NULL;
size_t nCf = 0;
char** cfs = rocksdb_list_column_families(opts, backendPath, &nCf, &err); char** cfs = rocksdb_list_column_families(opts, backendPath, &nCf, &err);
if (nCf == 0 || nCf == 1 || err != NULL) { if (nCf == 0 || nCf == 1 || err != NULL) {
taosMemoryFreeClear(err); taosMemoryFreeClear(err);
@ -894,7 +894,9 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) {
stDebug("init stream backend at %s, backend:%p, vgId:%d", backendPath, pHandle, vgId); stDebug("init stream backend at %s, backend:%p, vgId:%d", backendPath, pHandle, vgId);
taosMemoryFreeClear(backendPath); taosMemoryFreeClear(backendPath);
return (void*)pHandle; *pBackend = pHandle;
return code;
_EXIT: _EXIT:
rocksdb_options_destroy(opts); rocksdb_options_destroy(opts);
rocksdb_cache_destroy(cache); rocksdb_cache_destroy(cache);
@ -904,9 +906,9 @@ _EXIT:
taosHashCleanup(pHandle->cfInst); taosHashCleanup(pHandle->cfInst);
pHandle->list = tdListFree(pHandle->list); pHandle->list = tdListFree(pHandle->list);
taosMemoryFree(pHandle); taosMemoryFree(pHandle);
stDebug("failed to init stream backend at %s", backendPath); stDebug("failed to init stream backend at %s, vgId:%d line:%d code:%s", backendPath, vgId, lino, tstrerror(code));
taosMemoryFree(backendPath); taosMemoryFree(backendPath);
return NULL; return code;
} }
void streamBackendCleanup(void* arg) { void streamBackendCleanup(void* arg) {
SBackendWrapper* pHandle = (SBackendWrapper*)arg; SBackendWrapper* pHandle = (SBackendWrapper*)arg;

View File

@ -232,7 +232,9 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) { int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) {
int32_t code = 0; int32_t code = 0;
SBackendWrapper* pBackend = NULL;
int64_t chkpId = streamMetaGetLatestCheckpointId(pMeta); int64_t chkpId = streamMetaGetLatestCheckpointId(pMeta);
terrno = 0; terrno = 0;
bool exist = streamBackendDataIsExist(pMeta->path, chkpId); bool exist = streamBackendDataIsExist(pMeta->path, chkpId);
if (exist == false) { if (exist == false) {
@ -240,7 +242,10 @@ int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) {
return code; return code;
} }
SBackendWrapper* pBackend = streamBackendInit(pMeta->path, chkpId, pMeta->vgId); code = streamBackendInit(pMeta->path, chkpId, pMeta->vgId, &pBackend);
if (code) {
return code;
}
void* pIter = taosHashIterate(pBackend->cfInst, NULL); void* pIter = taosHashIterate(pBackend->cfInst, NULL);
while (pIter) { while (pIter) {
@ -259,9 +264,13 @@ _EXIT:
if (code == 0) { if (code == 0) {
char* state = taosMemoryCalloc(1, strlen(pMeta->path) + 32); char* state = taosMemoryCalloc(1, strlen(pMeta->path) + 32);
if (state != NULL) {
sprintf(state, "%s%s%s", pMeta->path, TD_DIRSEP, "state"); sprintf(state, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
taosRemoveDir(state); taosRemoveDir(state);
taosMemoryFree(state); taosMemoryFree(state);
} else {
stError("vgId:%s, failed to remove file dir:%s, since:%s", pMeta->vgId, pMeta->path, tstrerror(code));
}
} }
return code; return code;
@ -451,6 +460,8 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
TSDB_CHECK_CODE(code, lino, _err); TSDB_CHECK_CODE(code, lino, _err);
int64_t* pRid = taosMemoryMalloc(sizeof(int64_t)); int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
TSDB_CHECK_NULL(pRid, code, lino, _err, terrno);
memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid)); memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));
code = metaRefMgtAdd(pMeta->vgId, pRid); code = metaRefMgtAdd(pMeta->vgId, pRid);
TSDB_CHECK_CODE(code, lino, _err); TSDB_CHECK_CODE(code, lino, _err);

View File

@ -130,6 +130,11 @@ int32_t streamGetFileSize(char* path, char* name, int64_t* sz) {
int32_t ret = 0; int32_t ret = 0;
char* fullname = taosMemoryCalloc(1, strlen(path) + 32); char* fullname = taosMemoryCalloc(1, strlen(path) + 32);
if (fullname == NULL) {
stError("failed to get file:%s size, code: out of memory", name);
return terrno;
}
sprintf(fullname, "%s%s%s", path, TD_DIRSEP, name); sprintf(fullname, "%s%s%s", path, TD_DIRSEP, name);
ret = taosStatFile(fullname, sz, NULL, NULL); ret = taosStatFile(fullname, sz, NULL, NULL);
@ -555,6 +560,11 @@ _NEXT:
(int32_t)taosArrayGetSize(pHandle->pDbSnapSet), pHandle->currIdx); (int32_t)taosArrayGetSize(pHandle->pDbSnapSet), pHandle->currIdx);
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
if (buf == NULL) {
stError("%s failed to prepare the block header, code:Out of memory", item->name);
return terrno;
}
int64_t nread = taosPReadFile(pSnapFile->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pSnapFile->offset); int64_t nread = taosPReadFile(pSnapFile->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pSnapFile->offset);
if (nread < 0) { if (nread < 0) {
taosMemoryFree(buf); taosMemoryFree(buf);
@ -764,6 +774,11 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
sprintf(idstr, "0x%" PRIx64 "-0x%x", snapInfo.streamId, (int32_t)(snapInfo.taskId)); sprintf(idstr, "0x%" PRIx64 "-0x%x", snapInfo.streamId, (int32_t)(snapInfo.taskId));
char* path = taosMemoryCalloc(1, strlen(pHandle->metaPath) + 256); char* path = taosMemoryCalloc(1, strlen(pHandle->metaPath) + 256);
if (path == NULL) {
stError("s-task:0x%x failed to prepare meta header buffer, code:Out of memory", (int32_t) snapInfo.taskId);
return terrno;
}
sprintf(path, "%s%s%s%s%s%s%s%" PRId64 "", pHandle->metaPath, TD_DIRSEP, idstr, TD_DIRSEP, "checkpoints", TD_DIRSEP, sprintf(path, "%s%s%s%s%s%s%s%" PRId64 "", pHandle->metaPath, TD_DIRSEP, idstr, TD_DIRSEP, "checkpoints", TD_DIRSEP,
"checkpoint", snapInfo.chkpId); "checkpoint", snapInfo.chkpId);
if (!taosIsDir(path)) { if (!taosIsDir(path)) {
@ -778,11 +793,19 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
pDbSnapFile->path = path; pDbSnapFile->path = path;
pDbSnapFile->snapInfo = snapInfo; pDbSnapFile->snapInfo = snapInfo;
pDbSnapFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem)); pDbSnapFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem));
if (pDbSnapFile->pFileList == NULL) {
return terrno;
}
pDbSnapFile->currFileIdx = 0; pDbSnapFile->currFileIdx = 0;
pDbSnapFile->offset = 0; pDbSnapFile->offset = 0;
SBackendFileItem item = {0}; SBackendFileItem item = {0};
item.name = taosStrdup((char*)ROCKSDB_CURRENT); item.name = taosStrdup((char*)ROCKSDB_CURRENT);
if (item.name == NULL) {
return terrno;
}
item.type = ROCKSDB_CURRENT_TYPE; item.type = ROCKSDB_CURRENT_TYPE;
void* p = taosArrayPush(pDbSnapFile->pFileList, &item); void* p = taosArrayPush(pDbSnapFile->pFileList, &item);

View File

@ -133,6 +133,11 @@ int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool
sprintf(buf, "0x%" PRIx64 "-0x%x", pTask->id.streamId, pTask->id.taskId); sprintf(buf, "0x%" PRIx64 "-0x%x", pTask->id.streamId, pTask->id.taskId);
pTask->id.idStr = taosStrdup(buf); pTask->id.idStr = taosStrdup(buf);
if (pTask->id.idStr == NULL) {
stError("s-task:0x%x failed to build task id, code: out of memory", pTask->id.taskId);
return terrno;
}
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->status.taskStatus = fillHistory ? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__READY; pTask->status.taskStatus = fillHistory ? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__READY;
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL; pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;