diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ffebd783ac..ccfc8cc7c9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -101,10 +101,7 @@ int32_t tqInitialize(STQ* pTq) { return -1; } - if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) { - return -1; - } - + /*int32_t code = */streamMetaLoadAllTasks(pTq->pStreamMeta); return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 5f6440c06d..fc06a8975f 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -824,13 +824,6 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) { return chkpId; } -static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) { - tdbFree(pKey); - tdbFree(pVal); - tdbTbcClose(pCur); - taosArrayDestroy(pRecycleList); -} - int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { TBC* pCur = NULL; void* pKey = NULL; @@ -847,10 +840,11 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { int32_t vgId = pMeta->vgId; stInfo("vgId:%d load stream tasks from meta files", vgId); - if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { - stError("vgId:%d failed to open stream meta, code:%s", vgId, tstrerror(terrno)); + int32_t code = tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL); + if (code != TSDB_CODE_SUCCESS) { + stError("vgId:%d failed to open stream meta, code:%s, not load any stream tasks", vgId, tstrerror(terrno)); taosArrayDestroy(pRecycleList); - return -1; + return TSDB_CODE_SUCCESS; } tdbTbcMoveToFirst(pCur); @@ -859,20 +853,18 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; stError("vgId:%d failed to load stream task from meta-files, code:%s", vgId, tstrerror(terrno)); - doClear(pKey, pVal, pCur, pRecycleList); - return -1; + break; } tDecoderInit(&decoder, (uint8_t*)pVal, vLen); if (tDecodeStreamTask(&decoder, pTask) < 0) { tDecoderClear(&decoder); - doClear(pKey, pVal, pCur, pRecycleList); tFreeStreamTask(pTask); stError( "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild " "stream manually", vgId, tsDataDir); - return -1; + break; } tDecoderClear(&decoder); @@ -892,10 +884,11 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (p == NULL) { - if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1) < 0) { - doClear(pKey, pVal, pCur, pRecycleList); + code = pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1); + if (code < 0) { + stError("failed to expand s-task:0x%"PRIx64", code:%s, continue", id.taskId, tstrerror(terrno)); tFreeStreamTask(pTask); - return -1; + continue; } taosArrayPush(pMeta->pTaskList, &pTask->id); @@ -907,9 +900,10 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { } if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES) < 0) { - doClear(pKey, pVal, pCur, pRecycleList); + stError("s-task:0x%x failed to put into hashTable, code:%s, continue", pTask->id.taskId, tstrerror(terrno)); + taosArrayPop(pMeta->pTaskList); tFreeStreamTask(pTask); - return -1; + continue; } if (pTask->info.fillHistory == 0) { @@ -925,10 +919,9 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { tdbFree(pKey); tdbFree(pVal); + if (tdbTbcClose(pCur) < 0) { - stError("vgId:%d failed to close meta-file cursor", vgId); - taosArrayDestroy(pRecycleList); - return -1; + stError("vgId:%d failed to close meta-file cursor, code:%s, continue", vgId, tstrerror(terrno)); } if (taosArrayGetSize(pRecycleList) > 0) { @@ -942,8 +935,9 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { ASSERT(pMeta->numOfStreamTasks <= numOfTasks && pMeta->numOfPausedTasks <= numOfTasks); stDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks, pMeta->numOfStreamTasks, pMeta->numOfPausedTasks); + taosArrayDestroy(pRecycleList); - return 0; + return TSDB_CODE_SUCCESS; } int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {