From f70321ee53f2403e770acd2bda3a41038b437549 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 10 Oct 2024 10:52:48 +0800 Subject: [PATCH 01/11] fix(vnd): check return value. --- source/dnode/vnode/src/vnd/vnodeSvr.c | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 371eaa0774..1b1bb9257d 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -640,40 +640,39 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg } } break; case TDMT_STREAM_TASK_DROP: { - if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) { + if ((code = tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen)) < 0) { goto _err; } } break; case TDMT_STREAM_TASK_UPDATE_CHKPT: { - if (tqProcessTaskUpdateCheckpointReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) { + if ((code = tqProcessTaskUpdateCheckpointReq(pVnode->pTq, pMsg->pCont, pMsg->contLen)) < 0) { goto _err; } } break; case TDMT_STREAM_CONSEN_CHKPT: { - if (pVnode->restored) { - if (tqProcessTaskConsenChkptIdReq(pVnode->pTq, pMsg) < 0) { - goto _err; - } + if (pVnode->restored && (code = tqProcessTaskConsenChkptIdReq(pVnode->pTq, pMsg)) < 0) { + goto _err; } + } break; case TDMT_STREAM_TASK_PAUSE: { if (pVnode->restored && vnodeIsLeader(pVnode) && - tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) { + (code = tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen)) < 0) { goto _err; } } break; case TDMT_STREAM_TASK_RESUME: { if (pVnode->restored && vnodeIsLeader(pVnode) && - tqProcessTaskResumeReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) { + (code = tqProcessTaskResumeReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen)) < 0) { goto _err; } } break; case TDMT_VND_STREAM_TASK_RESET: { - if (pVnode->restored && vnodeIsLeader(pVnode)) { - if (tqProcessTaskResetReq(pVnode->pTq, pMsg) < 0) { + if (pVnode->restored && vnodeIsLeader(pVnode) && + (code = tqProcessTaskResetReq(pVnode->pTq, pMsg)) < 0) { goto _err; } - } + } break; case TDMT_VND_ALTER_CONFIRM: needCommit = pVnode->config.hashChange; @@ -693,10 +692,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg case TDMT_VND_DROP_INDEX: vnodeProcessDropIndexReq(pVnode, ver, pReq, len, pRsp); break; - case TDMT_VND_STREAM_CHECK_POINT_SOURCE: + case TDMT_VND_STREAM_CHECK_POINT_SOURCE: // always return true tqProcessTaskCheckPointSourceReq(pVnode->pTq, pMsg, pRsp); break; - case TDMT_VND_STREAM_TASK_UPDATE: + case TDMT_VND_STREAM_TASK_UPDATE: // always return true tqProcessTaskUpdateReq(pVnode->pTq, pMsg); break; case TDMT_VND_COMPACT: @@ -752,7 +751,7 @@ _exit: _err: vError("vgId:%d, process %s request failed since %s, ver:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), - tstrerror(terrno), ver); + tstrerror(code), ver); return code; } From 3367f129daf04dbd3731c4481dd81a1a0dacff10 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 10 Oct 2024 11:27:37 +0800 Subject: [PATCH 02/11] fix(vnd): check return value. --- source/dnode/vnode/src/vnd/vnodeSvr.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 1b1bb9257d..dd13c975cf 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -633,9 +633,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg } break; case TDMT_STREAM_TASK_DEPLOY: { - int32_t code = tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; + if ((code = tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len)) != TSDB_CODE_SUCCESS) { goto _err; } } break; From 5dc933f5f1b80048a114dcd2d3ae652d05779c3c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 10 Oct 2024 14:52:51 +0800 Subject: [PATCH 03/11] refactor: add some logs. --- source/common/src/tdatablock.c | 1 + source/dnode/vnode/src/tsdb/tsdbRead2.c | 4 +++- source/util/src/tarray.c | 1 + 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index d8a66f82bf..0d00a6a4c7 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2529,6 +2529,7 @@ int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); if (pColInfoData == NULL) { code = terrno; + uError("invalid param, size of list:%d index k:%d", (int32_t) taosArrayGetSize(pDataBlock->pDataBlock), k) goto _exit; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 5b6511a38e..4e253d7c2e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -855,6 +855,7 @@ static int32_t loadFileBlockBrinInfo(STsdbReader* pReader, SArray* pIndexList, S STableBlockScanInfo** p = taosArrayGetLast(pTableScanInfoList); if (p == NULL) { clearBrinBlockIter(&iter); + tsdbError("invalid param, empty in tablescanInfoList, %s", pReader->idStr); return TSDB_CODE_INVALID_PARA; } @@ -5256,7 +5257,7 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { // NOTE: the following codes is used to perform test for suspend/resume for tsdbReader when it blocks the commit // the data should be ingested in round-robin and all the child tables should be createted before ingesting data // the version range of query will be used to identify the correctness of suspend/resume functions. - // this function will blocked before loading the SECOND block from vnode-buffer, and restart itself from sst-files + // this function will be blocked before loading the SECOND block from vnode-buffer, and restart itself from sst-files #if SUSPEND_RESUME_TEST if (!pReader->status.suspendInvoked && !pReader->status.loadFromFile) { tsem_wait(&pReader->resumeAfterSuspend); @@ -5909,6 +5910,7 @@ int32_t tsdbGetTableSchema(SMeta* pMeta, int64_t uid, STSchema** pSchema, int64_ } else if (mr.me.type == TSDB_NORMAL_TABLE) { // do nothing } else { code = TSDB_CODE_INVALID_PARA; + tsdbError("invalid mr.me.type:%d %s, code:%s", mr.me.type, tstrerror(code)); metaReaderClear(&mr); return code; } diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 7989a2468b..b94bb512e2 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -200,6 +200,7 @@ void* taosArrayPop(SArray* pArray) { void* taosArrayGet(const SArray* pArray, size_t index) { if (NULL == pArray) { terrno = TSDB_CODE_INVALID_PARA; + uError("failed to return value from array of null ptr"); return NULL; } From 5c1cffed692e27fcbc042bd2f2c571c3b5357a8d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 10 Oct 2024 15:54:15 +0800 Subject: [PATCH 04/11] fix(stream): add some logs. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 7 ++----- source/libs/stream/src/streamMeta.c | 4 ++++ source/libs/stream/src/streamSched.c | 6 +++++- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 6b7e857120..3871011407 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -1119,10 +1119,6 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t int32_t vgId = pMeta->vgId; int32_t code = 0; - if (pTask == NULL) { - return -1; - } - streamTaskResume(pTask); ETaskStatus status = streamTaskGetStatus(pTask).state; @@ -1150,7 +1146,6 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t } } - streamMetaReleaseTask(pMeta, pTask); return code; } @@ -1173,6 +1168,7 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode); if (code != 0) { + streamMetaReleaseTask(pMeta, pTask); return code; } @@ -1186,6 +1182,7 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m streamMutexUnlock(&pHTask->lock); code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode); + streamMetaReleaseTask(pMeta, pHTask); } return code; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 44c9e76906..29152c6205 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -759,6 +759,10 @@ void streamMetaAcquireOneTask(SStreamTask* pTask) { } void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) { + if (pTask == NULL) { + return; + } + int32_t taskId = pTask->id.taskId; int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1); diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 98920e6f70..095a5af6d4 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -63,7 +63,11 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3 pRunReq->reqType = execType; SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; - return tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg); + int32_t code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg); + if (code) { + stError("vgId:%d failed to put msg into stream queue, code:%s, %x", vgId, tstrerror(code), taskId); + } + return code; } void streamTaskClearSchedIdleInfo(SStreamTask* pTask) { pTask->status.schedIdleTime = 0; } From d2f2a931fb0cf58c27434a04b6e8ad9a53fb9916 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 10 Oct 2024 16:21:11 +0800 Subject: [PATCH 05/11] fix(util): reset the returned length value. --- source/common/src/tdatablock.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 2047573b74..98e58c8bd7 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2530,7 +2530,6 @@ int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); if (pColInfoData == NULL) { code = terrno; - uError("invalid param, size of list:%d index k:%d", (int32_t) taosArrayGetSize(pDataBlock->pDataBlock), k) goto _exit; } @@ -2611,7 +2610,10 @@ int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf if (code < 0) { uError("func %s failed to convert to ucs charset since %s", __func__, tstrerror(code)); goto _exit; + } else { // reset the length value + code = TSDB_CODE_SUCCESS; } + len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf); if (len >= size - 1) goto _exit; } break; From 6e43521ba9fb2e6d4b580ef09639a16bc3a5e9d8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 10 Oct 2024 18:28:14 +0800 Subject: [PATCH 06/11] fix(stream): only keep the latest pause operation status. --- source/libs/stream/src/streamTaskSm.c | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 3501d30be4..a10c4c30d5 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -485,6 +485,11 @@ int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, _ static void keepPrevInfo(SStreamTaskSM* pSM) { STaskStateTrans* pTrans = pSM->pActiveTrans; + // we only keep the latest pause state + if (pSM->prev.state.state == TASK_STATUS__PAUSE && pSM->current.state == TASK_STATUS__PAUSE) { + return; + } + pSM->prev.state = pSM->current; pSM->prev.evt = pTrans->event; } @@ -501,9 +506,10 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even if (pTrans == NULL) { ETaskStatus s = pSM->current.state; - if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__PAUSE && s != TASK_STATUS__STOP && - s != TASK_STATUS__UNINIT && s != TASK_STATUS__READY) { - stError("s-task:%s invalid task status:%s on handling event:%s success", id, pSM->current.name, GET_EVT_NAME(pSM->prev.evt)); + if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__PAUSE && s != TASK_STATUS__STOP && s != TASK_STATUS__UNINIT && + s != TASK_STATUS__READY) { + stError("s-task:%s invalid task status:%s on handling event:%s success", id, pSM->current.name, + GET_EVT_NAME(pSM->prev.evt)); } // the pSM->prev.evt may be 0, so print string is not appropriate. From 7e1c6b07392f21b29c221af1399b73754ba61617 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 10 Oct 2024 18:34:52 +0800 Subject: [PATCH 07/11] fix(stream): avoid the later pause overwrite the previous pause state. --- source/libs/stream/src/streamTaskSm.c | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index a10c4c30d5..17d5d884a7 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -485,11 +485,6 @@ int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, _ static void keepPrevInfo(SStreamTaskSM* pSM) { STaskStateTrans* pTrans = pSM->pActiveTrans; - // we only keep the latest pause state - if (pSM->prev.state.state == TASK_STATUS__PAUSE && pSM->current.state == TASK_STATUS__PAUSE) { - return; - } - pSM->prev.state = pSM->current; pSM->prev.evt = pTrans->event; } @@ -527,10 +522,13 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even return TSDB_CODE_STREAM_INVALID_STATETRANS; } - keepPrevInfo(pSM); + // repeat pause will not overwrite the previous pause state + if (pSM->current.state != TASK_STATUS__PAUSE || pTrans->next.state != TASK_STATUS__PAUSE) { + keepPrevInfo(pSM); - pSM->current = pTrans->next; - pSM->pActiveTrans = NULL; + pSM->current = pTrans->next; + pSM->pActiveTrans = NULL; + } // todo remove it // todo: handle the error code From a197b20466f111589d3822aa51f3e47eaf7fbf12 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 10 Oct 2024 19:28:08 +0800 Subject: [PATCH 08/11] other: update logs. --- source/libs/stream/src/streamTimer.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamTimer.c b/source/libs/stream/src/streamTimer.c index 8b77fe7cb1..0da9acfd1d 100644 --- a/source/libs/stream/src/streamTimer.c +++ b/source/libs/stream/src/streamTimer.c @@ -56,7 +56,7 @@ void streamTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* pParam, void* } } - stDebug("vgId:%d start %s tmr succ", vgId, pMsg); + stTrace("vgId:%d start %s tmr succ", vgId, pMsg); } void streamTmrStop(tmr_h tmrId) { From 50ceb19cbff69a3f96d7230e9f7b03667614f1bc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 11 Oct 2024 19:46:54 +0800 Subject: [PATCH 09/11] fix(stream): reset the activeTrans if pause recv repeatly. --- source/libs/stream/src/streamTaskSm.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 17d5d884a7..c3a2742aa2 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -525,11 +525,12 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even // repeat pause will not overwrite the previous pause state if (pSM->current.state != TASK_STATUS__PAUSE || pTrans->next.state != TASK_STATUS__PAUSE) { keepPrevInfo(pSM); - pSM->current = pTrans->next; - pSM->pActiveTrans = NULL; + } else { + stDebug("s-task:%s repeat pause evt recv, not update prev status", id); } + pSM->pActiveTrans = NULL; // todo remove it // todo: handle the error code // on success callback, add into lock if necessary, or maybe we should add an option for this? From 27c087e9aec5bd1a499a4c2e545c283d7536b727 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 16 Oct 2024 18:36:49 +0800 Subject: [PATCH 10/11] refactor: do some internal refactor. --- source/dnode/mnode/impl/src/mndStream.c | 3 ++- source/dnode/mnode/impl/src/mndStreamTransAct.c | 9 +++------ source/dnode/vnode/src/tsdb/tsdbRead2.c | 2 +- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 69d3de25fc..a4327b777f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1783,7 +1783,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; - int32_t code = 0; + int32_t code = 0; if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { return code; @@ -1811,6 +1811,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return 0; } + mInfo("stream:%s,%" PRId64 " start to resume stream from pause", resumeReq.name, pStream->uid); if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) { sdbRelease(pMnode->pSdb, pStream); return -1; diff --git a/source/dnode/mnode/impl/src/mndStreamTransAct.c b/source/dnode/mnode/impl/src/mndStreamTransAct.c index 4e0bf97587..139ea4f147 100644 --- a/source/dnode/mnode/impl/src/mndStreamTransAct.c +++ b/source/dnode/mnode/impl/src/mndStreamTransAct.c @@ -61,7 +61,6 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); if (pReq == NULL) { - // terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; } @@ -93,7 +92,6 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT if (pReq == NULL) { mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq), tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - // terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; } @@ -106,19 +104,18 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT bool hasEpset = false; int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); if (code != TSDB_CODE_SUCCESS || (!hasEpset)) { - terrno = code; taosMemoryFree(pReq); - return terrno; + return code; } code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); if (code != 0) { taosMemoryFree(pReq); - return terrno; + return code; } mDebug("set the resume action for trans:%d", pTrans->id); - return 0; + return code; } static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask* pTask) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 44a39f4328..c4971e27cf 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -5910,7 +5910,7 @@ int32_t tsdbGetTableSchema(SMeta* pMeta, int64_t uid, STSchema** pSchema, int64_ } else if (mr.me.type == TSDB_NORMAL_TABLE) { // do nothing } else { code = TSDB_CODE_INVALID_PARA; - tsdbError("invalid mr.me.type:%d %s, code:%s", mr.me.type, tstrerror(code)); + tsdbError("invalid mr.me.type:%d, code:%s", mr.me.type, tstrerror(code)); metaReaderClear(&mr); return code; } From dd05353b74e2b7b84fdaf42adaacb164b53a0fd2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 16 Oct 2024 22:07:37 +0800 Subject: [PATCH 11/11] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 2 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 2 +- source/libs/stream/src/streamCheckStatus.c | 2 +- source/libs/stream/src/streamCheckpoint.c | 3 ++- source/libs/stream/src/streamDispatch.c | 2 +- source/libs/stream/src/streamMeta.c | 5 +++-- source/libs/stream/src/streamSched.c | 4 ++-- source/libs/stream/src/streamTask.c | 3 +++ 8 files changed, 14 insertions(+), 9 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e6d750468e..58c1707e1f 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -754,7 +754,7 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask); int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); -void streamMetaAcquireOneTask(SStreamTask* pTask); +int32_t streamMetaAcquireOneTask(SStreamTask* pTask); void streamMetaClear(SStreamMeta* pMeta); void streamMetaInitBackend(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 3871011407..a00e92997c 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -692,7 +692,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId}; SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if ((ppTask != NULL) && ((*ppTask) != NULL)) { - streamMetaAcquireOneTask(*ppTask); + int32_t unusedRetRef = streamMetaAcquireOneTask(*ppTask); SStreamTask* pTask = *ppTask; if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 75bcc326b3..c1c54b3c0b 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -299,7 +299,7 @@ void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { return; } - /*SStreamTask* p = */ streamMetaAcquireOneTask(pTask); // add task ref here + int32_t unusedRetRef = streamMetaAcquireOneTask(pTask); // add task ref here streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs()); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index e44bca123b..be914d9746 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -347,7 +347,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock if (old == 0) { int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref); - streamMetaAcquireOneTask(pTask); + + int32_t unusedRetRef = streamMetaAcquireOneTask(pTask); streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); pTmrInfo->launchChkptId = pActiveInfo->activeId; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 133663ac28..62d60ff664 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1162,7 +1162,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { if (old == 0) { int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref); - streamMetaAcquireOneTask(pTask); + int32_t unusedRetRef = streamMetaAcquireOneTask(pTask); streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "chkpt-ready-monitor"); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 29152c6205..7e9b60b61a 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -753,9 +753,10 @@ int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t task return code; } -void streamMetaAcquireOneTask(SStreamTask* pTask) { +int32_t streamMetaAcquireOneTask(SStreamTask* pTask) { int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); stTrace("s-task:%s acquire task, ref:%d", pTask->id.idStr, ref); + return ref; } void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) { @@ -866,7 +867,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { pTask = *ppTask; - // it is an fill-history task, remove the related stream task's id that points to it + // it is a fill-history task, remove the related stream task's id that points to it if (pTask->info.fillHistory == 0) { int32_t ret = atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); } diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 095a5af6d4..cdaa603e38 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -22,7 +22,7 @@ static void streamTaskSchedHelper(void* param, void* tmrId); void streamSetupScheduleTrigger(SStreamTask* pTask) { int64_t delaySchema = pTask->info.delaySchedParam; if (delaySchema != 0 && pTask->info.fillHistory == 0) { - int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); + int32_t ref = streamMetaAcquireOneTask(pTask); stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", pTask->id.idStr, ref, pTask->info.delaySchedParam); @@ -80,7 +80,7 @@ void streamTaskResumeInFuture(SStreamTask* pTask) { pTask->status.schedIdleTime, ref); // add one ref count for task - streamMetaAcquireOneTask(pTask); + int32_t unusedRetRef = streamMetaAcquireOneTask(pTask); streamTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer, &pTask->schedInfo.pIdleTimer, pTask->pMeta->vgId, "resume-task-tmr"); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 71a2ed3e4a..727701e03e 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -258,10 +258,12 @@ void tFreeStreamTask(SStreamTask* pTask) { if (pTask->inputq.queue) { streamQueueClose(pTask->inputq.queue, pTask->id.taskId); + pTask->inputq.queue = NULL; } if (pTask->outputq.queue) { streamQueueClose(pTask->outputq.queue, pTask->id.taskId); + pTask->outputq.queue = NULL; } if (pTask->exec.qmsg) { @@ -275,6 +277,7 @@ void tFreeStreamTask(SStreamTask* pTask) { if (pTask->exec.pWalReader != NULL) { walCloseReader(pTask->exec.pWalReader); + pTask->exec.pWalReader = NULL; } streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);