diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index da6d35327e..cfc2b08ab6 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -29,9 +29,10 @@ #include "tname.h" #include "tmisce.h" -#define MND_STREAM_VER_NUMBER 3 -#define MND_STREAM_RESERVE_SIZE 64 -#define MND_STREAM_MAX_NUM 60 +#define MND_STREAM_VER_NUMBER 3 +#define MND_STREAM_RESERVE_SIZE 64 +#define MND_STREAM_MAX_NUM 60 +#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint" typedef struct SNodeEntry { int32_t nodeId; @@ -52,7 +53,6 @@ typedef struct SVgroupChangeInfo { static int32_t mndNodeCheckSentinel = 0; static SStreamVnodeRevertIndex execNodeList; -#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint" static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream); @@ -1827,7 +1827,7 @@ void initTransAction(STransAction* pAction, void* pCont, int32_t contLen, int32_ // todo extract method: traverse stream tasks // build trans to update the epset static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo* pInfo) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-task-update"); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, NULL, "stream-task-update"); if (pTrans == NULL) { mError("failed to build stream task DAG update, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); return -1; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 703ff697c3..859c487faf 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1902,6 +1902,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { _end: tDecoderClear(&decoder); tmsgSendRsp(&rsp); + + tqDebug("s-task:%s task nodeEp update completed", pTask->id.idStr); return code; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index f5fb3353a1..cab6c240ab 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -89,7 +89,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i resetTaskInfo(pExecutor); } - qError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, terrstr()); + qError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, tstrerror(code)); continue; } @@ -547,7 +547,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { int32_t numOfBlocks = 0; SStreamQueueItem* pInput = NULL; if (streamTaskShouldStop(&pTask->status)) { - qDebug("s-task:%s stream task stopped, abort", id); + qDebug("s-task:%s stream task is stopped", id); break; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 6899573427..9d7aa09911 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -254,7 +254,9 @@ static void freeUpstreamItem(void* p) { } void tFreeStreamTask(SStreamTask* pTask) { - qDebug("free s-task:0x%x, %p", pTask->id.taskId, pTask); + int32_t taskId = pTask->id.taskId; + + qDebug("free s-task:0x%x, %p", taskId, pTask); // remove the ref by timer while(pTask->status.timerActive > 0) { @@ -304,6 +306,7 @@ void tFreeStreamTask(SStreamTask* pTask) { } if (pTask->pState) { + qDebug("s-task:0x%x start to free task state", taskId); streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING); } @@ -330,6 +333,8 @@ void tFreeStreamTask(SStreamTask* pTask) { taosThreadMutexDestroy(&pTask->lock); taosMemoryFree(pTask); + + qDebug("s-task:0x%x free task completed", taskId); } int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {