fix:add finish history msg to snode
This commit is contained in:
parent
56381dea90
commit
8a9ded764d
|
@ -66,6 +66,7 @@ extern int32_t udfDebugFlag;
|
|||
extern int32_t smaDebugFlag;
|
||||
extern int32_t idxDebugFlag;
|
||||
extern int32_t tdbDebugFlag;
|
||||
extern int32_t sndDebugFlag;
|
||||
|
||||
int32_t taosInitLog(const char *logName, int32_t maxFiles);
|
||||
void taosCloseLog();
|
||||
|
|
|
@ -430,6 +430,7 @@ static int32_t taosAddServerLogCfg(SConfig *pCfg) {
|
|||
if (cfgAddInt32(pCfg, "tdbDebugFlag", tdbDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "metaDebugFlag", metaDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "stDebugFlag", stDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "sndDebugFlag", sndDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -946,6 +947,7 @@ static void taosSetServerLogCfg(SConfig *pCfg) {
|
|||
tdbDebugFlag = cfgGetItem(pCfg, "tdbDebugFlag")->i32;
|
||||
metaDebugFlag = cfgGetItem(pCfg, "metaDebugFlag")->i32;
|
||||
stDebugFlag = cfgGetItem(pCfg, "stDebugFlag")->i32;
|
||||
sndDebugFlag = cfgGetItem(pCfg, "sndDebugFlag")->i32;
|
||||
}
|
||||
|
||||
static int32_t taosSetSlowLogScope(char *pScope) {
|
||||
|
@ -1668,6 +1670,7 @@ void taosCfgDynamicOptions(const char *option, const char *value) {
|
|||
{"metaDebugFlag", &metaDebugFlag},
|
||||
{"jniDebugFlag", &jniDebugFlag},
|
||||
{"stDebugFlag", &stDebugFlag},
|
||||
{"sndDebugFlag", &sndDebugFlag},
|
||||
|
||||
{"audit", &tsEnableAudit},
|
||||
{"asynclog", &tsAsyncLog},
|
||||
|
@ -1786,6 +1789,7 @@ void taosSetAllDebugFlag(int32_t flag, bool rewrite) {
|
|||
taosSetDebugFlag(&tdbDebugFlag, "tdbDebugFlag", flag, rewrite);
|
||||
taosSetDebugFlag(&metaDebugFlag, "metaDebugFlag", flag, rewrite);
|
||||
taosSetDebugFlag(&stDebugFlag, "stDebugFlag", flag, rewrite);
|
||||
taosSetDebugFlag(&sndDebugFlag, "sndDebugFlag", flag, rewrite);
|
||||
uInfo("all debug flag are set to %d", flag);
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,27 @@
|
|||
#include "tstream.h"
|
||||
#include "tuuid.h"
|
||||
|
||||
#define sndError(...) \
|
||||
do { \
|
||||
if (sndDebugFlag & DEBUG_ERROR) { \
|
||||
taosPrintLog("SND ERROR ", DEBUG_ERROR, sndDebugFlag, __VA_ARGS__); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define sndInfo(...) \
|
||||
do { \
|
||||
if (sndDebugFlag & DEBUG_INFO) { \
|
||||
taosPrintLog("SND INFO ", DEBUG_INFO, sndDebugFlag, __VA_ARGS__); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define sndDebug(...) \
|
||||
do { \
|
||||
if (sndDebugFlag & DEBUG_DEBUG) { \
|
||||
taosPrintLog("SND ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||
char *msgStr = pMsg->pCont;
|
||||
char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
|
||||
|
@ -65,10 +86,10 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
|
|||
|
||||
pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
|
||||
if (pTask->pState == NULL) {
|
||||
qError("s-task:%s failed to open state for task", pTask->id.idStr);
|
||||
sndError("s-task:%s failed to open state for task", pTask->id.idStr);
|
||||
return -1;
|
||||
} else {
|
||||
qDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
|
||||
sndDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
|
||||
}
|
||||
|
||||
int32_t numOfChildEp = taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||
|
@ -85,7 +106,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
|
|||
// checkpoint ver is the kept version, handled data should be the next version.
|
||||
if (pTask->chkInfo.checkpointId != 0) {
|
||||
pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1;
|
||||
qInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, pTask->id.idStr,
|
||||
sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, pTask->id.idStr,
|
||||
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
|
||||
} else {
|
||||
if (pTask->chkInfo.nextProcessVer == -1) {
|
||||
|
@ -96,7 +117,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
|
|||
char* p = NULL;
|
||||
streamTaskGetStatus(pTask, &p);
|
||||
|
||||
qInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
|
||||
sndInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
|
||||
" nextProcessVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms",
|
||||
SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
|
||||
pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory, pTask->info.triggerParam);
|
||||
|
@ -186,24 +207,23 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
|
|||
char* p = NULL;
|
||||
streamTaskGetStatus(pTask, &p);
|
||||
|
||||
qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE,
|
||||
sndDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE,
|
||||
pTask->id.idStr, p, numOfTasks);
|
||||
|
||||
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
|
||||
streamTaskHandleEvent(pTask->status.pSM, event);
|
||||
streamTaskCheckDownstream(pTask);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
|
||||
SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
|
||||
qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId);
|
||||
sndDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId);
|
||||
streamMetaUnregisterTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
|
||||
|
||||
// commit the update
|
||||
streamMetaWLock(pSnode->pMeta);
|
||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta);
|
||||
qDebug("vgId:%d task:0x%x dropped, remain tasks:%d", pSnode->pMeta->vgId, pReq->taskId, numOfTasks);
|
||||
sndDebug("vgId:%d task:0x%x dropped, remain tasks:%d", pSnode->pMeta->vgId, pReq->taskId, numOfTasks);
|
||||
|
||||
if (streamMetaCommit(pSnode->pMeta) < 0) {
|
||||
// persist to disk
|
||||
|
@ -308,7 +328,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t sndProcessStreamTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||
int32_t sndProcessTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||
char *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||
|
||||
|
@ -335,8 +355,38 @@ int32_t sndProcessStreamTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg)
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t sndProcessTaskRecoverFinishRsp(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||
//
|
||||
int32_t sndProcessTaskScanHistoryFinishRsp(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||
|
||||
// deserialize
|
||||
SStreamCompleteHistoryMsg req = {0};
|
||||
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
|
||||
tDecodeCompleteHistoryDataMsg(&decoder, &req);
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.upstreamTaskId);
|
||||
if (pTask == NULL) {
|
||||
sndError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed",
|
||||
pSnode->pMeta->vgId, req.upstreamTaskId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
|
||||
if (remain > 0) {
|
||||
sndDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, unfinished remain:%d",
|
||||
pTask->id.idStr, req.downstreamId, remain);
|
||||
} else {
|
||||
sndDebug(
|
||||
"s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history "
|
||||
"completed msg",
|
||||
pTask->id.idStr, req.downstreamId);
|
||||
streamProcessScanHistoryFinishRsp(pTask);
|
||||
}
|
||||
|
||||
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -360,11 +410,11 @@ int32_t sndProcessTaskCheckpointReadyMsg(SSnode *pSnode, SRpcMsg* pMsg) {
|
|||
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId);
|
||||
if (pTask == NULL) {
|
||||
qError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", pMeta->vgId, req.downstreamTaskId);
|
||||
sndError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", pMeta->vgId, req.downstreamTaskId);
|
||||
return code;
|
||||
}
|
||||
|
||||
qDebug("snode vgId:%d s-task:%s received the checkpoint ready msg from task:0x%x (vgId:%d), handle it", pMeta->vgId,
|
||||
sndDebug("snode vgId:%d s-task:%s received the checkpoint ready msg from task:0x%x (vgId:%d), handle it", pMeta->vgId,
|
||||
pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
|
||||
|
||||
streamProcessCheckpointReadyMsg(pTask);
|
||||
|
@ -403,11 +453,11 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
|
|||
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
||||
char* p = NULL;
|
||||
streamTaskGetStatus(pTask, &p);
|
||||
qDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
|
||||
sndDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
|
||||
pTask->id.idStr, p, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
||||
} else {
|
||||
rsp.status = TASK_DOWNSTREAM_NOT_READY;
|
||||
qDebug("recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
|
||||
sndDebug("recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
|
||||
taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
||||
}
|
||||
|
||||
|
@ -417,7 +467,7 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
|
|||
|
||||
tEncodeSize(tEncodeStreamTaskCheckRsp, &rsp, len, code);
|
||||
if (code < 0) {
|
||||
qError("vgId:%d failed to encode task check rsp, task:0x%x", pSnode->pMeta->vgId, taskId);
|
||||
sndError("vgId:%d failed to encode task check rsp, task:0x%x", pSnode->pMeta->vgId, taskId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -452,12 +502,12 @@ int32_t sndProcessStreamTaskCheckRsp(SSnode* pSnode, SRpcMsg* pMsg) {
|
|||
}
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
qDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
|
||||
sndDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
|
||||
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
|
||||
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, rsp.streamId, rsp.upstreamTaskId);
|
||||
if (pTask == NULL) {
|
||||
qError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
|
||||
sndError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
|
||||
pSnode->pMeta->vgId);
|
||||
return -1;
|
||||
}
|
||||
|
@ -480,9 +530,9 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
|||
case TDMT_STREAM_RETRIEVE_RSP:
|
||||
return sndProcessTaskRetrieveRsp(pSnode, pMsg);
|
||||
case TDMT_VND_STREAM_SCAN_HISTORY_FINISH:
|
||||
return sndProcessStreamTaskScanHistoryFinishReq(pSnode, pMsg);
|
||||
return sndProcessTaskScanHistoryFinishReq(pSnode, pMsg);
|
||||
case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP:
|
||||
return sndProcessTaskRecoverFinishRsp(pSnode, pMsg);
|
||||
return sndProcessTaskScanHistoryFinishRsp(pSnode, pMsg);
|
||||
case TDMT_VND_STREAM_TASK_CHECK:
|
||||
return sndProcessStreamTaskCheckReq(pSnode, pMsg);
|
||||
case TDMT_VND_STREAM_TASK_CHECK_RSP:
|
||||
|
|
|
@ -110,6 +110,7 @@ int32_t metaDebugFlag = 131;
|
|||
int32_t udfDebugFlag = 131;
|
||||
int32_t smaDebugFlag = 131;
|
||||
int32_t idxDebugFlag = 131;
|
||||
int32_t sndDebugFlag = 131;
|
||||
|
||||
int64_t dbgEmptyW = 0;
|
||||
int64_t dbgWN = 0;
|
||||
|
|
|
@ -0,0 +1,107 @@
|
|||
|
||||
import taos
|
||||
import sys
|
||||
import time
|
||||
import socket
|
||||
import os
|
||||
import threading
|
||||
import math
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import *
|
||||
from util.common import *
|
||||
from util.cluster import *
|
||||
|
||||
class TDTestCase:
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor(), False)
|
||||
|
||||
|
||||
def case1(self):
|
||||
tdLog.debug("case1 start")
|
||||
|
||||
os.system("nohup taosBenchmark -y -B 1 -t 4 -S 1000 -n 1000 -i 1000 -v 2 > /dev/null 2>&1 &")
|
||||
time.sleep(1)
|
||||
tdSql.query("use test")
|
||||
tdSql.query("create snode on dnode 4")
|
||||
tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select sum(voltage) from meters partition by groupid interval(4s)")
|
||||
tdLog.debug("create stream use snode and insert data")
|
||||
time.sleep(10)
|
||||
|
||||
tdDnodes = cluster.dnodes
|
||||
tdDnodes[3].stoptaosd()
|
||||
time.sleep(2)
|
||||
tdDnodes[3].starttaosd()
|
||||
tdLog.debug("snode restart ok")
|
||||
|
||||
time.sleep(500)
|
||||
os.system("kill -9 `pgrep taosBenchmark`")
|
||||
|
||||
tdSql.query("select sum(voltage) from meters partition by groupid interval(4s)")
|
||||
# tdSql.checkRows(1)
|
||||
#
|
||||
# tdSql.checkData(0, 0, '2016-01-01 08:00:07.000')
|
||||
# tdSql.checkData(0, 1, 2000)
|
||||
|
||||
tdSql.query("drop snode on dnode 4")
|
||||
tdSql.query("drop stream if exists s1")
|
||||
tdSql.query("drop database test")
|
||||
|
||||
tdLog.debug("case1 end")
|
||||
|
||||
def case2(self):
|
||||
tdLog.debug("case2 start")
|
||||
|
||||
updatecfgDict = {'checkpointInterval': 5}
|
||||
print("===================: ", updatecfgDict)
|
||||
|
||||
tdDnodes = cluster.dnodes
|
||||
tdDnodes[0].stoptaosd()
|
||||
tdDnodes[1].stoptaosd()
|
||||
tdDnodes[2].stoptaosd()
|
||||
tdDnodes[3].stoptaosd()
|
||||
time.sleep(2)
|
||||
tdDnodes[0].starttaosd()
|
||||
tdDnodes[1].starttaosd()
|
||||
tdDnodes[2].starttaosd()
|
||||
tdDnodes[3].starttaosd()
|
||||
tdLog.debug("taosd restart ok")
|
||||
|
||||
os.system("taosBenchmark -y -B 1 -t 4 -S 1000 -n 1000 -i 1000 -v 2 &" )
|
||||
time.sleep(1)
|
||||
tdSql.query("use test")
|
||||
tdSql.query("create snode on dnode 4")
|
||||
tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select sum(voltage) from meters partition by groupid interval(4s)")
|
||||
tdLog.debug("create stream use snode and insert data")
|
||||
time.sleep(10)
|
||||
|
||||
tdDnodes[3].stoptaosd()
|
||||
time.sleep(2)
|
||||
tdDnodes[3].starttaosd()
|
||||
tdLog.debug("snode restart ok")
|
||||
|
||||
time.sleep(5)
|
||||
os.system("kill -9 `pgrep taosBenchmark`")
|
||||
|
||||
tdSql.query("select sum(voltage) from meters partition by groupid interval(4s)")
|
||||
# tdSql.checkRows(1)
|
||||
#
|
||||
# tdSql.checkData(0, 0, '2016-01-01 08:00:07.000')
|
||||
# tdSql.checkData(0, 1, 2000)
|
||||
|
||||
tdLog.debug("case2 end")
|
||||
|
||||
def run(self):
|
||||
self.case1()
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
event = threading.Event()
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
Loading…
Reference in New Issue