diff --git a/include/util/tlog.h b/include/util/tlog.h index a6d146a79e..6d393bfefb 100644 --- a/include/util/tlog.h +++ b/include/util/tlog.h @@ -66,6 +66,7 @@ extern int32_t udfDebugFlag; extern int32_t smaDebugFlag; extern int32_t idxDebugFlag; extern int32_t tdbDebugFlag; +extern int32_t sndDebugFlag; int32_t taosInitLog(const char *logName, int32_t maxFiles); void taosCloseLog(); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 4a1ba9e391..47988b0de9 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -430,6 +430,7 @@ static int32_t taosAddServerLogCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "tdbDebugFlag", tdbDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "metaDebugFlag", metaDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "stDebugFlag", stDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "sndDebugFlag", sndDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; return 0; } @@ -946,6 +947,7 @@ static void taosSetServerLogCfg(SConfig *pCfg) { tdbDebugFlag = cfgGetItem(pCfg, "tdbDebugFlag")->i32; metaDebugFlag = cfgGetItem(pCfg, "metaDebugFlag")->i32; stDebugFlag = cfgGetItem(pCfg, "stDebugFlag")->i32; + sndDebugFlag = cfgGetItem(pCfg, "sndDebugFlag")->i32; } static int32_t taosSetSlowLogScope(char *pScope) { @@ -1668,6 +1670,7 @@ void taosCfgDynamicOptions(const char *option, const char *value) { {"metaDebugFlag", &metaDebugFlag}, {"jniDebugFlag", &jniDebugFlag}, {"stDebugFlag", &stDebugFlag}, + {"sndDebugFlag", &sndDebugFlag}, {"audit", &tsEnableAudit}, {"asynclog", &tsAsyncLog}, @@ -1786,6 +1789,7 @@ void taosSetAllDebugFlag(int32_t flag, bool rewrite) { taosSetDebugFlag(&tdbDebugFlag, "tdbDebugFlag", flag, rewrite); taosSetDebugFlag(&metaDebugFlag, "metaDebugFlag", flag, rewrite); taosSetDebugFlag(&stDebugFlag, "stDebugFlag", flag, rewrite); + taosSetDebugFlag(&sndDebugFlag, "sndDebugFlag", flag, rewrite); uInfo("all debug flag are set to %d", flag); } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 5fd185ba4d..718f4e851b 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -19,6 +19,27 @@ #include "tstream.h" #include "tuuid.h" +#define sndError(...) \ + do { \ + if (sndDebugFlag & DEBUG_ERROR) { \ + taosPrintLog("SND ERROR ", DEBUG_ERROR, sndDebugFlag, __VA_ARGS__); \ + } \ + } while (0) + +#define sndInfo(...) \ + do { \ + if (sndDebugFlag & DEBUG_INFO) { \ + taosPrintLog("SND INFO ", DEBUG_INFO, sndDebugFlag, __VA_ARGS__); \ + } \ + } while (0) + +#define sndDebug(...) \ + do { \ + if (sndDebugFlag & DEBUG_DEBUG) { \ + taosPrintLog("SND ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__); \ + } \ + } while (0) + void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) { char *msgStr = pMsg->pCont; char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); @@ -65,10 +86,10 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1); if (pTask->pState == NULL) { - qError("s-task:%s failed to open state for task", pTask->id.idStr); + sndError("s-task:%s failed to open state for task", pTask->id.idStr); return -1; } else { - qDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); + sndDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); } int32_t numOfChildEp = taosArrayGetSize(pTask->upstreamInfo.pList); @@ -85,7 +106,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer // checkpoint ver is the kept version, handled data should be the next version. if (pTask->chkInfo.checkpointId != 0) { pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1; - qInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, pTask->id.idStr, + sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); } else { if (pTask->chkInfo.nextProcessVer == -1) { @@ -96,7 +117,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer char* p = NULL; streamTaskGetStatus(pTask, &p); - qInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 + sndInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms", SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory, pTask->info.triggerParam); @@ -186,24 +207,23 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { char* p = NULL; streamTaskGetStatus(pTask, &p); - qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, + sndDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, pTask->id.idStr, p, numOfTasks); EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; streamTaskHandleEvent(pTask->status.pSM, event); - streamTaskCheckDownstream(pTask); return 0; } int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) { SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg; - qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId); + sndDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId); streamMetaUnregisterTask(pSnode->pMeta, pReq->streamId, pReq->taskId); // commit the update streamMetaWLock(pSnode->pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta); - qDebug("vgId:%d task:0x%x dropped, remain tasks:%d", pSnode->pMeta->vgId, pReq->taskId, numOfTasks); + sndDebug("vgId:%d task:0x%x dropped, remain tasks:%d", pSnode->pMeta->vgId, pReq->taskId, numOfTasks); if (streamMetaCommit(pSnode->pMeta) < 0) { // persist to disk @@ -308,7 +328,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { return 0; } -int32_t sndProcessStreamTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg) { +int32_t sndProcessTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg) { char *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); @@ -335,8 +355,38 @@ int32_t sndProcessStreamTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg) return 0; } -int32_t sndProcessTaskRecoverFinishRsp(SSnode *pSnode, SRpcMsg *pMsg) { - // +int32_t sndProcessTaskScanHistoryFinishRsp(SSnode *pSnode, SRpcMsg *pMsg) { + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + + // deserialize + SStreamCompleteHistoryMsg req = {0}; + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msg, msgLen); + tDecodeCompleteHistoryDataMsg(&decoder, &req); + tDecoderClear(&decoder); + + SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.upstreamTaskId); + if (pTask == NULL) { + sndError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", + pSnode->pMeta->vgId, req.upstreamTaskId); + return -1; + } + + int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1); + if (remain > 0) { + sndDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, unfinished remain:%d", + pTask->id.idStr, req.downstreamId, remain); + } else { + sndDebug( + "s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history " + "completed msg", + pTask->id.idStr, req.downstreamId); + streamProcessScanHistoryFinishRsp(pTask); + } + + streamMetaReleaseTask(pSnode->pMeta, pTask); return 0; } @@ -360,11 +410,11 @@ int32_t sndProcessTaskCheckpointReadyMsg(SSnode *pSnode, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId); if (pTask == NULL) { - qError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", pMeta->vgId, req.downstreamTaskId); + sndError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", pMeta->vgId, req.downstreamTaskId); return code; } - qDebug("snode vgId:%d s-task:%s received the checkpoint ready msg from task:0x%x (vgId:%d), handle it", pMeta->vgId, + sndDebug("snode vgId:%d s-task:%s received the checkpoint ready msg from task:0x%x (vgId:%d), handle it", pMeta->vgId, pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId); streamProcessCheckpointReadyMsg(pTask); @@ -403,11 +453,11 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) { streamMetaReleaseTask(pSnode->pMeta, pTask); char* p = NULL; streamTaskGetStatus(pTask, &p); - qDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d", + sndDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d", pTask->id.idStr, p, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } else { rsp.status = TASK_DOWNSTREAM_NOT_READY; - qDebug("recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d", + sndDebug("recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d", taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } @@ -417,7 +467,7 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) { tEncodeSize(tEncodeStreamTaskCheckRsp, &rsp, len, code); if (code < 0) { - qError("vgId:%d failed to encode task check rsp, task:0x%x", pSnode->pMeta->vgId, taskId); + sndError("vgId:%d failed to encode task check rsp, task:0x%x", pSnode->pMeta->vgId, taskId); return -1; } @@ -452,12 +502,12 @@ int32_t sndProcessStreamTaskCheckRsp(SSnode* pSnode, SRpcMsg* pMsg) { } tDecoderClear(&decoder); - qDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", + sndDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, rsp.streamId, rsp.upstreamTaskId); if (pTask == NULL) { - qError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId, + sndError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId, pSnode->pMeta->vgId); return -1; } @@ -480,9 +530,9 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { case TDMT_STREAM_RETRIEVE_RSP: return sndProcessTaskRetrieveRsp(pSnode, pMsg); case TDMT_VND_STREAM_SCAN_HISTORY_FINISH: - return sndProcessStreamTaskScanHistoryFinishReq(pSnode, pMsg); + return sndProcessTaskScanHistoryFinishReq(pSnode, pMsg); case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP: - return sndProcessTaskRecoverFinishRsp(pSnode, pMsg); + return sndProcessTaskScanHistoryFinishRsp(pSnode, pMsg); case TDMT_VND_STREAM_TASK_CHECK: return sndProcessStreamTaskCheckReq(pSnode, pMsg); case TDMT_VND_STREAM_TASK_CHECK_RSP: diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index aa6719f604..46b25dee6a 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -110,6 +110,7 @@ int32_t metaDebugFlag = 131; int32_t udfDebugFlag = 131; int32_t smaDebugFlag = 131; int32_t idxDebugFlag = 131; +int32_t sndDebugFlag = 131; int64_t dbgEmptyW = 0; int64_t dbgWN = 0; diff --git a/tests/system-test/8-stream/snodeRestart.py b/tests/system-test/8-stream/snodeRestart.py new file mode 100644 index 0000000000..65ca090df9 --- /dev/null +++ b/tests/system-test/8-stream/snodeRestart.py @@ -0,0 +1,107 @@ + +import taos +import sys +import time +import socket +import os +import threading +import math + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from util.cluster import * + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + + def case1(self): + tdLog.debug("case1 start") + + os.system("nohup taosBenchmark -y -B 1 -t 4 -S 1000 -n 1000 -i 1000 -v 2 > /dev/null 2>&1 &") + time.sleep(1) + tdSql.query("use test") + tdSql.query("create snode on dnode 4") + tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select sum(voltage) from meters partition by groupid interval(4s)") + tdLog.debug("create stream use snode and insert data") + time.sleep(10) + + tdDnodes = cluster.dnodes + tdDnodes[3].stoptaosd() + time.sleep(2) + tdDnodes[3].starttaosd() + tdLog.debug("snode restart ok") + + time.sleep(500) + os.system("kill -9 `pgrep taosBenchmark`") + + tdSql.query("select sum(voltage) from meters partition by groupid interval(4s)") + # tdSql.checkRows(1) + # + # tdSql.checkData(0, 0, '2016-01-01 08:00:07.000') + # tdSql.checkData(0, 1, 2000) + + tdSql.query("drop snode on dnode 4") + tdSql.query("drop stream if exists s1") + tdSql.query("drop database test") + + tdLog.debug("case1 end") + + def case2(self): + tdLog.debug("case2 start") + + updatecfgDict = {'checkpointInterval': 5} + print("===================: ", updatecfgDict) + + tdDnodes = cluster.dnodes + tdDnodes[0].stoptaosd() + tdDnodes[1].stoptaosd() + tdDnodes[2].stoptaosd() + tdDnodes[3].stoptaosd() + time.sleep(2) + tdDnodes[0].starttaosd() + tdDnodes[1].starttaosd() + tdDnodes[2].starttaosd() + tdDnodes[3].starttaosd() + tdLog.debug("taosd restart ok") + + os.system("taosBenchmark -y -B 1 -t 4 -S 1000 -n 1000 -i 1000 -v 2 &" ) + time.sleep(1) + tdSql.query("use test") + tdSql.query("create snode on dnode 4") + tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select sum(voltage) from meters partition by groupid interval(4s)") + tdLog.debug("create stream use snode and insert data") + time.sleep(10) + + tdDnodes[3].stoptaosd() + time.sleep(2) + tdDnodes[3].starttaosd() + tdLog.debug("snode restart ok") + + time.sleep(5) + os.system("kill -9 `pgrep taosBenchmark`") + + tdSql.query("select sum(voltage) from meters partition by groupid interval(4s)") + # tdSql.checkRows(1) + # + # tdSql.checkData(0, 0, '2016-01-01 08:00:07.000') + # tdSql.checkData(0, 1, 2000) + + tdLog.debug("case2 end") + + def run(self): + self.case1() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/output.txt b/tests/system-test/output.txt deleted file mode 100644 index e69de29bb2..0000000000