From 1256eafddb2be460b4bd75aec6a975b8e1d68eec Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 14 Sep 2023 14:09:05 +0800 Subject: [PATCH] other: merge stream fix. --- include/libs/stream/tstream.h | 2 +- source/common/src/tglobal.c | 2 +- source/dnode/mnode/impl/src/mndStream.c | 10 ++--- source/dnode/vnode/src/tq/tq.c | 37 +++++++++---------- source/dnode/vnode/src/tq/tqStreamStateSnap.c | 2 +- source/dnode/vnode/src/tq/tqStreamTask.c | 7 ++-- source/dnode/vnode/src/vnd/vnodeSync.c | 6 +-- source/libs/stream/src/stream.c | 1 - source/libs/stream/src/streamMeta.c | 2 +- source/libs/stream/src/streamQueue.c | 2 +- source/libs/stream/src/streamTask.c | 21 +++++++++-- 11 files changed, 48 insertions(+), 44 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a5baf33612..60043d4df6 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -718,7 +718,7 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); -int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId); +int32_t streamMetaReopen(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 90812a66b2..1069c5830e 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -241,7 +241,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch // internal int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; -int32_t tsStreamCheckpointTickInterval = 600; +int32_t tsStreamCheckpointTickInterval = 30; int32_t tsStreamNodeCheckInterval = 10; int32_t tsTtlUnit = 86400; int32_t tsTtlPushIntervalSec = 10; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index d3162a143d..6d44d55d25 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -65,9 +65,6 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq); static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq); static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq); static int32_t mndProcessStreamHb(SRpcMsg *pReq); -static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq); -static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq); -static int32_t mndGetStreamMeta(SRpcMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextStream(SMnode *pMnode, void *pIter); static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); @@ -1063,8 +1060,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in // return -1; // } -static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream, SMnode *pMnode, - int64_t checkpointId) { +static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream, SMnode *pMnode, int64_t chkptId) { taosWLockLatch(&pStream->lock); int32_t totLevel = taosArrayGetSize(pStream->tasks); @@ -1088,7 +1084,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream void *buf; int32_t tlen; - if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, + if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, chkptId, pTask->id.streamId, pTask->id.taskId) < 0) { mndReleaseVgroup(pMnode, pVgObj); taosWUnLockLatch(&pStream->lock); @@ -1109,7 +1105,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream } } - pStream->checkpointId = checkpointId; + pStream->checkpointId = chkptId; pStream->checkpointFreq = taosGetTimestampMs(); atomic_store_64(&pStream->currentTick, 0); // 3. commit log: stream checkpoint info diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7186adc2c4..ab6e0d1171 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1698,9 +1698,12 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) { rsp.code = TSDB_CODE_MSG_DECODE_ERROR; tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code)); - goto _end; + tDecoderClear(&decoder); + return rsp.code; } + tDecoderClear(&decoder); + // update the nodeEpset when it exists taosWLockLatch(&pMeta->lock); @@ -1713,7 +1716,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { req.taskId); rsp.code = TSDB_CODE_SUCCESS; taosWUnLockLatch(&pMeta->lock); - goto _end; + return rsp.code; } SStreamTask* pTask = *ppTask; @@ -1753,37 +1756,32 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { streamTaskStop(*ppHTask); } - tqDebug("s-task:%s task nodeEp update completed", pTask->id.idStr); - pMeta->closedTask += 1; if (ppHTask != NULL) { + tqDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr); pMeta->closedTask += 1; + } else { + tqDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr); } + rsp.code = 0; + // possibly only handle the stream task. int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - allStopped = (pMeta->closedTask == numOfTasks); - if (allStopped) { - pMeta->closedTask = 0; + if (pMeta->closedTask < numOfTasks) { + tqDebug("vgId:%d closed tasks:%d, unclosed:%d", vgId, pMeta->closedTask, (numOfTasks - pMeta->closedTask)); + taosWUnLockLatch(&pMeta->lock); } else { - tqDebug("vgId:%d closed tasks:%d, not closed:%d", vgId, pMeta->closedTask, (numOfTasks - pMeta->closedTask)); - } - - taosWUnLockLatch(&pMeta->lock); - -_end: - tDecoderClear(&decoder); - - if (allStopped) { + pMeta->closedTask = 0; if (!pTq->pVnode->restored) { tqDebug("vgId:%d vnode restore not completed, not restart the tasks", vgId); + taosWUnLockLatch(&pMeta->lock); } else { - tqDebug("vgId:%d all tasks are stopped, restart them", vgId); - taosWLockLatch(&pMeta->lock); + tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId); terrno = 0; - int32_t code = streamMetaReopen(pMeta, 0); + int32_t code = streamMetaReopen(pMeta); if (code != 0) { tqError("vgId:%d failed to reopen stream meta", vgId); taosWUnLockLatch(&pMeta->lock); @@ -1807,4 +1805,3 @@ _end: return rsp.code; } - diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 4a1b3961cd..a016498980 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -168,7 +168,7 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) } int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) { tqDebug("vgId:%d, vnode %s start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER); - int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta, chkpId); + int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta); if (code == 0) { code = streamStateLoadTasks(pWriter); } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 1ac2ddb9cb..255f71bf30 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -201,8 +201,7 @@ int32_t tqStopStreamTasks(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - tqDebug("vgId:%d start to stop all %d stream task(s)", vgId, numOfTasks); - + tqDebug("vgId:%d stop all %d stream task(s)", vgId, numOfTasks); if (numOfTasks == 0) { return TSDB_CODE_SUCCESS; } @@ -232,7 +231,7 @@ int32_t tqStartStreamTasks(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - tqDebug("vgId:%d start to stop all %d stream task(s)", vgId, numOfTasks); + tqDebug("vgId:%d start all %d stream task(s)", vgId, numOfTasks); if (numOfTasks == 0) { return TSDB_CODE_SUCCESS; @@ -314,7 +313,7 @@ void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) { double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); /*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask); - /*int32_t code = */ streamSchedExec(pTask); + /*int32_t code = */streamSchedExec(pTask); } else { qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal", id, ver, maxVer); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 43850ebfee..3a7a60fcbb 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -553,13 +553,11 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) pVnode->restored = true; if (vnodeIsRoleLeader(pVnode)) { - vInfo("vgId:%d, sync restore finished, start to launch stream tasks", vgId); - // start to restore all stream tasks if (tsDisableStream) { - vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", vgId); + vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId); } else { - vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId); + vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId); tqStartStreamTasks(pVnode->pTq); tqCheckAndRunStreamTaskAsync(pVnode->pTq); } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 1b4de5e6c4..5a7e14c629 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -109,7 +109,6 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask) { int32_t streamSchedExec(SStreamTask* pTask) { int8_t schedStatus = streamTaskSetSchedStatusWait(pTask); - if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); if (pRunReq == NULL) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 652ef7cde7..55d9a46b11 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -211,7 +211,7 @@ _err: return NULL; } -int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { +int32_t streamMetaReopen(SStreamMeta* pMeta) { streamMetaClear(pMeta); pMeta->streamBackendRid = -1; diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 29ca351a6b..a9d0c3b77e 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -395,7 +395,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc } int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate) { - if (cap < 100 || rate < 50 || pBucket == NULL) { + if (cap < 50 || rate < 50 || pBucket == NULL) { qError("failed to init sink task bucket, cap:%d, rate:%d", cap, rate); return TSDB_CODE_INVALID_PARA; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 71a9a3102c..af550f86cb 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -384,8 +384,22 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pTask->dataRange.range.minVer = ver; pTask->pMsgCb = pMsgCb; - streamTaskInitTokenBucket(&pTask->tokenBucket, 100, 100); - taosThreadMutexInit(&pTask->lock, NULL); + streamTaskInitTokenBucket(&pTask->tokenBucket, 50, 50); + + TdThreadMutexAttr attr = {0}; + int ret = taosThreadMutexAttrInit(&attr); + if (ret != 0) { + qError("s-task:%s init mutex attr failed, code:%s", pTask->id.idStr, tstrerror(ret)); + return ret; + } + + ret = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE); + if (ret != 0) { + qError("s-task:%s set mutex attr recursive, code:%s", pTask->id.idStr, tstrerror(ret)); + return ret; + } + + taosThreadMutexInit(&pTask->lock, &attr); streamTaskOpenAllUpstreamInput(pTask); return TSDB_CODE_SUCCESS; @@ -578,7 +592,8 @@ int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask) { int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask) { taosThreadMutexLock(&pTask->lock); int8_t status = pTask->status.schedStatus; - ASSERT(status == TASK_SCHED_STATUS__WAITING || status == TASK_SCHED_STATUS__ACTIVE); + ASSERT(status == TASK_SCHED_STATUS__WAITING || status == TASK_SCHED_STATUS__ACTIVE || + status == TASK_SCHED_STATUS__INACTIVE); pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; taosThreadMutexUnlock(&pTask->lock);