Merge branch '3.0' into fix/fixChkpRespMemleak

This commit is contained in: commit 0c0d9cdd2c

@@ -45,6 +45,7 @@ typedef struct {
  */
 SSnode *sndOpen(const char *path, const SSnodeOpt *pOption);
 
+int32_t sndInit(SSnode * pSnode);
 /**
  * @brief Stop Snode in Dnode.
  *

@@ -0,0 +1,18 @@
+//
+// Created by mingming wanng on 2023/11/15.
+//
+
+#ifndef TDENGINE_STREAM_H
+#define TDENGINE_STREAM_H
+
+#define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1)
+#define STREAM_EXEC_START_ALL_TASKS_ID (-2)
+#define STREAM_EXEC_RESTART_ALL_TASKS_ID (-3)
+
+typedef struct STaskUpdateEntry {
+  int64_t streamId;
+  int32_t taskId;
+  int32_t transId;
+} STaskUpdateEntry;
+
+#endif  // TDENGINE_STREAM_H

@@ -589,7 +589,7 @@ typedef struct {
   int32_t downstreamNodeId;
   int32_t downstreamTaskId;
   int32_t childId;
-  int32_t oldStage;
+  int64_t oldStage;
   int8_t  status;
 } SStreamTaskCheckRsp;
 

@@ -654,7 +654,7 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea
 typedef struct STaskStatusEntry {
   STaskId id;
   int32_t status;
-  int32_t stage;
+  int64_t stage;
   int32_t nodeId;
   int64_t verStart;  // start version in WAL, only valid for source task
   int64_t verEnd;    // end version in WAL, only valid for source task
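
Note: widening `stage` (and `oldStage` above) from int32_t to int64_t only works end to end if every encode/decode and log site follows the type; a 64-bit stage squeezed through a leftover 32-bit path silently truncates. A minimal standalone illustration in plain C (generic code, not TDengine's codec):

    #include <stdint.h>
    #include <stdio.h>

    int main(void) {
      int64_t stage = (int64_t)1 << 40;  // outgrows 32 bits
      int32_t onWire = (int32_t)stage;   // a stale 32-bit encode path
      // only the low 32 bits survive the narrow path (here: 0)
      printf("sent %lld, received %d\n", (long long)stage, onWire);
      return 0;
    }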

@@ -757,7 +757,7 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
 // recover and fill history
 void streamTaskCheckDownstream(SStreamTask* pTask);
 
-int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage);
+int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage, int64_t* oldStage);
 int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
 void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
 bool streamTaskAllUpstreamClosed(SStreamTask* pTask);

@@ -828,7 +828,6 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
 int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta);
 void streamMetaNotifyClose(SStreamMeta* pMeta);
 void streamMetaStartHb(SStreamMeta* pMeta);
-void streamMetaInitForSnode(SStreamMeta* pMeta);
 bool streamMetaTaskInTimer(SStreamMeta* pMeta);
 int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool succ);
 void streamMetaRLock(SStreamMeta* pMeta);

@@ -66,6 +66,7 @@ extern int32_t udfDebugFlag;
 extern int32_t smaDebugFlag;
 extern int32_t idxDebugFlag;
 extern int32_t tdbDebugFlag;
+extern int32_t sndDebugFlag;
 
 int32_t taosInitLog(const char *logName, int32_t maxFiles);
 void taosCloseLog();

@@ -167,7 +167,7 @@ static const SSysDbTableSchema streamTaskSchema[] = {
     {.name = "node_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
     {.name = "level", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
     {.name = "status", .bytes = 15 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
-    {.name = "stage", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
+    {.name = "stage", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
    {.name = "in_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
    // {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
    {.name = "info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},

@@ -431,6 +431,7 @@ static int32_t taosAddServerLogCfg(SConfig *pCfg) {
   if (cfgAddInt32(pCfg, "tdbDebugFlag", tdbDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
   if (cfgAddInt32(pCfg, "metaDebugFlag", metaDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
   if (cfgAddInt32(pCfg, "stDebugFlag", stDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
+  if (cfgAddInt32(pCfg, "sndDebugFlag", sndDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
   return 0;
 }
 

@@ -951,6 +952,7 @@ static void taosSetServerLogCfg(SConfig *pCfg) {
   tdbDebugFlag = cfgGetItem(pCfg, "tdbDebugFlag")->i32;
   metaDebugFlag = cfgGetItem(pCfg, "metaDebugFlag")->i32;
   stDebugFlag = cfgGetItem(pCfg, "stDebugFlag")->i32;
+  sndDebugFlag = cfgGetItem(pCfg, "sndDebugFlag")->i32;
 }
 
 static int32_t taosSetSlowLogScope(char *pScope) {

@@ -1424,7 +1426,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, char *name) {
       {"smaDebugFlag", &smaDebugFlag}, {"idxDebugFlag", &idxDebugFlag}, {"tdbDebugFlag", &tdbDebugFlag},
       {"tmrDebugFlag", &tmrDebugFlag}, {"uDebugFlag", &uDebugFlag},     {"smaDebugFlag", &smaDebugFlag},
       {"rpcDebugFlag", &rpcDebugFlag}, {"qDebugFlag", &qDebugFlag},     {"metaDebugFlag", &metaDebugFlag},
-      {"jniDebugFlag", &jniDebugFlag}, {"stDebugFlag", &stDebugFlag},
+      {"jniDebugFlag", &jniDebugFlag}, {"stDebugFlag", &stDebugFlag},   {"sndDebugFlag", &sndDebugFlag},
   };
 
   static OptionNameAndVar options[] = {

@@ -1732,6 +1734,7 @@ void taosSetAllDebugFlag(int32_t flag, bool rewrite) {
   taosSetDebugFlag(&tdbDebugFlag, "tdbDebugFlag", flag, rewrite);
   taosSetDebugFlag(&metaDebugFlag, "metaDebugFlag", flag, rewrite);
   taosSetDebugFlag(&stDebugFlag, "stDebugFlag", flag, rewrite);
+  taosSetDebugFlag(&sndDebugFlag, "sndDebugFlag", flag, rewrite);
   uInfo("all debug flag are set to %d", flag);
 }
 
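
Note: the four tglobal.c hunks above all hang off the same idiom — one table maps config names to the int32_t globals they control, so startup loading and dynamic reconfiguration share a single list. A self-contained sketch of that idiom (illustrative types and names, not TDengine's cfg API):

    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>

    static int32_t sndDebugFlag = 131;

    typedef struct { const char *name; int32_t *pFlag; } OptionNameAndVar;

    // one entry per flag, exactly like {"sndDebugFlag", &sndDebugFlag} above
    static OptionNameAndVar debugOptions[] = {
        {"sndDebugFlag", &sndDebugFlag},
    };

    static int setDebugFlag(const char *name, int32_t v) {
      for (size_t i = 0; i < sizeof(debugOptions) / sizeof(debugOptions[0]); ++i) {
        if (strcmp(debugOptions[i].name, name) == 0) {
          *debugOptions[i].pFlag = v;
          return 0;
        }
      }
      return -1;  // unknown option
    }

    int main(void) {
      setDebugFlag("sndDebugFlag", 143);
      printf("sndDebugFlag=%d\n", sndDebugFlag);
      return 0;
    }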

@@ -73,6 +73,7 @@ SArray *smGetMsgHandles() {
   SArray *pArray = taosArrayInit(4, sizeof(SMgmtHandle));
   if (pArray == NULL) goto _OVER;
 
+  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;

@@ -87,7 +88,8 @@ SArray *smGetMsgHandles() {
   if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
+  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
 
   code = 0;
 _OVER:

@@ -76,9 +76,14 @@ int32_t smOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
   return 0;
 }
 
+static int32_t smStartSnodes(SSnodeMgmt *pMgmt) {
+  return sndInit(pMgmt->pSnode);
+}
+
 SMgmtFunc smGetMgmtFunc() {
   SMgmtFunc mgmtFunc = {0};
   mgmtFunc.openFp = smOpen;
+  mgmtFunc.startFp = (NodeStartFp)smStartSnodes;
   mgmtFunc.closeFp = (NodeCloseFp)smClose;
   mgmtFunc.createFp = (NodeCreateFp)smProcessCreateReq;
   mgmtFunc.dropFp = (NodeDropFp)smProcessDropReq;

@@ -919,7 +919,7 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
     if (pIter == NULL) break;
 
     if (mndVgroupInDb(pVgroup, pNewDb->uid)) {
-      if (mndBuildRaftAlterVgroupAction(pMnode, pTrans, pOldDb, pNewDb, pVgroup, pArray) != 0) {
+      if (mndBuildAlterVgroupAction(pMnode, pTrans, pOldDb, pNewDb, pVgroup, pArray) != 0) {
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pVgroup);
        taosArrayDestroy(pArray);

@@ -2145,6 +2145,9 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
         epsetAssign(&updateInfo.newEp, &pCurrent->epset);
         taosArrayPush(info.pUpdateNodeList, &updateInfo);
+      }
+
+      if(pCurrent->nodeId != SNODE_HANDLE){
         SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId);
         taosHashPut(info.pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
         mndReleaseVgroup(pMnode, pVgroup);

@@ -2200,6 +2203,24 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
     sdbRelease(pSdb, pVgroup);
   }
 
+  SSnodeObj *pObj = NULL;
+  while (1) {
+    pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj);
+    if (pIter == NULL) {
+      break;
+    }
+
+    SNodeEntry entry = {0};
+    addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port);
+    entry.nodeId = SNODE_HANDLE;
+
+    char buf[256] = {0};
+    EPSET_TO_STR(&entry.epset, buf);
+    mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
+    taosArrayPush(pVgroupListSnapshot, &entry);
+    sdbRelease(pSdb, pObj);
+  }
+
   return pVgroupListSnapshot;
 }

@@ -2377,6 +2398,8 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
     STaskId *         pId = taosArrayGet(execInfo.pTaskList, i);
     STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
 
+    if(pEntry->nodeId == SNODE_HANDLE) continue;
+
     bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId);
     if (!existed) {
       taosArrayPush(pRemovedTasks, pId);

@@ -2725,12 +2748,12 @@ int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
   return TSDB_CODE_SUCCESS;
 }
 
-static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) {
+static void updateStageInfo(STaskStatusEntry* pTaskEntry, int64_t stage) {
   int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
   for(int32_t j = 0; j < numOfNodes; ++j) {
     SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeList, j);
     if (pNodeEntry->nodeId == pTaskEntry->nodeId) {
-      mInfo("vgId:%d stage updated from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId,
+      mInfo("vgId:%d stage updated from %"PRId64 " to %"PRId64 ", nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId,
            pTaskEntry->stage, stage, pTaskEntry->id.taskId);
 
       pNodeEntry->stageUpdated = true;
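
Note: the mInfo change above is forced by the field widening: passing an int64_t where the format string says %d is undefined behavior in a variadic call, so the conversion specifier has to track the type. Runnable illustration of the corrected pattern:

    #include <inttypes.h>
    #include <stdint.h>
    #include <stdio.h>

    int main(void) {
      int64_t oldStage = 3, newStage = 4;
      // %d here would be UB for int64_t arguments; PRId64 matches the width
      printf("stage updated from %" PRId64 " to %" PRId64 "\n", oldStage, newStage);
      return 0;
    }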

@@ -2775,6 +2798,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
     setNodeEpsetExpiredFlag(req.pUpdateNodes);
   }
 
+  bool snodeChanged = false;
   for (int32_t i = 0; i < req.numOfTasks; ++i) {
     STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
     STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));

@@ -2785,6 +2809,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
 
     if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) {
       updateStageInfo(pTaskEntry, p->stage);
+      if(pTaskEntry->nodeId == SNODE_HANDLE) snodeChanged = true;
     } else {
       streamTaskStatusCopy(pTaskEntry, p);
       if (p->activeCheckpointId != 0) {

@@ -2813,7 +2838,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
     SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
     taosArrayDestroy(p);
 
-    if (allReady) {
+    if (allReady || snodeChanged) {
       // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
       mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status",
             execInfo.activeCheckpoint);

@@ -3,6 +3,7 @@ add_library(snode STATIC ${SNODE_SRC})
 target_include_directories(
     snode
     PUBLIC "${TD_SOURCE_DIR}/include/dnode/snode"
+    PUBLIC "${TD_SOURCE_DIR}/include/dnode/vnode"
     private "${CMAKE_CURRENT_SOURCE_DIR}/inc"
 )
 target_link_libraries(

@@ -18,6 +18,28 @@
 #include "sndInt.h"
 #include "tstream.h"
 #include "tuuid.h"
+#include "stream.h"
+
+#define sndError(...)                                                     \
+  do {                                                                    \
+    if (sndDebugFlag & DEBUG_ERROR) {                                     \
+      taosPrintLog("SND ERROR ", DEBUG_ERROR, sndDebugFlag, __VA_ARGS__); \
+    }                                                                     \
+  } while (0)
+
+#define sndInfo(...)                                                    \
+  do {                                                                  \
+    if (sndDebugFlag & DEBUG_INFO) {                                    \
+      taosPrintLog("SND INFO ", DEBUG_INFO, sndDebugFlag, __VA_ARGS__); \
+    }                                                                   \
+  } while (0)
+
+#define sndDebug(...)                                               \
+  do {                                                              \
+    if (sndDebugFlag & DEBUG_DEBUG) {                               \
+      taosPrintLog("SND ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__); \
+    }                                                               \
+  } while (0)
 
 void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) {
   char *msgStr = pMsg->pCont;
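
Note: the sndError/sndInfo/sndDebug macros follow the usual gated-logging shape: the flag test sits at the call site, so a disabled level costs one branch and its arguments are never evaluated. A standalone sketch of the same shape (demo names, not TDengine's):

    #include <stdarg.h>
    #include <stdint.h>
    #include <stdio.h>

    #define DEMO_DEBUG_ERROR 1
    #define DEMO_DEBUG_INFO  2

    static int32_t demoDebugFlag = DEMO_DEBUG_ERROR;  // INFO disabled

    static void demoPrintLog(const char *tag, const char *fmt, ...) {
      va_list ap;
      va_start(ap, fmt);
      printf("%s", tag);
      vprintf(fmt, ap);
      va_end(ap);
    }

    // arguments are only evaluated when the level bit is set
    #define demoError(...)                          \
      do {                                          \
        if (demoDebugFlag & DEMO_DEBUG_ERROR) {     \
          demoPrintLog("DEMO ERROR ", __VA_ARGS__); \
        }                                           \
      } while (0)

    int main(void) {
      demoError("task:%s failed, code:%d\n", "t1", -1);  // printed
      return 0;
    }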

@@ -40,10 +62,14 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) {
   if (pTask) {
     SRpcMsg rsp = { .info = pMsg->info, .code = 0 };
     streamProcessDispatchMsg(pTask, &req, &rsp);
+    tDeleteStreamDispatchReq(&req);
     streamMetaReleaseTask(pSnode->pMeta, pTask);
     rpcFreeCont(pMsg->pCont);
     taosFreeQitem(pMsg);
     return;
+  } else {
+    tDeleteStreamDispatchReq(&req);
+    return;
   }
 
 FAIL:
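
Note: this hunk carries one of the leak fixes this merge appears to bring in: the decoded dispatch request owns heap buffers, so both the task-found and the task-missing branches must release it before returning. The generic shape of that fix, as a self-contained sketch (illustrative types, not TDengine's):

    #include <stdlib.h>

    typedef struct { char *payload; } DispatchReq;

    static int decodeReq(DispatchReq *req) {
      req->payload = malloc(64);  // decode allocates; caller owns it
      return req->payload ? 0 : -1;
    }

    static void deleteReq(DispatchReq *req) {
      free(req->payload);
      req->payload = NULL;
    }

    int handleDispatch(int taskFound) {
      DispatchReq req = {0};
      if (decodeReq(&req) < 0) return -1;

      if (taskFound) {
        /* process(&req); */
        deleteReq(&req);   // freed on the success path too
        return 0;
      } else {
        deleteReq(&req);   // and on the not-found path
        return -1;
      }
    }

    int main(void) { return handleDispatch(1); }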

@@ -63,20 +89,37 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
 
   streamTaskOpenAllUpstreamInput(pTask);
 
-  pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
-  if (pTask->pState == NULL) {
-    qError("s-task:%s failed to open state for task", pTask->id.idStr);
-    return -1;
-  } else {
-    qDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
+  SStreamTask* pSateTask = pTask;
+  SStreamTask task = {0};
+  if (pTask->info.fillHistory) {
+    task.id.streamId = pTask->streamTaskId.streamId;
+    task.id.taskId = pTask->streamTaskId.taskId;
+    task.pMeta = pTask->pMeta;
+    pSateTask = &task;
   }
 
-  int32_t numOfChildEp = taosArrayGetSize(pTask->upstreamInfo.pList);
-  SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory };
+  pTask->pState = streamStateOpen(pSnode->path, pSateTask, false, -1, -1);
+  if (pTask->pState == NULL) {
+    sndError("s-task:%s failed to open state for task", pTask->id.idStr);
+    return -1;
+  } else {
+    sndDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
+  }
+
+  int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
+  SReadHandle handle = {
+      .checkpointId = pTask->chkInfo.checkpointId,
+      .vnode = NULL,
+      .numOfVgroups = numOfVgroups,
+      .pStateBackend = pTask->pState,
+      .fillHistory = pTask->info.fillHistory,
+      .winRange = pTask->dataRange.window,
+  };
   initStreamStateAPI(&handle.api);
 
-  pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0, pTask->id.taskId);
+  pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, SNODE_HANDLE, pTask->id.taskId);
   ASSERT(pTask->exec.pExecutor);
+  qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
 
   streamTaskResetUpstreamStageInfo(pTask);
   streamSetupScheduleTrigger(pTask);

@@ -85,7 +128,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
   // checkpoint ver is the kept version, handled data should be the next version.
   if (pTask->chkInfo.checkpointId != 0) {
     pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1;
-    qInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, pTask->id.idStr,
+    sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, pTask->id.idStr,
           pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
   } else {
     if (pTask->chkInfo.nextProcessVer == -1) {

@@ -96,7 +139,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
   char* p = NULL;
   streamTaskGetStatus(pTask, &p);
 
-  qInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
+  sndInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
        " nextProcessVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms",
        SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
        pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory, pTask->info.triggerParam);

@@ -104,6 +147,142 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
   return 0;
 }
 
+int32_t sndStartStreamTasks(SSnode* pSnode) {
+  int32_t      code = TSDB_CODE_SUCCESS;
+  int32_t      vgId = SNODE_HANDLE;
+  SStreamMeta* pMeta = pSnode->pMeta;
+
+  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
+  sndDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks);
+  if (numOfTasks == 0) {
+    return TSDB_CODE_SUCCESS;
+  }
+
+  SArray* pTaskList = NULL;
+  streamMetaWLock(pMeta);
+  pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
+  taosHashClear(pMeta->startInfo.pReadyTaskSet);
+  taosHashClear(pMeta->startInfo.pFailedTaskSet);
+  pMeta->startInfo.startTs = taosGetTimestampMs();
+  streamMetaWUnLock(pMeta);
+
+  // broadcast the check downstream tasks msg
+  for (int32_t i = 0; i < numOfTasks; ++i) {
+    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
+    SStreamTask*   pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
+    if (pTask == NULL) {
+      continue;
+    }
+
+    // fill-history task can only be launched by related stream tasks.
+    if (pTask->info.fillHistory == 1) {
+      streamMetaReleaseTask(pMeta, pTask);
+      continue;
+    }
+
+    if (pTask->status.downstreamReady == 1) {
+      if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
+        sndDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
+                 pTask->id.idStr);
+        streamLaunchFillHistoryTask(pTask);
+      }
+
+      streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true);
+      streamMetaReleaseTask(pMeta, pTask);
+      continue;
+    }
+
+    EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
+    int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event);
+    if (ret != TSDB_CODE_SUCCESS) {
+      code = ret;
+    }
+
+    streamMetaReleaseTask(pMeta, pTask);
+  }
+
+  taosArrayDestroy(pTaskList);
+  return code;
+}
+
+int32_t sndResetStreamTaskStatus(SSnode* pSnode) {
+  SStreamMeta* pMeta = pSnode->pMeta;
+  int32_t      vgId = pMeta->vgId;
+  int32_t      numOfTasks = taosArrayGetSize(pMeta->pTaskList);
+
+  sndDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks);
+  if (numOfTasks == 0) {
+    return TSDB_CODE_SUCCESS;
+  }
+
+  for (int32_t i = 0; i < numOfTasks; ++i) {
+    SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
+
+    STaskId       id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
+    SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
+    streamTaskResetStatus(*pTask);
+  }
+
+  return 0;
+}
+
+int32_t sndRestartStreamTasks(SSnode* pSnode) {
+  SStreamMeta* pMeta = pSnode->pMeta;
+  int32_t      vgId = pMeta->vgId;
+  int32_t      code = 0;
+  int64_t      st = taosGetTimestampMs();
+
+  while(1) {
+    int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
+    if (startVal == 0) {
+      break;
+    }
+
+    sndDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId);
+    taosMsleep(500);
+  }
+
+  terrno = 0;
+  sndInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId,
+          pMeta->updateInfo.transId);
+
+  while (streamMetaTaskInTimer(pMeta)) {
+    sndDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
+    taosMsleep(100);
+  }
+
+  streamMetaWLock(pMeta);
+
+  code = streamMetaReopen(pMeta);
+  if (code != TSDB_CODE_SUCCESS) {
+    sndError("vgId:%d failed to reopen stream meta", vgId);
+    streamMetaWUnLock(pMeta);
+    code = terrno;
+    return code;
+  }
+
+  streamMetaInitBackend(pMeta);
+  int64_t el = taosGetTimestampMs() - st;
+
+  sndInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.);
+
+  code = streamMetaLoadAllTasks(pMeta);
+  if (code != TSDB_CODE_SUCCESS) {
+    sndError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno));
+    streamMetaWUnLock(pMeta);
+    code = terrno;
+    return code;
+  }
+  sndInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
+  sndResetStreamTaskStatus(pSnode);
+
+  streamMetaWUnLock(pMeta);
+  sndStartStreamTasks(pSnode);
+
+  code = terrno;
+  return code;
+}
+
 SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
   SSnode *pSnode = taosMemoryCalloc(1, sizeof(SSnode));
   if (pSnode == NULL) {
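
Note: sndRestartStreamTasks serializes against any in-flight start via a compare-and-swap loop on startInfo.taskStarting, sleeping 500 ms between retries. A minimal C11 sketch of that guard (demo names; TDengine uses its own atomic wrappers and taosMsleep):

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdio.h>

    static atomic_int taskStarting = 0;

    static bool tryEnterStartProcedure(void) {
      int expected = 0;
      // exactly one caller flips 0 -> 1; everyone else fails and retries
      return atomic_compare_exchange_strong(&taskStarting, &expected, 1);
    }

    int main(void) {
      while (!tryEnterStartProcedure()) {
        // real code sleeps 500 ms and rechecks instead of spinning hot
      }
      puts("entered start procedure");
      atomic_store(&taskStarting, 0);  // leave the critical section
      return 0;
    }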

@@ -117,17 +296,19 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
   }
 
   pSnode->msgCb = pOption->msgCb;
-  pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, -1);
+  pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, taosGetTimestampMs());
   if (pSnode->pMeta == NULL) {
     terrno = TSDB_CODE_OUT_OF_MEMORY;
     goto FAIL;
   }
 
+  if (streamMetaLoadAllTasks(pSnode->pMeta) < 0) {
+    goto FAIL;
+  }
+
   stopRsync();
   startRsync();
 
-  // todo fix it: send msg to mnode to rollback to an existed checkpoint
-  streamMetaInitForSnode(pSnode->pMeta);
   return pSnode;
 
 FAIL:

@@ -136,6 +317,12 @@ FAIL:
   return NULL;
 }
 
+int32_t sndInit(SSnode * pSnode) {
+  sndResetStreamTaskStatus(pSnode);
+  sndStartStreamTasks(pSnode);
+  return 0;
+}
+
 void sndClose(SSnode *pSnode) {
   streamMetaNotifyClose(pSnode->pMeta);
   streamMetaCommit(pSnode->pMeta);

@@ -146,6 +333,33 @@ void sndClose(SSnode *pSnode) {
 
 int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad) { return 0; }
 
+int32_t sndStartStreamTaskAsync(SSnode* pSnode, bool restart) {
+  SStreamMeta* pMeta = pSnode->pMeta;
+  int32_t      vgId = pMeta->vgId;
+
+  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
+  if (numOfTasks == 0) {
+    sndDebug("vgId:%d no stream tasks existed to run", vgId);
+    return 0;
+  }
+
+  SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
+  if (pRunReq == NULL) {
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    sndError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
+    return -1;
+  }
+
+  sndDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks);
+  pRunReq->head.vgId = vgId;
+  pRunReq->streamId = 0;
+  pRunReq->taskId = restart? STREAM_EXEC_RESTART_ALL_TASKS_ID:STREAM_EXEC_START_ALL_TASKS_ID;
+
+  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
+  tmsgPutToQueue(&pSnode->msgCb, STREAM_QUEUE, &msg);
+  return 0;
+}
+
 int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
   int32_t code;
 

@@ -184,24 +398,23 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
   char* p = NULL;
   streamTaskGetStatus(pTask, &p);
 
-  qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE,
+  sndDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE,
          pTask->id.idStr, p, numOfTasks);
 
   EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
   streamTaskHandleEvent(pTask->status.pSM, event);
-  streamTaskCheckDownstream(pTask);
   return 0;
 }
 
 int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
   SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
-  qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId);
+  sndDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId);
   streamMetaUnregisterTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
 
   // commit the update
   streamMetaWLock(pSnode->pMeta);
   int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta);
-  qDebug("vgId:%d task:0x%x dropped, remain tasks:%d", pSnode->pMeta->vgId, pReq->taskId, numOfTasks);
+  sndDebug("vgId:%d task:0x%x dropped, remain tasks:%d", pSnode->pMeta->vgId, pReq->taskId, numOfTasks);
 
   if (streamMetaCommit(pSnode->pMeta) < 0) {
     // persist to disk

@@ -213,6 +426,16 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
 int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) {
   SStreamTaskRunReq *pReq = pMsg->pCont;
 
+  int32_t taskId = pReq->taskId;
+
+  if (taskId == STREAM_EXEC_START_ALL_TASKS_ID) {
+    sndStartStreamTasks(pSnode);
+    return 0;
+  } else if (taskId == STREAM_EXEC_RESTART_ALL_TASKS_ID) {
+    sndRestartStreamTasks(pSnode);
+    return 0;
+  }
+
   SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
   if (pTask) {
     streamExecTask(pTask);
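
Note: sndProcessTaskRunReq reuses the per-task run message as a control channel: real task IDs are non-negative, so the negative sentinels defined in stream.h can never collide with one. A self-contained sketch of the dispatch idea (demo constants, not the real handlers):

    #include <inttypes.h>
    #include <stdint.h>
    #include <stdio.h>

    #define DEMO_START_ALL_TASKS_ID   (-2)
    #define DEMO_RESTART_ALL_TASKS_ID (-3)

    static void onRunReq(int32_t taskId) {
      if (taskId == DEMO_START_ALL_TASKS_ID) {
        puts("start all tasks");
      } else if (taskId == DEMO_RESTART_ALL_TASKS_ID) {
        puts("restart all tasks");
      } else {
        printf("run task 0x%" PRIx32 "\n", taskId);  // ordinary per-task wakeup
      }
    }

    int main(void) {
      onRunReq(DEMO_RESTART_ALL_TASKS_ID);
      onRunReq(0x51);
      return 0;
    }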

@@ -231,14 +454,17 @@ int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) {
   SDecoder decoder;
   tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen);
   tDecodeStreamDispatchReq(&decoder, &req);
+  tDecoderClear(&decoder);
 
   SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.taskId);
   if (pTask) {
     SRpcMsg rsp = {.info = pMsg->info, .code = 0};
     streamProcessDispatchMsg(pTask, &req, &rsp);
+    tDeleteStreamDispatchReq(&req);
     streamMetaReleaseTask(pSnode->pMeta, pTask);
     return 0;
   } else {
+    tDeleteStreamDispatchReq(&req);
     return -1;
   }
 }

@@ -270,6 +496,9 @@ int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) {
 
   pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId);
   pRsp->streamId = htobe64(pRsp->streamId);
+  pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId);
+  pRsp->downstreamNodeId = htonl(pRsp->downstreamNodeId);
+  pRsp->stage = htobe64(pRsp->stage);
   pRsp->msgId = htonl(pRsp->msgId);
 
   SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pRsp->streamId, pRsp->upstreamTaskId);

@@ -287,23 +516,7 @@ int32_t sndProcessTaskRetrieveRsp(SSnode *pSnode, SRpcMsg *pMsg) {
   return 0;
 }
 
-int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
-  switch (pMsg->msgType) {
-    case TDMT_STREAM_TASK_DEPLOY: {
-      void *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
-      int32_t len = pMsg->contLen - sizeof(SMsgHead);
-      return sndProcessTaskDeployReq(pSnode, pReq, len);
-    }
-
-    case TDMT_STREAM_TASK_DROP:
-      return sndProcessTaskDropReq(pSnode, pMsg->pCont, pMsg->contLen);
-    default:
-      ASSERT(0);
-  }
-  return 0;
-}
-
-int32_t sndProcessStreamTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
+int32_t sndProcessTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
   char *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
   int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
 

@@ -330,11 +543,73 @@ int32_t sndProcessTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
   return 0;
 }
 
-int32_t sndProcessTaskRecoverFinishRsp(SSnode *pSnode, SRpcMsg *pMsg) {
-  //
+int32_t sndProcessTaskScanHistoryFinishRsp(SSnode *pSnode, SRpcMsg *pMsg) {
+  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
+  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
+
+  // deserialize
+  SStreamCompleteHistoryMsg req = {0};
+
+  SDecoder decoder;
+  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
+  tDecodeCompleteHistoryDataMsg(&decoder, &req);
+  tDecoderClear(&decoder);
+
+  SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.upstreamTaskId);
+  if (pTask == NULL) {
+    sndError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed",
+             pSnode->pMeta->vgId, req.upstreamTaskId);
+    return -1;
+  }
+
+  int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
+  if (remain > 0) {
+    sndDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, unfinished remain:%d",
+             pTask->id.idStr, req.downstreamId, remain);
+  } else {
+    sndDebug(
+        "s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history "
+        "completed msg",
+        pTask->id.idStr, req.downstreamId);
+    streamProcessScanHistoryFinishRsp(pTask);
+  }
+
+  streamMetaReleaseTask(pSnode->pMeta, pTask);
   return 0;
 }
 
+// downstream task has complete the stream task checkpoint procedure, let's start the handle the rsp by execute task
+int32_t sndProcessTaskCheckpointReadyMsg(SSnode *pSnode, SRpcMsg* pMsg) {
+  SStreamMeta* pMeta = pSnode->pMeta;
+  char*        msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
+  int32_t      len = pMsg->contLen - sizeof(SMsgHead);
+  int32_t      code = 0;
+
+  SStreamCheckpointReadyMsg req = {0};
+
+  SDecoder decoder;
+  tDecoderInit(&decoder, (uint8_t*)msg, len);
+  if (tDecodeStreamCheckpointReadyMsg(&decoder, &req) < 0) {
+    code = TSDB_CODE_MSG_DECODE_ERROR;
+    tDecoderClear(&decoder);
+    return code;
+  }
+  tDecoderClear(&decoder);
+
+  SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId);
+  if (pTask == NULL) {
+    sndError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", pMeta->vgId, req.downstreamTaskId);
+    return code;
+  }
+
+  sndDebug("snode vgId:%d s-task:%s received the checkpoint ready msg from task:0x%x (vgId:%d), handle it", pMeta->vgId,
+           pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
+
+  streamProcessCheckpointReadyMsg(pTask);
+  streamMetaReleaseTask(pMeta, pTask);
+  return code;
+}
+
 int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
   char *msgStr = pMsg->pCont;
   char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));

@@ -362,15 +637,15 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
   SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, taskId);
 
   if (pTask != NULL) {
-    rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage);
+    rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage, &rsp.oldStage);
     streamMetaReleaseTask(pSnode->pMeta, pTask);
     char* p = NULL;
     streamTaskGetStatus(pTask, &p);
-    qDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
+    sndDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
            pTask->id.idStr, p, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
   } else {
     rsp.status = TASK_DOWNSTREAM_NOT_READY;
-    qDebug("recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
+    sndDebug("recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
            taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
   }
 

@@ -380,7 +655,7 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
 
   tEncodeSize(tEncodeStreamTaskCheckRsp, &rsp, len, code);
   if (code < 0) {
-    qError("vgId:%d failed to encode task check rsp, task:0x%x", pSnode->pMeta->vgId, taskId);
+    sndError("vgId:%d failed to encode task check rsp, task:0x%x", pSnode->pMeta->vgId, taskId);
     return -1;
   }
 

@@ -415,12 +690,12 @@ int32_t sndProcessStreamTaskCheckRsp(SSnode* pSnode, SRpcMsg* pMsg) {
   }
 
   tDecoderClear(&decoder);
-  qDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
+  sndDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
          rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
 
   SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, rsp.streamId, rsp.upstreamTaskId);
   if (pTask == NULL) {
-    qError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
+    sndError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
           pSnode->pMeta->vgId);
     return -1;
   }

@@ -430,6 +705,181 @@ int32_t sndProcessStreamTaskCheckRsp(SSnode* pSnode, SRpcMsg* pMsg) {
   return code;
 }
 
+int32_t sndProcessTaskUpdateReq(SSnode* pSnode, SRpcMsg* pMsg) {
+  SStreamMeta* pMeta = pSnode->pMeta;
+  int32_t      vgId = SNODE_HANDLE;
+  char*        msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
+  int32_t      len = pMsg->contLen - sizeof(SMsgHead);
+  SRpcMsg      rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
+
+  SStreamTaskNodeUpdateMsg req = {0};
+
+  SDecoder decoder;
+  tDecoderInit(&decoder, (uint8_t*)msg, len);
+  if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) {
+    rsp.code = TSDB_CODE_MSG_DECODE_ERROR;
+    sndError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code));
+    tDecoderClear(&decoder);
+    return rsp.code;
+  }
+
+  tDecoderClear(&decoder);
+
+  // update the nodeEpset when it exists
+  streamMetaWLock(pMeta);
+
+  // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
+  STaskId       id = {.streamId = req.streamId, .taskId = req.taskId};
+  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
+  if (ppTask == NULL || *ppTask == NULL) {
+    sndError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
+             req.taskId);
+    rsp.code = TSDB_CODE_SUCCESS;
+    streamMetaWUnLock(pMeta);
+
+    taosArrayDestroy(req.pNodeList);
+    return rsp.code;
+  }
+
+  SStreamTask* pTask = *ppTask;
+
+  if (pMeta->updateInfo.transId != req.transId) {
+    pMeta->updateInfo.transId = req.transId;
+    sndInfo("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", pTask->id.idStr, req.transId);
+    // info needs to be kept till the new trans to update the nodeEp arrived.
+    taosHashClear(pMeta->updateInfo.pTasks);
+  } else {
+    sndDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", pTask->id.idStr, req.transId);
+  }
+
+  STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
+  void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
+  if (exist != NULL) {
+    sndDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId,
+             req.transId);
+    rsp.code = TSDB_CODE_SUCCESS;
+    streamMetaWUnLock(pMeta);
+    taosArrayDestroy(req.pNodeList);
+    return rsp.code;
+  }
+
+  streamMetaWUnLock(pMeta);
+
+  // the following two functions should not be executed within the scope of meta lock to avoid deadlock
+  streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
+  streamTaskResetStatus(pTask);
+
+  // continue after lock the meta again
+  streamMetaWLock(pMeta);
+
+  SStreamTask** ppHTask = NULL;
+  if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
+    ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
+    if (ppHTask == NULL || *ppHTask == NULL) {
+      sndError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already",
+               pMeta->vgId, req.taskId);
+      CLEAR_RELATED_FILLHISTORY_TASK(pTask);
+    } else {
+      sndDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
+      streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);
+    }
+  }
+
+  {
+    streamMetaSaveTask(pMeta, pTask);
+    if (ppHTask != NULL) {
+      streamMetaSaveTask(pMeta, *ppHTask);
+    }
+
+    if (streamMetaCommit(pMeta) < 0) {
+      // persist to disk
+    }
+  }
+
+  streamTaskStop(pTask);
+
+  // keep the already handled info
+  taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0);
+
+  if (ppHTask != NULL) {
+    streamTaskStop(*ppHTask);
+    sndDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr);
+    taosHashPut(pMeta->updateInfo.pTasks, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0);
+  } else {
+    sndDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr);
+  }
+
+  rsp.code = 0;
+
+  // possibly only handle the stream task.
+  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
+  int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
+
+  pMeta->startInfo.tasksWillRestart = 1;
+
+  if (updateTasks < numOfTasks) {
+    sndDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
+             updateTasks, (numOfTasks - updateTasks));
+    streamMetaWUnLock(pMeta);
+  } else {
+    sndDebug("vgId:%d all %d task(s) nodeEp updated and closed", vgId, numOfTasks);
+#if 1
+    sndStartStreamTaskAsync(pSnode, true);
+    streamMetaWUnLock(pMeta);
+#else
+    streamMetaWUnLock(pMeta);
+
+    // For debug purpose.
+    // the following procedure consume many CPU resource, result in the re-election of leader
+    // with high probability. So we employ it as a test case for the stream processing framework, with
+    // checkpoint/restart/nodeUpdate etc.
+    while(1) {
+      int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
+      if (startVal == 0) {
+        break;
+      }
+
+      tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId);
+      taosMsleep(500);
+    }
+
+    while (streamMetaTaskInTimer(pMeta)) {
+      tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
+      taosMsleep(100);
+    }
+
+    streamMetaWLock(pMeta);
+
+    int32_t code = streamMetaReopen(pMeta);
+    if (code != 0) {
+      tqError("vgId:%d failed to reopen stream meta", vgId);
+      streamMetaWUnLock(pMeta);
+      taosArrayDestroy(req.pNodeList);
+      return -1;
+    }
+
+    if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
+      tqError("vgId:%d failed to load stream tasks", vgId);
+      streamMetaWUnLock(pMeta);
+      taosArrayDestroy(req.pNodeList);
+      return -1;
+    }
+
+    if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
+      tqInfo("vgId:%d start all stream tasks after all being updated", vgId);
+      tqResetStreamTaskStatus(pTq);
+      tqStartStreamTaskAsync(pTq, false);
+    } else {
+      tqInfo("vgId:%d, follower node not start stream tasks", vgId);
+    }
+    streamMetaWUnLock(pMeta);
+#endif
+  }
+
+  taosArrayDestroy(req.pNodeList);
+  return rsp.code;
+}
+
 int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
   switch (pMsg->msgType) {
     case TDMT_STREAM_TASK_RUN:

@@ -443,13 +893,34 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
     case TDMT_STREAM_RETRIEVE_RSP:
       return sndProcessTaskRetrieveRsp(pSnode, pMsg);
     case TDMT_VND_STREAM_SCAN_HISTORY_FINISH:
-      return sndProcessStreamTaskScanHistoryFinishReq(pSnode, pMsg);
+      return sndProcessTaskScanHistoryFinishReq(pSnode, pMsg);
     case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP:
-      return sndProcessTaskRecoverFinishRsp(pSnode, pMsg);
+      return sndProcessTaskScanHistoryFinishRsp(pSnode, pMsg);
     case TDMT_VND_STREAM_TASK_CHECK:
       return sndProcessStreamTaskCheckReq(pSnode, pMsg);
     case TDMT_VND_STREAM_TASK_CHECK_RSP:
       return sndProcessStreamTaskCheckRsp(pSnode, pMsg);
+    case TDMT_STREAM_TASK_CHECKPOINT_READY:
+      return sndProcessTaskCheckpointReadyMsg(pSnode, pMsg);
+    default:
+      ASSERT(0);
+  }
+  return 0;
+}
+
+int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
+  switch (pMsg->msgType) {
+    case TDMT_STREAM_TASK_DEPLOY: {
+      void   *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
+      int32_t len = pMsg->contLen - sizeof(SMsgHead);
+      return sndProcessTaskDeployReq(pSnode, pReq, len);
+    }
+
+    case TDMT_STREAM_TASK_DROP:
+      return sndProcessTaskDropReq(pSnode, pMsg->pCont, pMsg->contLen);
+    case TDMT_VND_STREAM_TASK_UPDATE:
+      sndProcessTaskUpdateReq(pSnode, pMsg);
+      break;
     default:
       ASSERT(0);
   }
@@ -138,6 +138,11 @@ else()
 endif()
 endif()
+
+target_include_directories(
+    vnode
+    PUBLIC "${TD_SOURCE_DIR}/include/dnode/vnode"
+)
+
 target_link_libraries(
     vnode
     PUBLIC os
@ -43,9 +43,6 @@ extern "C" {
|
||||||
|
|
||||||
typedef struct STqOffsetStore STqOffsetStore;
|
typedef struct STqOffsetStore STqOffsetStore;
|
||||||
|
|
||||||
#define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1)
|
|
||||||
#define STREAM_EXEC_START_ALL_TASKS_ID (-2)
|
|
||||||
#define STREAM_EXEC_RESTART_ALL_TASKS_ID (-3)
|
|
||||||
#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
|
#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
|
||||||
|
|
||||||
// tqExec
|
// tqExec
|
||||||
|
|
|
@@ -14,18 +14,13 @@
 */

 #include "tq.h"
+#include "stream.h"
 #include "vnd.h"

 typedef struct {
   int8_t inited;
 } STqMgmt;

-typedef struct STaskUpdateEntry {
-  int64_t streamId;
-  int32_t taskId;
-  int32_t transId;
-} STaskUpdateEntry;
-
 static STqMgmt tqMgmt = {0};

 // 0: not init
@@ -929,12 +924,12 @@ int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
   } else {
     SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId);
     if (pTask != NULL) {
-      rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage);
+      rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage, &rsp.oldStage);
       streamMetaReleaseTask(pMeta, pTask);

       char* p = NULL;
       streamTaskGetStatus(pTask, &p);
-      tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64
+      tqDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64
               ") task:0x%x (vgId:%d), check_status:%d",
               pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
     } else {
@@ -1370,6 +1365,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
   if (pTask) {
     SRpcMsg rsp = {.info = pMsg->info, .code = 0};
     streamProcessDispatchMsg(pTask, &req, &rsp);
+    tDeleteStreamDispatchReq(&req);
     streamMetaReleaseTask(pTq->pStreamMeta, pTask);
     return 0;
   } else {
@@ -1607,6 +1603,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
   if (pTask != NULL) {
     SRpcMsg rsp = {.info = pMsg->info, .code = 0};
     streamProcessDispatchMsg(pTask, &req, &rsp);
+    tDeleteStreamDispatchReq(&req);
     streamMetaReleaseTask(pTq->pStreamMeta, pTask);
     rpcFreeCont(pMsg->pCont);
     taosFreeQitem(pMsg);
@@ -15,6 +15,7 @@
 #include "tq.h"
 #include "vnd.h"
+#include "stream.h"

 #define MAX_REPEAT_SCAN_THRESHOLD 3
 #define SCAN_WAL_IDLE_DURATION    100
@@ -293,6 +293,9 @@ typedef struct STableMergeScanInfo {
   int32_t         readIdx;
   SSDataBlock*    pResBlock;
   SSampleExecInfo sample;  // sample execution info
+  SSHashObj*      mTableNumRows;  // uid->num of table rows
+  SHashObj*       mSkipTables;
+  int64_t         mergeLimit;
   SSortExecInfo   sortExecInfo;
 } STableMergeScanInfo;
@@ -3200,6 +3200,27 @@ _error:
   return NULL;
 }

+static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock* pBlock) {
+  int64_t nRows = 0;
+  void*   pNum = tSimpleHashGet(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
+  if (pNum == NULL) {
+    nRows = pBlock->info.rows;
+    tSimpleHashPut(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &nRows, sizeof(nRows));
+  } else {
+    *(int64_t*)pNum = *(int64_t*)pNum + pBlock->info.rows;
+  }
+
+  if (nRows >= pInfo->mergeLimit) {
+    if (pInfo->mSkipTables == NULL) {
+      pInfo->mSkipTables = taosHashInit(pInfo->tableEndIndex - pInfo->tableStartIndex + 1,
+                                        taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
+    }
+    int bSkip = 1;
+    taosHashPut(pInfo->mSkipTables, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &bSkip, sizeof(bSkip));
+  }
+  return TSDB_CODE_SUCCESS;
+}
+
 static SSDataBlock* getBlockForTableMergeScan(void* param) {
   STableMergeScanSortSourceParam* source = param;
   SOperatorInfo*                  pOperator = source->pOperator;
@@ -3257,6 +3278,10 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
     pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);

+    if (pInfo->mergeLimit != -1) {
+      tableMergeScanDoSkipTable(pInfo, pBlock);
+    }
+
     pOperator->resultInfo.totalRows += pBlock->info.rows;
     pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
@@ -3316,22 +3341,20 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
   int32_t tableStartIdx = pInfo->tableStartIndex;
   int32_t tableEndIdx = pInfo->tableEndIndex;

-  bool hasLimit = pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1;
-  int64_t mergeLimit = -1;
-  if (hasLimit) {
-    mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset;
-  }
+  tSimpleHashClear(pInfo->mTableNumRows);
+
   size_t szRow = blockDataGetRowSize(pInfo->pResBlock);
-  if (hasLimit) {
-    pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1,
-                                               NULL, pTaskInfo->id.str, mergeLimit, szRow+8, tsPQSortMemThreshold * 1024* 1024);
-  } else {
+  // if (pInfo->mergeLimit != -1) {
+  //   pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1,
+  //                                              NULL, pTaskInfo->id.str, pInfo->mergeLimit, szRow+8, tsPQSortMemThreshold * 1024* 1024);
+  // } else
+  {
     pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
     int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
     pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
                                                pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);

-    tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit);
+    tsortSetMergeLimit(pInfo->pSortHandle, pInfo->mergeLimit);
     tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo);
   }
@@ -3343,7 +3366,8 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
   STableMergeScanSortSourceParam *param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam));
   param->pOperator = pOperator;
   STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx);
-  pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, NULL);
+  pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock,
+                                (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, &pInfo->mSkipTables);

   SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
   ps->param = param;
@@ -3385,6 +3409,8 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
   pInfo->pSortHandle = NULL;

   resetLimitInfoForNextGroup(&pInfo->limitInfo);
+  taosHashCleanup(pInfo->mSkipTables);
+  pInfo->mSkipTables = NULL;
   return TSDB_CODE_SUCCESS;
 }
@@ -3493,7 +3519,10 @@ void destroyTableMergeScanOperatorInfo(void* param) {
   taosArrayDestroy(pTableScanInfo->sortSourceParams);
   tsortDestroySortHandle(pTableScanInfo->pSortHandle);
   pTableScanInfo->pSortHandle = NULL;
+  tSimpleHashCleanup(pTableScanInfo->mTableNumRows);
+  pTableScanInfo->mTableNumRows = NULL;
+  taosHashCleanup(pTableScanInfo->mSkipTables);
+  pTableScanInfo->mSkipTables = NULL;
   destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);

   pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
@@ -3583,7 +3612,14 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
   pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
   pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
   initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
+  pInfo->mTableNumRows = tSimpleHashInit(1024,
+                                         taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
+  pInfo->mergeLimit = -1;
+  bool hasLimit = pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1;
+  if (hasLimit) {
+    pInfo->mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset;
+    pInfo->mSkipTables = NULL;
+  }
   pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);

   int32_t rowSize = pInfo->pResBlock->info.rowSize;
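The merge-scan hunks above wire up a per-table early stop: createTableMergeScanOperatorInfo derives mergeLimit from limit + offset, getBlockForTableMergeScan accumulates each table's row count in mTableNumRows, and a table whose count reaches the limit goes into mSkipTables, which tsdReaderOpen receives so the reader can bypass it. A reduced sketch of that bookkeeping, assuming a flat array where the operator uses tSimpleHash/taosHash, and with the running total fed back into the comparison (illustrative names):

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    #define MAX_TABLES 8

    typedef struct {
      uint64_t uid;   /* table identifier */
      int64_t  rows;  /* rows emitted so far */
      bool     skip;  /* true once rows >= merge limit */
    } STableRowCount;

    static STableRowCount counts[MAX_TABLES];
    static int            nCounts = 0;

    /* Accumulate a block's rows for one table; mark it skippable at the limit. */
    static void doSkipTable(uint64_t uid, int64_t blockRows, int64_t mergeLimit) {
      for (int i = 0; i < nCounts; i++) {
        if (counts[i].uid == uid) {
          counts[i].rows += blockRows;
          if (counts[i].rows >= mergeLimit) counts[i].skip = true;
          return;
        }
      }
      counts[nCounts++] = (STableRowCount){uid, blockRows, blockRows >= mergeLimit};
    }

    int main(void) {
      int64_t mergeLimit = 100; /* limit + offset, as the operator computes it */
      doSkipTable(1001, 60, mergeLimit);
      doSkipTable(1001, 60, mergeLimit); /* 120 rows total: skip from now on */
      printf("table 1001 skip=%d\n", (int)counts[0].skip);
      return 0;
    }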
@@ -283,7 +283,6 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
     tmsgSendRsp(pRsp);
   }

-  tDeleteStreamDispatchReq(pReq);
   streamSchedExec(pTask);

   return 0;
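Together with the two tq.c hunks earlier that add tDeleteStreamDispatchReq at the call sites, removing the free here moves ownership of the dispatch request out of streamProcessDispatchMsg: the callee now only borrows the request, and the caller releases it exactly once on every path. A minimal sketch of that ownership convention (hypothetical names, not the stream API):

    #include <stdio.h>
    #include <stdlib.h>

    typedef struct {
      void *payload;
    } SDispatchReq;

    /* Callee: processes the request but never frees it. */
    static int processDispatch(const SDispatchReq *pReq) {
      printf("processing payload at %p\n", pReq->payload);
      return 0;
    }

    /* Single release point, owned by the caller. */
    static void deleteDispatchReq(SDispatchReq *pReq) { free(pReq->payload); }

    int main(void) {
      SDispatchReq req = {.payload = malloc(16)};
      processDispatch(&req);
      deleteDispatchReq(&req); /* one owner, one free: no leak, no double free */
      return 0;
    }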
@@ -519,6 +519,7 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) {
   if (err != NULL) {
     stError("failed to open rocksdb, path:%s, reason:%s", backendPath, err);
     taosMemoryFreeClear(err);
+    rocksdb_list_column_families_destroy(cfs, nCf);
     goto _EXIT;
   }
 } else {
@@ -372,7 +372,8 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
     pTask->msgInfo.pData = pReqs;
   }

-  stDebug("s-task:%s build dispatch msg success, msgId:%d", pTask->id.idStr, pTask->execInfo.dispatch);
+  stDebug("s-task:%s build dispatch msg success, msgId:%d, stage:%" PRId64, pTask->id.idStr, pTask->execInfo.dispatch,
+          pTask->pMeta->stage);
   return code;
 }
@@ -937,8 +938,10 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
   initRpcMsg(&info.msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead));
   info.msg.info.noResp = 1;  // refactor later.

-  stDebug("s-task:%s (level:%d) prepare checkpoint ready msg to upstream s-task:0x%" PRIx64 ":0x%x (vgId:%d) idx:%d",
-          pTask->id.idStr, pTask->info.taskLevel, req.streamId, req.upstreamTaskId, req.upstreamNodeId, index);
+  stDebug("s-task:%s (level:%d) prepare checkpoint ready msg to upstream s-task:0x%" PRIx64
+          ":0x%x (vgId:%d) idx:%d, vgId:%d",
+          pTask->id.idStr, pTask->info.taskLevel, req.streamId, req.upstreamTaskId, req.upstreamNodeId, index,
+          req.upstreamNodeId);

   if (pTask->pReadyMsgList == NULL) {
     pTask->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo));
@@ -174,6 +174,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
   pMeta->ahandle = ahandle;
   pMeta->expandFunc = expandFunc;
   pMeta->stage = stage;
+  pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT;

   // send heartbeat every 5sec.
   pMeta->rid = taosAddRef(streamMetaId, pMeta);

@@ -204,7 +205,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
   }
   pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);

-  pMeta->role = NODE_ROLE_UNINIT;
   code = streamBackendLoadCheckpointInfo(pMeta);

   taosInitRWLatch(&pMeta->lock);
@@ -784,7 +784,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
     if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1;
     if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1;
     if (tEncodeI32(pEncoder, ps->status) < 0) return -1;
-    if (tEncodeI32(pEncoder, ps->stage) < 0) return -1;
+    if (tEncodeI64(pEncoder, ps->stage) < 0) return -1;
     if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1;
     if (tEncodeDouble(pEncoder, ps->inputQUsed) < 0) return -1;
     if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1;

@@ -822,7 +822,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
     if (tDecodeI64(pDecoder, &entry.id.streamId) < 0) return -1;
     if (tDecodeI32(pDecoder, &taskId) < 0) return -1;
     if (tDecodeI32(pDecoder, &entry.status) < 0) return -1;
-    if (tDecodeI32(pDecoder, &entry.stage) < 0) return -1;
+    if (tDecodeI64(pDecoder, &entry.stage) < 0) return -1;
     if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1;
     if (tDecodeDouble(pDecoder, &entry.inputQUsed) < 0) return -1;
     if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1;
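Widening stage on the wire only works because encoder and decoder change in lock-step: a peer that still decoded 4 bytes against the new 8-byte field would misalign every field after it. A toy fixed-width pair showing the required symmetry (illustrative, not the SEncoder/SDecoder API):

    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>

    /* Encode: status (4 bytes) then stage (8 bytes), in that order. */
    static size_t encodeEntry(uint8_t *buf, int32_t status, int64_t stage) {
      memcpy(buf, &status, sizeof(status));
      memcpy(buf + sizeof(status), &stage, sizeof(stage));
      return sizeof(status) + sizeof(stage);
    }

    /* Decode must consume exactly the same widths in the same order. */
    static void decodeEntry(const uint8_t *buf, int32_t *status, int64_t *stage) {
      memcpy(status, buf, sizeof(*status));
      memcpy(stage, buf + sizeof(*status), sizeof(*stage));
    }

    int main(void) {
      uint8_t buf[12];
      size_t  n = encodeEntry(buf, 1, 42);
      int32_t status;
      int64_t stage;
      decodeEntry(buf, &status, &stage);
      printf("wrote %zu bytes, status=%d stage=%lld\n", n, status, (long long)stage);
      return 0;
    }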
@@ -938,7 +938,7 @@ void metaHbToMnode(void* param, void* tmrId) {
   SStreamHbMsg hbMsg = {0};
   SEpSet       epset = {0};
   bool         hasMnodeEpset = false;
-  int32_t      stage = 0;
+  int64_t      stage = 0;

   streamMetaRLock(pMeta);
@@ -1117,11 +1117,6 @@ void streamMetaStartHb(SStreamMeta* pMeta) {
   metaHbToMnode(pRid, NULL);
 }

-void streamMetaInitForSnode(SStreamMeta* pMeta) {
-  pMeta->stage = 0;
-  pMeta->role = NODE_ROLE_LEADER;
-}
-
 void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
   taosHashClear(pStartInfo->pReadyTaskSet);
   taosHashClear(pStartInfo->pFailedTaskSet);
@@ -290,10 +290,11 @@ static void recheckDownstreamTasks(void* param, void* tmrId) {
   stDebug("s-task:%s complete send check in timer, ref:%d", pTask->id.idStr, ref);
 }

-int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage) {
+int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage, int64_t* oldStage) {
   SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
   ASSERT(pInfo != NULL);

+  *oldStage = pInfo->stage;
   const char* id = pTask->id.idStr;
   if (stage == -1) {
     stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), invalid stageId:%" PRId64 ", not ready", id,
@@ -459,9 +460,9 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
   if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
     if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
       stError(
-          "s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, "
+          "s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64 ", current stage:%" PRId64 ", "
           "not check wait for downstream task nodeUpdate, and all tasks restart",
-          id, pRsp->upstreamNodeId, pRsp->oldStage, (int32_t)pTask->pMeta->stage);
+          id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage);
     } else {
       stError(
           "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
@@ -476,7 +477,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
     STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);

     int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
-    stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms, ref:%d ", id,
+    stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%" PRId64 ", retry in 100ms, ref:%d ", id,
             pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref);
     pInfo->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer);
   }
@@ -926,7 +927,7 @@ int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp*
   if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1;
   if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1;
   if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1;
-  if (tEncodeI32(pEncoder, pRsp->oldStage) < 0) return -1;
+  if (tEncodeI64(pEncoder, pRsp->oldStage) < 0) return -1;
   if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1;
   tEndEncode(pEncoder);
   return pEncoder->pos;

@@ -941,7 +942,7 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp)
   if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1;
   if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
   if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1;
-  if (tDecodeI32(pDecoder, &pRsp->oldStage) < 0) return -1;
+  if (tDecodeI64(pDecoder, &pRsp->oldStage) < 0) return -1;
   if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1;
   tEndDecode(pDecoder);
   return 0;
@@ -112,7 +112,7 @@ int32_t tfsAllocDiskOnTier(STfsTier *pTier) {
   int32_t retId = -1;
   int64_t avail = 0;
   for (int32_t id = 0; id < TFS_MAX_DISKS_PER_TIER; ++id) {
-#if 0  // round-robin
+#if 1  // round-robin
     int32_t   diskId = (pTier->nextid + id) % pTier->ndisk;
     STfsDisk *pDisk = pTier->disks[diskId];
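Flipping #if 0 to #if 1 enables the round-robin branch: each allocation starts probing from a rotating offset, so writes spread across every disk of the tier instead of always filling the first disk with free space. A sketch of the rotation, under the assumption that the chosen disk advances nextid (illustrative):

    #include <stdio.h>

    #define NDISK 3

    static int nextid = 0;

    /* Pick the first disk with capacity, starting from a rotating offset. */
    static int allocDiskRoundRobin(const long avail[NDISK]) {
      for (int i = 0; i < NDISK; i++) {
        int diskId = (nextid + i) % NDISK;
        if (avail[diskId] > 0) {
          nextid = (diskId + 1) % NDISK; /* next allocation starts after this disk */
          return diskId;
        }
      }
      return -1; /* no disk has free space */
    }

    int main(void) {
      long avail[NDISK] = {100, 100, 100};
      for (int k = 0; k < 4; k++) printf("alloc -> disk %d\n", allocDiskRoundRobin(avail));
      return 0;
    }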
@@ -110,6 +110,7 @@ int32_t metaDebugFlag = 131;
 int32_t udfDebugFlag = 131;
 int32_t smaDebugFlag = 131;
 int32_t idxDebugFlag = 131;
+int32_t sndDebugFlag = 131;

 int64_t dbgEmptyW = 0;
 int64_t dbgWN = 0;
@@ -20,6 +20,9 @@
 ,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/window_close_session_ext.py
 ,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/partition_interval.py
 ,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/pause_resume_test.py
+#,,n,system-test,python3 ./test.py -f 8-stream/vnode_restart.py -N 4
+#,,n,system-test,python3 ./test.py -f 8-stream/snode_restart.py -N 4
+,,n,system-test,python3 ./test.py -f 8-stream/snode_restart_with_checkpoint.py -N 4

 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname_vgroup.py
 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py
@@ -55,7 +55,7 @@ class TDTestCase:
         tdSql.checkData(0, 2, 0)

         tdSql.query("show dnode 1 variables like '%debugFlag'")
-        tdSql.checkRows(22)
+        tdSql.checkRows(23)

         tdSql.query("show dnode 1 variables like '____debugFlag'")
         tdSql.checkRows(2)
@@ -105,7 +105,6 @@ class TDTestCase:

         topicNameList = ['topic1']
         # expectRowsList = []
-        tmqCom.initConsumerTable("cdb", self.replicaVar)

         tdLog.info("create topics from stb with filter")
         queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
@@ -133,14 +132,15 @@ class TDTestCase:
         tmqCom.getStartConsumeNotifyFromTmqsim()
         tmqCom.getStartCommitNotifyFromTmqsim()

-        tdSql.query("select * from information_schema.ins_vnodes")
-        # tdLog.debug(tdSql.queryResult)
-        tdDnodes = cluster.dnodes
-        for result in tdSql.queryResult:
-            if result[2] == 'dbt' and result[3] == 'leader':
-                tdLog.debug("leader is %d"%(result[0] - 1))
-                tdDnodes[result[0] - 1].stoptaosd()
-                break
+        tdSql.query("balance vgroup leader")
+        # tdSql.query("select * from information_schema.ins_vnodes")
+        # # tdLog.debug(tdSql.queryResult)
+        # tdDnodes = cluster.dnodes
+        # for result in tdSql.queryResult:
+        #     if result[2] == 'dbt' and result[3] == 'leader':
+        #         tdLog.debug("leader is %d"%(result[0] - 1))
+        #         tdDnodes[result[0] - 1].stoptaosd()
+        #         break

         pInsertThread.join()
         expectRows = 1
@@ -159,7 +159,6 @@ class TDTestCase:
         tdLog.printNoPrefix("======== test case 1 end ...... ")

     def run(self):
-        tdSql.prepare()
         self.prepareTestEnv()
         self.tmqCase1()
@@ -0,0 +1,78 @@
+import taos
+import sys
+import time
+import socket
+import os
+import threading
+import math
+
+from util.log import *
+from util.sql import *
+from util.cases import *
+from util.dnodes import *
+from util.common import *
+from util.cluster import *
+
+class TDTestCase:
+    updatecfgDict = {'checkpointInterval': 1100}
+    print("===================: ", updatecfgDict)
+
+    def init(self, conn, logSql, replicaVar=1):
+        tdLog.debug(f"start to execute {__file__}")
+        tdSql.init(conn.cursor(), False)
+
+    def case1(self):
+        tdLog.debug("========case1 start========")
+
+        os.system("nohup taosBenchmark -y -B 1 -t 4 -S 1000 -n 1000 -i 1000 -v 2 > /dev/null 2>&1 &")
+        time.sleep(4)
+        tdSql.query("use test")
+        tdSql.query("create snode on dnode 4")
+        tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)")
+        tdLog.debug("========create stream using snode and insert data ok========")
+        time.sleep(4)
+
+        tdDnodes = cluster.dnodes
+        tdDnodes[3].stoptaosd()
+        time.sleep(2)
+        tdDnodes[3].starttaosd()
+        tdLog.debug("========snode restart ok========")
+
+        time.sleep(30)
+        os.system("kill -9 `pgrep taosBenchmark`")
+        tdLog.debug("========stop insert ok========")
+        time.sleep(2)
+
+        tdSql.query("select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s) order by groupid,_wstart")
+        rowCnt = tdSql.getRows()
+        results = []
+        for i in range(rowCnt):
+            results.append(tdSql.getData(i,1))
+
+        tdSql.query("select * from st1 order by groupid,_wstart")
+        tdSql.checkRows(rowCnt)
+        for i in range(rowCnt):
+            data1 = tdSql.getData(i,1)
+            data2 = results[i]
+            if data1 != data2:
+                tdLog.info("num: %d, act data: %d, expect data: %d"%(i, data1, data2))
+                tdLog.exit("check data error!")
+
+        # tdLog.debug("========sleep 500s========")
+        # time.sleep(500)
+
+        tdLog.debug("case1 end")
+
+    def run(self):
+        self.case1()
+
+    def stop(self):
+        tdSql.close()
+        tdLog.success(f"{__file__} successfully executed")
+
+event = threading.Event()
+
+tdCases.addLinux(__file__, TDTestCase())
+tdCases.addWindows(__file__, TDTestCase())
@@ -0,0 +1,78 @@
+import taos
+import sys
+import time
+import socket
+import os
+import threading
+import math
+
+from util.log import *
+from util.sql import *
+from util.cases import *
+from util.dnodes import *
+from util.common import *
+from util.cluster import *
+
+class TDTestCase:
+    # updatecfgDict = {'checkpointInterval': 5}
+    # print("===================: ", updatecfgDict)
+
+    def init(self, conn, logSql, replicaVar=1):
+        tdLog.debug(f"start to execute {__file__}")
+        tdSql.init(conn.cursor(), False)
+
+    def case1(self):
+        tdLog.debug("========case1 start========")
+
+        os.system("nohup taosBenchmark -y -B 1 -t 4 -S 1000 -n 1000 -i 1000 -v 2 > /dev/null 2>&1 &")
+        time.sleep(4)
+        tdSql.query("use test")
+        tdSql.query("create snode on dnode 4")
+        tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)")
+        tdLog.debug("========create stream using snode and insert data ok========")
+        time.sleep(60)
+
+        tdDnodes = cluster.dnodes
+        tdDnodes[3].stoptaosd()
+        time.sleep(2)
+        tdDnodes[3].starttaosd()
+        tdLog.debug("========snode restart ok========")
+
+        time.sleep(30)
+        os.system("kill -9 `pgrep taosBenchmark`")
+        tdLog.debug("========stop insert ok========")
+        time.sleep(2)
+
+        tdSql.query("select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s) order by groupid,_wstart")
+        rowCnt = tdSql.getRows()
+        results = []
+        for i in range(rowCnt):
+            results.append(tdSql.getData(i,1))
+
+        tdSql.query("select * from st1 order by groupid,_wstart")
+        tdSql.checkRows(rowCnt)
+        for i in range(rowCnt):
+            data1 = tdSql.getData(i,1)
+            data2 = results[i]
+            if data1 != data2:
+                tdLog.info("num: %d, act data: %d, expect data: %d"%(i, data1, data2))
+                tdLog.exit("check data error!")
+
+        # tdLog.debug("========sleep 500s========")
+        # time.sleep(500)
+
+        tdLog.debug("case1 end")
+
+    def run(self):
+        self.case1()
+
+    def stop(self):
+        tdSql.close()
+        tdLog.success(f"{__file__} successfully executed")
+
+event = threading.Event()
+
+tdCases.addLinux(__file__, TDTestCase())
+tdCases.addWindows(__file__, TDTestCase())
@@ -0,0 +1,77 @@
+import taos
+import sys
+import time
+import socket
+import os
+import threading
+import math
+
+from util.log import *
+from util.sql import *
+from util.cases import *
+from util.dnodes import *
+from util.common import *
+from util.cluster import *
+
+class TDTestCase:
+    updatecfgDict = {'checkpointInterval': 1100}
+    print("===================: ", updatecfgDict)
+
+    def init(self, conn, logSql, replicaVar=1):
+        tdLog.debug(f"start to execute {__file__}")
+        tdSql.init(conn.cursor(), False)
+
+    def case1(self):
+        tdLog.debug("========case1 start========")
+
+        os.system("nohup taosBenchmark -y -B 1 -t 4 -S 1000 -n 1000 -i 1000 -v 2 > /dev/null 2>&1 &")
+        time.sleep(4)
+        tdSql.query("use test")
+        tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)")
+        tdLog.debug("========create stream using snode and insert data ok========")
+        time.sleep(4)
+
+        tdDnodes = cluster.dnodes
+        tdDnodes[2].stoptaosd()
+        time.sleep(2)
+        tdDnodes[2].starttaosd()
+        tdLog.debug("========vnode restart ok========")
+
+        time.sleep(30)
+        os.system("kill -9 `pgrep taosBenchmark`")
+        tdLog.debug("========stop insert ok========")
+        time.sleep(2)
+
+        tdSql.query("select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s) order by groupid,_wstart")
+        rowCnt = tdSql.getRows()
+        results = []
+        for i in range(rowCnt):
+            results.append(tdSql.getData(i,1))
+
+        tdSql.query("select * from st1 order by groupid,_wstart")
+        tdSql.checkRows(rowCnt)
+        for i in range(rowCnt):
+            data1 = tdSql.getData(i,1)
+            data2 = results[i]
+            if data1 != data2:
+                tdLog.info("num: %d, act data: %d, expect data: %d"%(i, data1, data2))
+                tdLog.exit("check data error!")
+
+        # tdLog.debug("========sleep 500s========")
+        # time.sleep(500)
+
+        tdLog.debug("case1 end")
+
+    def run(self):
+        self.case1()
+
+    def stop(self):
+        tdSql.close()
+        tdLog.success(f"{__file__} successfully executed")
+
+event = threading.Event()
+
+tdCases.addLinux(__file__, TDTestCase())
+tdCases.addWindows(__file__, TDTestCase())
@ -582,7 +582,7 @@ if __name__ == "__main__":
|
||||||
tdDnodes.setAsan(asan)
|
tdDnodes.setAsan(asan)
|
||||||
tdDnodes.stopAll()
|
tdDnodes.stopAll()
|
||||||
for dnode in tdDnodes.dnodes:
|
for dnode in tdDnodes.dnodes:
|
||||||
tdDnodes.deploy(dnode.index,{})
|
tdDnodes.deploy(dnode.index,updateCfgDict)
|
||||||
for dnode in tdDnodes.dnodes:
|
for dnode in tdDnodes.dnodes:
|
||||||
tdDnodes.starttaosd(dnode.index)
|
tdDnodes.starttaosd(dnode.index)
|
||||||
tdCases.logSql(logSql)
|
tdCases.logSql(logSql)
|
||||||
|