fix(stream): not return error when failing to load stream task meta.

This commit is contained in:
Haojun Liao 2024-03-25 23:41:22 +08:00
parent 717805c075
commit a11b2c614e
2 changed files with 18 additions and 27 deletions

View File

@ -101,10 +101,7 @@ int32_t tqInitialize(STQ* pTq) {
return -1;
}
if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
return -1;
}
/*int32_t code = */streamMetaLoadAllTasks(pTq->pStreamMeta);
return 0;
}

View File

@ -824,13 +824,6 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
return chkpId;
}
static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) {
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
taosArrayDestroy(pRecycleList);
}
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
TBC* pCur = NULL;
void* pKey = NULL;
@ -847,10 +840,11 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
int32_t vgId = pMeta->vgId;
stInfo("vgId:%d load stream tasks from meta files", vgId);
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
stError("vgId:%d failed to open stream meta, code:%s", vgId, tstrerror(terrno));
int32_t code = tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL);
if (code != TSDB_CODE_SUCCESS) {
stError("vgId:%d failed to open stream meta, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
taosArrayDestroy(pRecycleList);
return -1;
return TSDB_CODE_SUCCESS;
}
tdbTbcMoveToFirst(pCur);
@ -859,20 +853,18 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("vgId:%d failed to load stream task from meta-files, code:%s", vgId, tstrerror(terrno));
doClear(pKey, pVal, pCur, pRecycleList);
return -1;
break;
}
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
if (tDecodeStreamTask(&decoder, pTask) < 0) {
tDecoderClear(&decoder);
doClear(pKey, pVal, pCur, pRecycleList);
tFreeStreamTask(pTask);
stError(
"vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild "
"stream manually",
vgId, tsDataDir);
return -1;
break;
}
tDecoderClear(&decoder);
@ -892,10 +884,11 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (p == NULL) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1) < 0) {
doClear(pKey, pVal, pCur, pRecycleList);
code = pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1);
if (code < 0) {
stError("failed to expand s-task:0x%"PRIx64", code:%s, continue", id.taskId, tstrerror(terrno));
tFreeStreamTask(pTask);
return -1;
continue;
}
taosArrayPush(pMeta->pTaskList, &pTask->id);
@ -907,9 +900,10 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
}
if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES) < 0) {
doClear(pKey, pVal, pCur, pRecycleList);
stError("s-task:0x%x failed to put into hashTable, code:%s, continue", pTask->id.taskId, tstrerror(terrno));
taosArrayPop(pMeta->pTaskList);
tFreeStreamTask(pTask);
return -1;
continue;
}
if (pTask->info.fillHistory == 0) {
@ -925,10 +919,9 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
tdbFree(pKey);
tdbFree(pVal);
if (tdbTbcClose(pCur) < 0) {
stError("vgId:%d failed to close meta-file cursor", vgId);
taosArrayDestroy(pRecycleList);
return -1;
stError("vgId:%d failed to close meta-file cursor, code:%s, continue", vgId, tstrerror(terrno));
}
if (taosArrayGetSize(pRecycleList) > 0) {
@ -942,8 +935,9 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
ASSERT(pMeta->numOfStreamTasks <= numOfTasks && pMeta->numOfPausedTasks <= numOfTasks);
stDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks,
pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
taosArrayDestroy(pRecycleList);
return 0;
return TSDB_CODE_SUCCESS;
}
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {