diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 7eb0c7b9aa..c80f46f528 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -75,6 +75,9 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); +static SArray *doExtractNodeListFromStream(SMnode *pMnode); +static SArray *mndTakeVgroupSnapshot(SMnode *pMnode); +static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans); static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset); @@ -1097,6 +1100,22 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream return 0; } +static const char *mndGetStreamDB(SMnode *pMnode) { + SSdb *pSdb = pMnode->pSdb; + SStreamObj *pStream = NULL; + void *pIter = NULL; + + pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); + if (pIter == NULL) { + return NULL; + } + + const char *p = taosStrdup(pStream->sourceDb); + mndReleaseStream(pMnode, pStream); + sdbCancelFetch(pSdb, pIter); + return p; +} + static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -1104,6 +1123,36 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SStreamObj *pStream = NULL; int32_t code = 0; + { + int64_t ts = taosGetTimestampSec(); + + if (execNodeList.pNodeEntryList == NULL || (taosArrayGetSize(execNodeList.pNodeEntryList) == 0)) { + if (execNodeList.pNodeEntryList != NULL) { + execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList); + } + execNodeList.pNodeEntryList = doExtractNodeListFromStream(pMnode); + } + + if (taosArrayGetSize(execNodeList.pNodeEntryList) == 0) { + mDebug("end to do stream task node change checking, no vgroup exists, do nothing"); + execNodeList.ts = ts; + atomic_store_32(&mndNodeCheckSentinel, 0); + return 0; + } + + SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode); + + SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot); + bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); + taosArrayDestroy(changeInfo.pUpdateNodeList); + taosHashCleanup(changeInfo.pDBMap); + + if (nodeUpdated) { + mDebug("stream task not ready due to node update, not generate checkpoint"); + return 0; + } + } + SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont; int64_t checkpointId = pMsg->checkpointId; @@ -1114,7 +1163,10 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { } mDebug("start to trigger checkpoint, checkpointId: %" PRId64 "", checkpointId); - mndTransSetDbName(pTrans, "checkpoint", "checkpoint"); + const char* pDb = mndGetStreamDB(pMnode); + mndTransSetDbName(pTrans, pDb, "checkpoint"); + taosMemoryFree((void*)pDb); + if (mndTransCheckConflict(pMnode, pTrans) != 0) { mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId, tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); @@ -1132,11 +1184,13 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { break; } } + if (code == 0) { if (mndTransPrepare(pMnode, pTrans) != 0) { mError("failed to prepre trans rebalance since %s", terrstr()); } } + mndTransDrop(pTrans); return code; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 42ec7d320b..0ce4aa0d1e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1883,6 +1883,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr); streamTaskUpdateEpsetInfo(pTask, req.pNodeList); + streamMetaSaveTask(pMeta, pTask); streamTaskStop(pTask); taosWLockLatch(&pMeta->lock); @@ -1895,6 +1896,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { // all tasks are closed, now let's restart the stream meta if (pMeta->closedTask == numOfCount) { + tqDebug("vgId:%d all tasks are updated, commit the update nodeInfo", vgId); if (streamMetaCommit(pMeta) < 0) { // persist to disk } @@ -1910,18 +1912,23 @@ _end: if (restartTasks) { tqDebug("vgId:%d all tasks are stopped, restart them", vgId); + taosWLockLatch(&pMeta->lock); - streamMetaClose(pTask->pMeta); - - pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId, -1); - if (pTq->pStreamMeta == NULL) { + terrno = 0; + int32_t code = streamMetaReopen(pTq->pStreamMeta, 0); + if (code != 0) { + tqError("vgId:%d failed to reopen stream meta", vgId); + taosWUnLockLatch(&pMeta->lock); return -1; } if (streamLoadTasks(pTq->pStreamMeta) < 0) { + tqError("vgId:%d failed to load stream tasks", vgId); + taosWUnLockLatch(&pMeta->lock); return -1; } + taosWUnLockLatch(&pMeta->lock); if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { vInfo("vgId:%d, restart to all stream tasks", vgId); tqCheckStreamStatus(pTq); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 072b74cd9a..899ffdcd4d 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -24,9 +24,9 @@ static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; int32_t streamBackendId = 0; int32_t streamBackendCfWrapperId = 0; -int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta); - +static int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta); static void metaHbToMnode(void* param, void* tmrId); +static void streamMetaClear(SStreamMeta* pMeta); static void streamMetaEnvInit() { streamBackendId = taosOpenRef(64, streamBackendCleanup); @@ -130,31 +130,8 @@ _err: } int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { - // stop all running tasking and reopen later - void* pIter = NULL; - while (1) { - pIter = taosHashIterate(pMeta->pTasks, pIter); - if (pIter == NULL) { - break; - } + streamMetaClear(pMeta); - SStreamTask* pTask = *(SStreamTask**)pIter; - if (pTask->schedTimer) { - taosTmrStop(pTask->schedTimer); - pTask->schedTimer = NULL; - } - - if (pTask->launchTaskTimer) { - taosTmrStop(pTask->launchTaskTimer); - pTask->launchTaskTimer = NULL; - } - - tFreeStreamTask(pTask); - } - - // close stream backend - // streamBackendCleanup(pMeta->streamBackend); - taosRemoveRef(streamBackendId, pMeta->streamBackendRid); pMeta->streamBackendRid = -1; pMeta->streamBackend = NULL; @@ -165,60 +142,70 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { char* newPath = taosMemoryCalloc(1, strlen(pMeta->path) + 64); sprintf(newPath, "%s%s%s", pMeta->path, TD_DIRSEP, "received"); - if (taosRenameFile(newPath, defaultPath) < 0) { - taosMemoryFree(defaultPath); - taosMemoryFree(newPath); - return -1; + int32_t code = taosStatFile(newPath, NULL, NULL, NULL); + if (code == 0) { + // directory exists + code = taosRenameFile(newPath, defaultPath); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + qError("vgId:%d failed to rename file, from %s to %s, code:%s", newPath, defaultPath, pMeta->vgId, + tstrerror(terrno)); + + taosMemoryFree(defaultPath); + taosMemoryFree(newPath); + return -1; + } } - pMeta->streamBackend = streamBackendInit(pMeta->path, 0); + pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); if (pMeta->streamBackend == NULL) { + qError("vgId:%d failed to init stream backend", pMeta->vgId); return -1; } + pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); - - taosHashClear(pMeta->pTasks); - - taosArrayClear(pMeta->pTaskList); - - taosHashClear(pMeta->pTaskBackendUnique); - - taosArrayClear(pMeta->chkpSaved); - - taosArrayClear(pMeta->chkpInUse); - return 0; } + +void streamMetaClear(SStreamMeta* pMeta) { + void* pIter = NULL; + while ((pIter = taosHashIterate(pMeta->pTasks, pIter)) != NULL) { + streamMetaReleaseTask(pMeta, *(SStreamTask**)pIter); + } + + taosRemoveRef(streamBackendId, pMeta->streamBackendRid); + + taosHashClear(pMeta->pTasks); + taosHashClear(pMeta->pTaskBackendUnique); + + taosArrayClear(pMeta->pTaskList); + taosArrayClear(pMeta->chkpSaved); + taosArrayClear(pMeta->chkpInUse); +} + void streamMetaClose(SStreamMeta* pMeta) { qDebug("start to close stream meta"); if (pMeta == NULL) { return; } + streamMetaClear(pMeta); + tdbAbort(pMeta->db, pMeta->txn); tdbTbClose(pMeta->pTaskDb); tdbTbClose(pMeta->pCheckpointDb); tdbClose(pMeta->db); - void* pIter = NULL; - while (1) { - pIter = taosHashIterate(pMeta->pTasks, pIter); - if (pIter == NULL) { - break; - } - tFreeStreamTask(*(SStreamTask**)pIter); - } - - taosHashCleanup(pMeta->pTasks); - taosRemoveRef(streamBackendId, pMeta->streamBackendRid); - pMeta->pTaskList = taosArrayDestroy(pMeta->pTaskList); - taosMemoryFree(pMeta->path); - taosThreadMutexDestroy(&pMeta->backendMutex); - taosHashCleanup(pMeta->pTaskBackendUnique); - + taosArrayDestroy(pMeta->pTaskList); taosArrayDestroy(pMeta->chkpSaved); taosArrayDestroy(pMeta->chkpInUse); + taosHashCleanup(pMeta->pTasks); + taosHashCleanup(pMeta->pTaskBackendUnique); + + taosMemoryFree(pMeta->path); + taosThreadMutexDestroy(&pMeta->backendMutex); + taosMemoryFree(pMeta); qDebug("end to close stream meta"); } @@ -318,7 +305,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t return NULL; } -void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { +void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) { int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1); if (ref > 0) { qTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref); @@ -458,7 +445,9 @@ int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) { chkpId = TMAX(chkpId, info.checkpointId); } + qDebug("get max chkp id: %" PRId64 "", chkpId); + tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); @@ -466,6 +455,13 @@ int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) { return chkpId; } +static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) { + tdbFree(pKey); + tdbFree(pVal); + tdbTbcClose(pCur); + taosArrayDestroy(pRecycleList); +} + int32_t streamLoadTasks(SStreamMeta* pMeta) { TBC* pCur = NULL; @@ -486,19 +482,14 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) { while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { - tdbFree(pKey); - tdbFree(pVal); - tdbTbcClose(pCur); - taosArrayDestroy(pRecycleList); + doClear(pKey, pVal, pCur, pRecycleList); return -1; } + tDecoderInit(&decoder, (uint8_t*)pVal, vLen); if (tDecodeStreamTask(&decoder, pTask) < 0) { tDecoderClear(&decoder); - tdbFree(pKey); - tdbFree(pVal); - tdbTbcClose(pCur); - taosArrayDestroy(pRecycleList); + doClear(pKey, pVal, pCur, pRecycleList); tFreeStreamTask(pTask); qError( "stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream " @@ -513,7 +504,6 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) { tFreeStreamTask(pTask); taosArrayPush(pRecycleList, &taskId); - int32_t total = taosArrayGetSize(pRecycleList); qDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total); continue; @@ -524,11 +514,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) { void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (p == NULL) { if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer) < 0) { - tdbFree(pKey); - tdbFree(pVal); - tdbTbcClose(pCur); + doClear(pKey, pVal, pCur, pRecycleList); tFreeStreamTask(pTask); - taosArrayDestroy(pRecycleList); return -1; } @@ -542,11 +529,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) { } if (taosHashPut(pMeta->pTasks, keys, sizeof(keys), &pTask, sizeof(void*)) < 0) { - tdbFree(pKey); - tdbFree(pVal); - tdbTbcClose(pCur); + doClear(pKey, pVal, pCur, pRecycleList); tFreeStreamTask(pTask); - taosArrayDestroy(pRecycleList); return -1; } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index a08092e379..af283c590a 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -244,6 +244,7 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { launchFillHistoryTask(pTask); } +// todo handle error int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); const char* id = pTask->id.idStr; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 65a836728b..e805bf0e72 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -475,7 +475,7 @@ int32_t streamTaskStop(SStreamTask* pTask) { pTask->status.taskStatus = TASK_STATUS__STOP; qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); - while (!streamTaskIsIdle(pTask)) { + while (pTask->status.schedStatus != TASK_SCHED_STATUS__INACTIVE) { qDebug("s-task:%s level:%d wait for task to be idle, check again in 100ms", id, pTask->info.taskLevel); taosMsleep(100); }