diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9420c5235d..e093625ef1 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -739,6 +739,7 @@ int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta); void streamMetaStartHb(SStreamMeta* pMeta); +void streamMetaInitForSnode(SStreamMeta* pMeta); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 99de36af43..1bbb969e7b 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -404,7 +404,7 @@ static int32_t taosAddServerLogCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "idxDebugFlag", idxDebugFlag, 0, 255, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "tdbDebugFlag", tdbDebugFlag, 0, 255, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "metaDebugFlag", metaDebugFlag, 0, 255, 0) != CFG_SCOPE_SERVER) return -1; - if (cfgAddInt32(pCfg, "stDebugFlag", stDebugFlag, 0, 255, CFG_SCOPE_CLIENT) != 0) return -1; + if (cfgAddInt32(pCfg, "stDebugFlag", stDebugFlag, 0, 255, CFG_SCOPE_SERVER) != 0) return -1; return 0; } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index d6e575d97c..dbbd68fa08 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -112,18 +112,16 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto FAIL; } - pSnode->msgCb = pOption->msgCb; + pSnode->msgCb = pOption->msgCb; pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, -1); if (pSnode->pMeta == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto FAIL; } - // todo fix it: send msg to mnode to rollback to an existed checkpoint, and broadcast the rollback msg to all other - // computing nodes. - pSnode->pMeta->stage = 0; - + // todo fix it: send msg to mnode to rollback to an existed checkpoint + streamMetaInitForSnode(pSnode->pMeta); return pSnode; FAIL: @@ -258,10 +256,11 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) { int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) { SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t taskId = htonl(pRsp->upstreamTaskId); - int64_t streamId = htobe64(pRsp->streamId); + pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId); + pRsp->streamId = htobe64(pRsp->streamId); + pRsp->msgId = htonl(pRsp->msgId); - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, streamId, taskId); + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pRsp->streamId, pRsp->upstreamTaskId); if (pTask) { streamProcessDispatchRsp(pTask, pRsp, pMsg->code); streamMetaReleaseTask(pSnode->pMeta, pTask); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index fc3e17c826..aa32fb6493 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1083,12 +1083,6 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId); - -// SStreamDataBlock* pMsgBlock = pTask->msgInfo.pData; -// if (pMsgBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { -// stError("s-task:%s checkpoint trigger send failed, continue do checkpoint ready process", id); -// streamProcessCheckpointReadyMsg(pTask); -// } } else { stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code)); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 2760d31828..5e25d911b0 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -999,3 +999,8 @@ void streamMetaStartHb(SStreamMeta* pMeta) { *pRid = pMeta->rid; metaHbToMnode(pRid, NULL); } + +void streamMetaInitForSnode(SStreamMeta* pMeta) { + pMeta->stage = 0; + pMeta->leader = true; +} \ No newline at end of file diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py index baf0682fbb..99bd90b006 100644 --- a/tests/system-test/0-others/information_schema.py +++ b/tests/system-test/0-others/information_schema.py @@ -22,7 +22,7 @@ class TDTestCase: def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug("start to execute %s" % __file__) - tdSql.init(conn.cursor()) + tdSql.init(conn.cursor(), True) self.setsql = TDSetSql() self.dbname = 'db' self.stbname = 'stb' @@ -217,7 +217,7 @@ class TDTestCase: tdSql.checkEqual(20470,len(tdSql.queryResult)) tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'") - tdSql.checkEqual(193, len(tdSql.queryResult)) + tdSql.checkEqual(194, len(tdSql.queryResult)) tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'") tdSql.checkEqual(54, len(tdSql.queryResult)) diff --git a/tests/system-test/2-query/db.py b/tests/system-test/2-query/db.py index 451fc0caf3..6870c59a0d 100644 --- a/tests/system-test/2-query/db.py +++ b/tests/system-test/2-query/db.py @@ -55,7 +55,7 @@ class TDTestCase: tdSql.checkData(0, 2, 0) tdSql.query("show dnode 1 variables like '%debugFlag'") - tdSql.checkRows(21) + tdSql.checkRows(22) tdSql.query("show dnode 1 variables like '____debugFlag'") tdSql.checkRows(2)