Merge branch '3.0' into enh/TD-30554-3.0

This commit is contained in:
kailixu 2024-06-12 15:40:01 +08:00
commit e7754a0b3e
33 changed files with 765 additions and 323 deletions

View File

@ -3495,8 +3495,6 @@ typedef struct SVUpdateCheckpointInfoReq {
int64_t checkpointTs;
int32_t transId;
int8_t dropRelHTask;
int64_t hStreamId;
int64_t hTaskId;
} SVUpdateCheckpointInfoReq;
typedef struct {
@ -3649,10 +3647,6 @@ typedef struct {
int32_t taskId;
} SVPauseStreamTaskReq, SVResetStreamTaskReq;
typedef struct {
int8_t reserved;
} SVPauseStreamTaskRsp;
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];
int8_t igNotExists;

View File

@ -225,9 +225,9 @@
TD_DEF_MSG_TYPE(TDMT_MND_RESTORE_DNODE, "restore-dnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_PAUSE_STREAM, "pause-stream", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_RESUME_STREAM, "resume-stream", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_UPDATE_CHKPT_EVT, "stream-update-chkpt-evt", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, "stream-checkpoint-remain", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_REPORT, "stream-chkpt-report", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_NODECHANGE_CHECK, "stream-nodechange-check", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TRIM_DB_TIMER, "trim-db-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GRANT_NOTIFY, "grant-notify", NULL, NULL)
@ -390,6 +390,7 @@
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_RESET, "vnode-stream-reset", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_CHECK, "vnode-stream-task-check", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_UNUSED, "vnd-stream-unused", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_GET_STREAM_PROGRESS, "vnd-stream-progress", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_VND_STREAM_MSG)

View File

@ -28,6 +28,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
bool isLeader, bool restored);

View File

@ -190,6 +190,20 @@ typedef struct SCheckpointTriggerRsp {
int32_t rspCode;
} SCheckpointTriggerRsp;
typedef struct SCheckpointReport {
int64_t streamId;
int32_t taskId;
int32_t nodeId;
int64_t checkpointId;
int64_t checkpointVer;
int64_t checkpointTs;
int32_t transId;
int8_t dropHTask;
} SCheckpointReport;
int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq);
int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq);
typedef struct {
SMsgHead head;
int64_t streamId;

View File

@ -734,6 +734,9 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs);
void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId,
int64_t startTs);
void streamMetaClearUpdateTaskList(SStreamMeta* pMeta);
void streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId);
void streamMetaRLock(SStreamMeta* pMeta);
void streamMetaRUnLock(SStreamMeta* pMeta);
void streamMetaWLock(SStreamMeta* pMeta);
@ -762,8 +765,7 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask);
int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
int32_t setCode);
int32_t streamBuildAndSendCheckpointUpdateMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, STaskId* pHTaskId,
SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask);
int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask);
int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpointInfoReq* pReq);
SActiveCheckpointInfo* streamTaskCreateActiveChkptInfo();

View File

@ -232,6 +232,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_CREATE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_DROP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_CREATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
@ -240,6 +241,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -75,26 +75,27 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
code = 0;
_OVER:

View File

@ -967,6 +967,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -26,13 +26,14 @@ extern "C" {
#define MND_STREAM_RESERVE_SIZE 64
#define MND_STREAM_VER_NUMBER 5
#define MND_STREAM_CREATE_NAME "stream-create"
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
#define MND_STREAM_PAUSE_NAME "stream-pause"
#define MND_STREAM_RESUME_NAME "stream-resume"
#define MND_STREAM_DROP_NAME "stream-drop"
#define MND_STREAM_TASK_RESET_NAME "stream-task-reset"
#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update"
#define MND_STREAM_CREATE_NAME "stream-create"
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
#define MND_STREAM_PAUSE_NAME "stream-pause"
#define MND_STREAM_RESUME_NAME "stream-resume"
#define MND_STREAM_DROP_NAME "stream-drop"
#define MND_STREAM_TASK_RESET_NAME "stream-task-reset"
#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update"
#define MND_STREAM_CHKPT_UPDATE_NAME "stream-chkpt-update"
typedef struct SStreamTransInfo {
int64_t startTime;
@ -51,6 +52,7 @@ typedef struct SStreamTransMgmt {
} SStreamTransMgmt;
typedef struct SStreamExecInfo {
bool initTaskList;
SArray *pNodeList;
int64_t ts; // snapshot ts
SStreamTransMgmt transMgmt;
@ -58,6 +60,7 @@ typedef struct SStreamExecInfo {
SArray *pTaskList;
TdThreadMutex lock;
SHashObj *pTransferStateStreams;
SHashObj *pChkptStreams;
} SStreamExecInfo;
extern SStreamExecInfo execInfo;
@ -78,7 +81,18 @@ typedef struct SOrphanTask {
typedef struct {
SMsgHead head;
} SMStreamHbRspMsg, SMStreamReqCheckpointRspMsg;
} SMStreamHbRspMsg, SMStreamReqCheckpointRsp, SMStreamUpdateChkptRsp;
typedef struct STaskChkptInfo {
int32_t nodeId;
int32_t taskId;
int64_t streamId;
int64_t checkpointId;
int64_t version;
int64_t ts;
int32_t transId;
int8_t dropHTask;
}STaskChkptInfo;
int32_t mndInitStream(SMnode *pMnode);
void mndCleanupStream(SMnode *pMnode);
@ -96,7 +110,7 @@ int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream);
SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName);
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
int32_t retryCode);
int32_t retryCode, int32_t acceptCode);
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name, const char *pMsg);
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
@ -114,14 +128,19 @@ int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *p
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList);
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream);
int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList);
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq);
void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo);
SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream);
void destroyStreamTaskIter(SStreamTaskIter *pIter);
bool streamTaskIterNextTask(SStreamTaskIter *pIter);
SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter);
void mndInitExecInfo();
void removeExpiredNodeInfo(const SArray *pNodeSnapshot);
void removeTasksInBuf(SArray* pTaskIds, SStreamExecInfo* pExecInfo);
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo);
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot);
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
#ifdef __cplusplus
}

View File

@ -56,13 +56,14 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int
int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger);
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static SArray *extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList);
static int32_t extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList);
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot);
static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo);
static void removeExpiredNodeInfo(const SArray *pNodeSnapshot);
static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
@ -103,6 +104,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_STOP_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mndTransProcessRsp);
// for msgs inside mnode
// TODO change the name
@ -114,8 +116,10 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq);
mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq);
@ -131,9 +135,11 @@ int32_t mndInitStream(SMnode *pMnode) {
if (sdbSetTable(pMnode->pSdb, table) != 0) {
return -1;
}
if (sdbSetTable(pMnode->pSdb, tableSeq) != 0) {
return -1;
}
return 0;
}
@ -143,6 +149,7 @@ void mndCleanupStream(SMnode *pMnode) {
taosHashCleanup(execInfo.pTaskMap);
taosHashCleanup(execInfo.transMgmt.pDBTrans);
taosHashCleanup(execInfo.pTransferStateStreams);
taosHashCleanup(execInfo.pChkptStreams);
taosThreadMutexDestroy(&execInfo.lock);
mDebug("mnd stream exec info cleanup");
}
@ -508,7 +515,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
tEncodeStreamTask(&encoder, pTask);
tEncoderClear(&encoder);
int32_t code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0);
int32_t code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0, 0);
if (code != 0) {
taosMemoryFree(buf);
return -1;
@ -952,7 +959,7 @@ static int32_t doSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask
return -1;
}
code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY);
code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY, 0);
if (code != 0) {
taosMemoryFree(buf);
}
@ -1088,7 +1095,7 @@ static bool taskNodeIsUpdated(SMnode *pMnode) {
}
static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
bool ready = true;
bool ready = true;
if (taskNodeIsUpdated(pMnode)) {
return -1;
}
@ -1099,6 +1106,8 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
ASSERT(taosArrayGetSize(execInfo.pTaskList) == 0);
}
SArray* pInvalidList = taosArrayInit(4, sizeof(STaskId));
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
@ -1106,11 +1115,20 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
continue;
}
if (pEntry->status == TASK_STATUS__STOP) {
for(int32_t j = 0; j < taosArrayGetSize(pInvalidList); ++j) {
STaskId* pId = taosArrayGet(pInvalidList, j);
if (pEntry->id.streamId == pId->streamId) {
taosArrayPush(pInvalidList, &pEntry->id);
break;
}
}
}
if (pEntry->status != TASK_STATUS__READY) {
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, checkpoint msg not issued",
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued",
pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
ready = false;
break;
}
if (pEntry->hTaskId != 0) {
@ -1123,6 +1141,9 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
}
}
removeTasksInBuf(pInvalidList, &execInfo);
taosArrayDestroy(pInvalidList);
taosThreadMutexUnlock(&execInfo.lock);
return ready ? 0 : -1;
}
@ -1151,7 +1172,8 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
int32_t numOfCheckpointTrans = 0;
if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) {
return code;
terrno = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
return -1;
}
SArray* pList = taosArrayInit(4, sizeof(SCheckpointInterval));
@ -1798,6 +1820,10 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
int32_t numOfRows = 0;
SStreamObj *pStream = NULL;
taosThreadMutexLock(&execInfo.lock);
mndInitStreamExecInfo(pMnode, &execInfo);
taosThreadMutexUnlock(&execInfo.lock);
while (numOfRows < rowsCapacity) {
pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
if (pShow->pIter == NULL) {
@ -2169,7 +2195,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
return 0;
}
static SArray *extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList) {
static int32_t extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList) {
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
@ -2215,48 +2241,6 @@ static SArray *extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList) {
return TSDB_CODE_SUCCESS;
}
static bool taskNodeExists(SArray *pList, int32_t nodeId) {
size_t num = taosArrayGetSize(pList);
for (int32_t i = 0; i < num; ++i) {
SNodeEntry *pEntry = taosArrayGet(pList, i);
if (pEntry->nodeId == nodeId) {
return true;
}
}
return false;
}
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) {
SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId));
int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList);
for (int32_t i = 0; i < numOfTask; ++i) {
STaskId *pId = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
if (pEntry->nodeId == SNODE_HANDLE) {
continue;
}
bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId);
if (!existed) {
taosArrayPush(pRemovedTasks, pId);
}
}
removeTasksInBuf(pRemovedTasks, &execInfo);
mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks),
(int32_t)taosArrayGetSize(execInfo.pTaskList));
removeExpiredNodeInfo(pNodeSnapshot);
taosArrayDestroy(pRemovedTasks);
return 0;
}
// this function runs by only one thread, so it is not multi-thread safe
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
int32_t code = 0;
@ -2476,13 +2460,137 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
taosThreadMutexUnlock(&execInfo.lock);
{
SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRspMsg)};
SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRsp)};
rsp.pCont = rpcMallocCont(rsp.contLen);
SMsgHead *pHead = rsp.pCont;
pHead->vgId = htonl(req.nodeId);
tmsgSendRsp(&rsp);
pReq->info.handle = NULL; // disable auto rsp
}
return 0;
}
static void doAddTaskInfo(SArray* pList, SCheckpointReport* pReport) {
bool existed = false;
for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
STaskChkptInfo* p = taosArrayGet(pList ,i);
if (p->taskId == pReport->taskId) {
existed = true;
break;
}
}
if (!existed) {
STaskChkptInfo info = {
.streamId = pReport->streamId,
.taskId = pReport->taskId,
.transId = pReport->transId,
.dropHTask = pReport->dropHTask,
.version = pReport->checkpointVer,
.ts = pReport->checkpointTs,
.checkpointId = pReport->checkpointId,
.nodeId = pReport->nodeId,
};
taosArrayPush(pList, &info);
}
}
int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SCheckpointReport req = {0};
SDecoder decoder = {0};
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
if (tDecodeStreamTaskChkptReport(&decoder, &req)) {
tDecoderClear(&decoder);
terrno = TSDB_CODE_INVALID_MSG;
mError("invalid task checkpoint-report msg received");
return -1;
}
tDecoderClear(&decoder);
mDebug("receive stream task checkpoint-report msg, vgId:%d, s-task:0x%x, checkpointId:%" PRId64
" checkpointVer:%" PRId64 " transId:%d",
req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId);
// register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
taosThreadMutexLock(&execInfo.lock);
SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
if (pStream == NULL) {
mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf",
req.streamId);
// not in meta-store yet, try to acquire the task in exec buffer
// the checkpoint req arrives too soon before the completion of the create stream trans.
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
if (p == NULL) {
mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId);
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
taosThreadMutexUnlock(&execInfo.lock);
return -1;
} else {
mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
req.streamId, req.taskId);
}
}
int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);
SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
if (pReqTaskList == NULL) {
SArray *pList = taosArrayInit(4, sizeof(STaskChkptInfo));
doAddTaskInfo(pList, &req);
taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &pList, POINTER_BYTES);
pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
} else {
doAddTaskInfo(*pReqTaskList, &req);
}
int32_t total = taosArrayGetSize(*pReqTaskList);
if (total == numOfTasks) { // all tasks has send the reqs
mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64
" will be issued soon",
req.streamId, pStream->name, total, req.checkpointId);
// if (pStream != NULL) {
// bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false);
// if (conflict) {
// mDebug("stream:0x%"PRIx64" active checkpoint trans not finished yet, wait", req.streamId);
// } else {
// int32_t code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, *pReqTaskList);
// if (code == TSDB_CODE_SUCCESS) { // remove this entry
// taosHashRemove(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
//
// int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams);
// mDebug("stream:0x%" PRIx64 " removed, remain streams:%d in checkpoint procedure", req.streamId,
// numOfStreams);
// } else {
// mDebug("stream:0x%" PRIx64 " not launch chkpt update trans, due to checkpoint not finished yet",
// req.streamId);
// }
// }
// }
}
if (pStream != NULL) {
mndReleaseStream(pMnode, pStream);
}
taosThreadMutexUnlock(&execInfo.lock);
{
SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamUpdateChkptRsp)};
rsp.pCont = rpcMallocCont(rsp.contLen);
SMsgHead *pHead = rsp.pCont;
pHead->vgId = htonl(req.nodeId);
tmsgSendRsp(&rsp);
pReq->info.handle = NULL; // disable auto rsp
}
@ -2510,3 +2618,63 @@ static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) {
}
return code;
}
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
if (pExecInfo->initTaskList || pMnode == NULL) {
return;
}
addAllStreamTasksIntoBuf(pMnode, pExecInfo);
extractNodeListFromStream(pMnode, pExecInfo->pNodeList);
pExecInfo->initTaskList = true;
}
void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo* pExecInfo) {
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
break;
}
saveTaskAndNodeInfoIntoBuf(pStream, pExecInfo);
sdbRelease(pSdb, pStream);
}
}
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray* pChkptInfoList) {
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_UPDATE_NAME, "update checkpoint-info");
if (pTrans == NULL) {
return terrno;
}
/*int32_t code = */mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_UPDATE_NAME, pStream->uid);
int32_t code = mndStreamSetUpdateChkptAction(pMnode, pTrans, pStream);
if (code != 0) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return code;
}
code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return TSDB_CODE_ACTION_IN_PROGRESS;
}

View File

@ -22,54 +22,7 @@ typedef struct SFailedCheckpointInfo {
int32_t transId;
} SFailedCheckpointInfo;
static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo* pExecInfo) {
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
break;
}
saveTaskAndNodeInfoIntoBuf(pStream, pExecInfo);
sdbRelease(pSdb, pStream);
}
}
static void removeDroppedStreamTasksInBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
if (pMnode == NULL) {
return;
}
int32_t num = taosArrayGetSize(pExecInfo->pTaskList);
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
SArray *pIdList = taosArrayInit(4, sizeof(STaskId));
for (int32_t i = 0; i < num; ++i) {
STaskId* pId = taosArrayGet(pExecInfo->pTaskList, i);
void* p = taosHashGet(pHash, &pId->streamId, sizeof(int64_t));
if (p != NULL) {
continue;
}
void* pObj = mndGetStreamObj(pMnode, pId->streamId);
if (pObj != NULL) {
mndReleaseStream(pMnode, pObj);
taosHashPut(pHash, &pId->streamId, sizeof(int64_t), NULL, 0);
} else {
taosArrayPush(pIdList, pId);
}
}
removeTasksInBuf(pIdList, &execInfo);
taosArrayDestroy(pIdList);
taosHashCleanup(pHash);
}
static void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode);
static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
@ -290,16 +243,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
taosThreadMutexLock(&execInfo.lock);
// extract stream task list
if (taosHashGetSize(execInfo.pTaskMap) == 0) {
addAllStreamTasksIntoBuf(pMnode, &execInfo);
} else {
// the already dropped tasks may be added by hb from vnode at the time when the pTaskMap happens to be empty.
// let's drop them here.
removeDroppedStreamTasksInBuf(pMnode, &execInfo);
}
extractStreamNodeList(pMnode);
mndInitStreamExecInfo(pMnode, &execInfo);
int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
if (numOfUpdated > 0) {
@ -326,18 +270,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
snodeChanged = true;
}
} else {
// task is idle for more than 50 sec.
// if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) {
// if (!pTaskEntry->inputQChanging) {
// pTaskEntry->inputQUnchangeCounter++;
// } else {
// pTaskEntry->inputQChanging = false;
// }
// } else {
// pTaskEntry->inputQChanging = true;
// pTaskEntry->inputQUnchangeCounter = 0;
// }
streamTaskStatusCopy(pTaskEntry, p);
STaskCkptInfo *pChkInfo = &p->checkpointInfo;
@ -348,6 +280,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SFailedCheckpointInfo info = {
.transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId};
addIntoCheckpointList(pFailedChkpt, &info);
// remove failed trans from pChkptStreams
taosHashRemove(execInfo.pChkptStreams, &p->id.streamId, sizeof(p->id.streamId));
}
}
@ -393,6 +328,10 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
mndDropOrphanTasks(pMnode, pOrphanTasks);
}
if (pMnode != NULL) { // make sure that the unit test case can work
mndStreamStartUpdateCheckpointInfo(pMnode);
}
taosThreadMutexUnlock(&execInfo.lock);
tCleanupStreamHbMsg(&req);
@ -411,3 +350,12 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
return TSDB_CODE_SUCCESS;
}
void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode) { // here reuse the doCheckpointmsg
SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg));
if (pMsg != NULL) {
int32_t size = sizeof(SMStreamDoCheckpointMsg);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_UPDATE_CHKPT_EVT, .pCont = pMsg, .contLen = size};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
}

View File

@ -127,7 +127,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamId, const char* p
return false;
}
int32_t mndStreamGetRelTrans(SMnode* pMnode, int64_t streamUid) {
int32_t mndStreamGetRelTrans(SMnode* pMnode, int64_t streamId) {
taosThreadMutexLock(&execInfo.lock);
int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans);
if (num <= 0) {
@ -136,12 +136,13 @@ int32_t mndStreamGetRelTrans(SMnode* pMnode, int64_t streamUid) {
}
mndStreamClearFinishedTrans(pMnode, NULL);
SStreamTransInfo* pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamUid, sizeof(streamUid));
SStreamTransInfo* pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
if (pEntry != NULL) {
SStreamTransInfo tInfo = *pEntry;
taosThreadMutexUnlock(&execInfo.lock);
if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0 || strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) {
if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0 || strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0 ||
strcmp(tInfo.name, MND_STREAM_CHKPT_UPDATE_NAME) == 0) {
return tInfo.transId;
}
} else {
@ -159,7 +160,7 @@ STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnCo
return NULL;
}
mInfo("s-task:0x%" PRIx64 " start to build trans %s, transId:%d", pStream->uid, pMsg, pTrans->id);
mInfo("stream:0x%" PRIx64 " start to build trans %s, transId:%d", pStream->uid, pMsg, pTrans->id);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
@ -246,8 +247,9 @@ int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status)
}
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
int32_t retryCode) {
STransAction action = {.epSet = *pEpset, .contLen = contLen, .pCont = pCont, .msgType = msgType, .retryCode = retryCode};
int32_t retryCode, int32_t acceptCode) {
STransAction action = {.epSet = *pEpset, .contLen = contLen, .pCont = pCont, .msgType = msgType, .retryCode = retryCode,
.acceptableCode = acceptCode};
return mndTransAppendRedoAction(pTrans, &action);
}

View File

@ -230,7 +230,7 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT
return -1;
}
code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0);
code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, 0);
if (code != 0) {
taosMemoryFree(pReq);
return -1;
@ -308,7 +308,7 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
epsetToStr(&epset, buf, tListLen(buf));
mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf);
code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0);
code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0, 0);
if (code != 0) {
taosMemoryFree(pReq);
return -1;
@ -356,7 +356,7 @@ static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTas
}
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0);
code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, 0);
if (code != 0) {
taosMemoryFree(pReq);
return -1;
@ -400,7 +400,7 @@ static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask
}
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0);
code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, 0);
if (code != 0) {
taosMemoryFree(pReq);
return -1;
@ -484,7 +484,7 @@ static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask
return code;
}
code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &epset, TSDB_CODE_VND_INVALID_VGROUP_ID);
code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &epset, TSDB_CODE_VND_INVALID_VGROUP_ID, 0);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pBuf);
}
@ -534,7 +534,7 @@ static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
return code;
}
code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0);
code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0, 0);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pReq);
}
@ -574,9 +574,11 @@ void mndInitExecInfo() {
execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
execInfo.pChkptStreams = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry));
taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
taosHashSetFreeFp(execInfo.pChkptStreams, freeTaskList);
}
void removeExpiredNodeInfo(const SArray *pNodeSnapshot) {
@ -646,3 +648,171 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
destroyStreamTaskIter(pIter);
}
static bool taskNodeExists(SArray *pList, int32_t nodeId) {
size_t num = taosArrayGetSize(pList);
for (int32_t i = 0; i < num; ++i) {
SNodeEntry *pEntry = taosArrayGet(pList, i);
if (pEntry->nodeId == nodeId) {
return true;
}
}
return false;
}
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) {
SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId));
int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList);
for (int32_t i = 0; i < numOfTask; ++i) {
STaskId *pId = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
if (pEntry->nodeId == SNODE_HANDLE) {
continue;
}
bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId);
if (!existed) {
taosArrayPush(pRemovedTasks, pId);
}
}
removeTasksInBuf(pRemovedTasks, &execInfo);
mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks),
(int32_t)taosArrayGetSize(execInfo.pTaskList));
removeExpiredNodeInfo(pNodeSnapshot);
taosArrayDestroy(pRemovedTasks);
return 0;
}
static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
SVUpdateCheckpointInfoReq *pReq = taosMemoryCalloc(1, sizeof(SVUpdateCheckpointInfoReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVUpdateCheckpointInfoReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return terrno;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &pTask->id.streamId, sizeof(pTask->id.streamId));
ASSERT(pReqTaskList);
int32_t size = taosArrayGetSize(*pReqTaskList);
for(int32_t i = 0; i < size; ++i) {
STaskChkptInfo* pInfo = taosArrayGet(*pReqTaskList, i);
if (pInfo->taskId == pTask->id.taskId) {
pReq->checkpointId = pInfo->checkpointId;
pReq->checkpointVer = pInfo->version;
pReq->checkpointTs = pInfo->ts;
pReq->dropRelHTask = pInfo->dropHTask;
pReq->transId = pInfo->transId;
}
}
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
taosMemoryFree(pReq);
return code;
}
code = setTransAction(pTrans, pReq, sizeof(SVUpdateCheckpointInfoReq), TDMT_STREAM_TASK_UPDATE_CHKPT, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pReq);
}
return code;
}
int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
taosWLockLatch(&pStream->lock);
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
int32_t code = doSetUpdateChkptAction(pMnode, pTrans, pTask);
if (code != TSDB_CODE_SUCCESS) {
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return -1;
}
}
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return 0;
}
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
void *pIter = NULL;
SArray *pDropped = taosArrayInit(4, sizeof(int64_t));
mDebug("start to scan checkpoint report info");
while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) {
SArray *pList = *(SArray **)pIter;
STaskChkptInfo* pInfo = taosArrayGet(pList, 0);
SStreamObj* pStream = mndGetStreamObj(pMnode, pInfo->streamId);
if (pStream == NULL) {
mDebug("failed to acquire stream:0x%" PRIx64 " remove it from checkpoint-report list", pInfo->streamId);
taosArrayPush(pDropped, &pInfo->streamId);
continue;
}
int32_t total = mndGetNumOfStreamTasks(pStream);
int32_t existed = (int32_t) taosArrayGetSize(pList);
if (total == existed) {
mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info",
pStream->uid, pStream->name, total);
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false);
if (!conflict) {
int32_t code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, pList);
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry
taosArrayPush(pDropped, &pInfo->streamId);
mDebug("stream:0x%" PRIx64 " removed", pInfo->streamId);
} else {
mDebug("stream:0x%" PRIx64 " not launch chkpt-meta update trans, due to checkpoint not finished yet",
pInfo->streamId);
}
break;
} else {
mDebug("stream:0x%"PRIx64" active checkpoint trans not finished yet, wait", pInfo->streamId);
}
} else {
mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-report, %d not send", pInfo->streamId, pStream->name,
existed, total, total - existed);
}
sdbRelease(pMnode->pSdb, pStream);
}
int32_t size = taosArrayGetSize(pDropped);
if (size > 0) {
for (int32_t i = 0; i < size; ++i) {
int64_t streamId = *(int64_t *)taosArrayGet(pDropped, i);
taosHashRemove(execInfo.pChkptStreams, &streamId, sizeof(streamId));
}
int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams);
mDebug("drop %d stream(s) in checkpoint-report list, remain:%d", size, numOfStreams);
}
taosArrayDestroy(pDropped);
return TSDB_CODE_SUCCESS;
}

View File

@ -169,7 +169,8 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) {
SRetrieveFuncReq retrieveReq = {0};
retrieveReq.numOfFuncs = 1;
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
taosArrayPush(retrieveReq.pFuncNames, "f1");
char name[TSDB_FUNC_NAME_LEN] = "f1";
taosArrayPush(retrieveReq.pFuncNames, name);
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
void* pReq = rpcMallocCont(contLen);
@ -220,7 +221,8 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) {
retrieveReq.numOfFuncs = TSDB_FUNC_MAX_RETRIEVE + 1;
retrieveReq.pFuncNames = taosArrayInit(TSDB_FUNC_MAX_RETRIEVE + 1, TSDB_FUNC_NAME_LEN);
for (int32_t i = 0; i < TSDB_FUNC_MAX_RETRIEVE + 1; ++i) {
taosArrayPush(retrieveReq.pFuncNames, "1");
char name[TSDB_FUNC_NAME_LEN] = "1";
taosArrayPush(retrieveReq.pFuncNames, name);
}
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
@ -237,7 +239,8 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) {
SRetrieveFuncReq retrieveReq = {0};
retrieveReq.numOfFuncs = 1;
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
taosArrayPush(retrieveReq.pFuncNames, "f2");
char name[TSDB_FUNC_NAME_LEN] = "f2";
taosArrayPush(retrieveReq.pFuncNames, name);
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
void* pReq = rpcMallocCont(contLen);
@ -279,7 +282,8 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) {
SRetrieveFuncReq retrieveReq = {0};
retrieveReq.numOfFuncs = 1;
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
taosArrayPush(retrieveReq.pFuncNames, "f2");
char name[TSDB_FUNC_NAME_LEN] = "f2";
taosArrayPush(retrieveReq.pFuncNames, name);
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
void* pReq = rpcMallocCont(contLen);
@ -316,8 +320,10 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) {
SRetrieveFuncReq retrieveReq = {0};
retrieveReq.numOfFuncs = 2;
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
taosArrayPush(retrieveReq.pFuncNames, "f2");
taosArrayPush(retrieveReq.pFuncNames, "f1");
char name1[TSDB_FUNC_NAME_LEN] = "f2";
taosArrayPush(retrieveReq.pFuncNames, name1);
char name2[TSDB_FUNC_NAME_LEN] = "f1";
taosArrayPush(retrieveReq.pFuncNames, name2);
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
void* pReq = rpcMallocCont(contLen);
@ -367,8 +373,10 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) {
SRetrieveFuncReq retrieveReq = {0};
retrieveReq.numOfFuncs = 2;
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
taosArrayPush(retrieveReq.pFuncNames, "f2");
taosArrayPush(retrieveReq.pFuncNames, "f3");
char name1[TSDB_FUNC_NAME_LEN] = "f2";
taosArrayPush(retrieveReq.pFuncNames, name1);
char name2[TSDB_FUNC_NAME_LEN] = "f3";
taosArrayPush(retrieveReq.pFuncNames, name2);
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
void* pReq = rpcMallocCont(contLen);
@ -483,7 +491,8 @@ TEST_F(MndTestFunc, 05_Actual_code) {
SRetrieveFuncReq retrieveReq = {0};
retrieveReq.numOfFuncs = 1;
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
taosArrayPush(retrieveReq.pFuncNames, "udf1");
char name[TSDB_FUNC_NAME_LEN] = "udf1";
taosArrayPush(retrieveReq.pFuncNames, name);
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
void* pReq = rpcMallocCont(contLen);

View File

@ -136,6 +136,8 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
return tqStreamProcessReqCheckpointRsp(pSnode->pMeta, pMsg);
case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP:
return tqStreamProcessCheckpointReadyRsp(pSnode->pMeta, pMsg);
case TDMT_MND_STREAM_CHKPT_REPORT_RSP:
return tqStreamProcessChkptReportRsp(pSnode->pMeta, pMsg);
case TDMT_STREAM_RETRIEVE_TRIGGER:
return tqStreamTaskProcessRetrieveTriggerReq(pSnode->pMeta, pMsg);
case TDMT_STREAM_RETRIEVE_TRIGGER_RSP:

View File

@ -261,6 +261,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);

View File

@ -1278,3 +1278,7 @@ int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamProcessCheckpointReadyRsp(pTq->pStreamMeta, pMsg);
}
int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamProcessChkptReportRsp(pTq->pStreamMeta, pMsg);
}

View File

@ -185,7 +185,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
SStreamTask* pTask = *ppTask;
const char* idstr = pTask->id.idStr;
if (pMeta->updateInfo.transId != req.transId) {
if ((pMeta->updateInfo.transId != req.transId) && (pMeta->updateInfo.transId != -1)) {
if (req.transId < pMeta->updateInfo.transId) {
tqError("s-task:%s vgId:%d disorder update nodeEp msg recv, discarded, newest transId:%d, recv:%d", idstr, vgId,
pMeta->updateInfo.transId, req.transId);
@ -197,10 +197,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
} else {
tqInfo("s-task:%s vgId:%d receive new trans to update nodeEp msg from mnode, transId:%d, prev transId:%d", idstr,
vgId, req.transId, pMeta->updateInfo.transId);
// info needs to be kept till the new trans to update the nodeEp arrived.
taosHashClear(pMeta->updateInfo.pTasks);
pMeta->updateInfo.transId = req.transId;
streamMetaInitUpdateTaskList(pMeta, req.transId);
}
} else {
tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d", idstr, vgId, req.transId);
@ -280,6 +278,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
// persist to disk
}
streamMetaClearUpdateTaskList(pMeta);
if (!restored) {
tqDebug("vgId:%d vnode restore not completed, not start the tasks, clear the start after nodeUpdate flag", vgId);
pMeta->startInfo.tasksWillRestart = 0;
@ -1078,6 +1078,8 @@ int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return d
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); }
int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {return doProcessDummyRspMsg(pMeta, pMsg);}
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SMStreamCheckpointReadyRspMsg* pRsp = pMsg->pCont;

View File

@ -854,6 +854,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg);
case TDMT_VND_GET_STREAM_PROGRESS:
return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg);
case TDMT_MND_STREAM_CHKPT_REPORT_RSP:
return tqProcessTaskChkptReportRsp(pVnode->pTq, pMsg);
default:
vError("unknown msg type:%d in stream queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR;

View File

@ -843,6 +843,7 @@ static int32_t selectStmtCopy(const SSelectStmt* pSrc, SSelectStmt* pDst) {
CLONE_NODE_FIELD_EX(pLimit, SLimitNode*);
COPY_CHAR_ARRAY_FIELD(stmtName);
COPY_SCALAR_FIELD(precision);
COPY_SCALAR_FIELD(isSubquery);
COPY_SCALAR_FIELD(isEmptyResult);
COPY_SCALAR_FIELD(timeLineResMode);
COPY_SCALAR_FIELD(timeLineFromOrderBy);

View File

@ -963,12 +963,14 @@ void nodesDestroyNode(SNode* pNode) {
break;
case QUERY_NODE_WHEN_THEN: {
SWhenThenNode* pWhenThen = (SWhenThenNode*)pNode;
destroyExprNode((SExprNode*)pNode);
nodesDestroyNode(pWhenThen->pWhen);
nodesDestroyNode(pWhenThen->pThen);
break;
}
case QUERY_NODE_CASE_WHEN: {
SCaseWhenNode* pCaseWhen = (SCaseWhenNode*)pNode;
destroyExprNode((SExprNode*)pNode);
nodesDestroyNode(pCaseWhen->pCase);
nodesDestroyNode(pCaseWhen->pElse);
nodesDestroyList(pCaseWhen->pWhenThenList);

View File

@ -40,26 +40,8 @@ static void checkpointTriggerMonitorFn(void* param, void* tmrId);
static SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId);
bool streamTaskIsAllUpstreamSendTrigger(SStreamTask* pTask) {
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
int32_t numOfUpstreams = taosArrayGetSize(pTask->upstreamInfo.pList);
bool allSend = true;
taosThreadMutexLock(&pActiveInfo->lock);
int32_t numOfRecv = taosArrayGetSize(pActiveInfo->pReadyMsgList);
if (numOfRecv < numOfUpstreams) {
stDebug("s-task:%s received checkpoint-trigger block, idx:%d, %d upstream tasks not send yet, total:%d",
pTask->id.idStr, pTask->info.selfChildId, (numOfUpstreams - numOfRecv), numOfUpstreams);
allSend = false;
}
taosThreadMutexUnlock(&pActiveInfo->lock);
return allSend;
}
SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId,
int32_t transId) {
int32_t transId) {
SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
if (pChkpoint == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -436,25 +418,27 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
int32_t code = 0;
const char* id = pTask->id.idStr;
SCheckpointInfo* pInfo = &pTask->chkInfo;
STaskId hTaskId = {0};
taosThreadMutexLock(&pTask->lock);
if (pReq->checkpointId <= pInfo->checkpointId) {
stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " checkpointVer:%" PRId64
" no need to update the checkpoint info, updated checkpointId:%" PRId64 " checkpointVer:%" PRId64 " ignored",
id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer);
" no need to update the checkpoint info, updated checkpointId:%" PRId64 " checkpointVer:%" PRId64
" transId:%d ignored",
id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer,
pReq->transId);
taosThreadMutexUnlock(&pTask->lock);
{ // destroy the related fill-history tasks
// drop task should not in the meta-lock, and drop the related fill-history task now
streamMetaWUnLock(pMeta);
if (pReq->dropRelHTask) {
streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
streamMetaUnregisterTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d",
id, vgId, pReq->taskId, numOfTasks);
}
streamMetaWLock(pMeta);
}
@ -492,8 +476,9 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
}
if (pReq->dropRelHTask) {
hTaskId = pTask->hTaskInfo.id;
stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint",
pReq->taskId, vgId, pReq->hTaskId);
pReq->taskId, vgId, hTaskId.taskId);
CLEAR_RELATED_FILLHISTORY_TASK(pTask);
}
@ -514,9 +499,10 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
// drop task should not in the meta-lock, and drop the related fill-history task now
if (pReq->dropRelHTask) {
streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, (int32_t) pReq->hTaskId, numOfTasks);
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId,
(int32_t)hTaskId.taskId, numOfTasks);
}
streamMetaWLock(pMeta);
@ -703,9 +689,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
// update the latest checkpoint info if all works are done successfully, for rsma, the pMsgCb is null.
if (code == TSDB_CODE_SUCCESS && (pTask->pMsgCb != NULL)) {
STaskId* pHTaskId = &pTask->hTaskInfo.id;
code = streamBuildAndSendCheckpointUpdateMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, pHTaskId, &pTask->chkInfo,
dropRelHTask);
code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask);
if (code == TSDB_CODE_SUCCESS) {
code = streamTaskRemoteBackupCheckpoint(pTask, ckId, (char*)id);
if (code != TSDB_CODE_SUCCESS) {
@ -770,6 +754,18 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
// checkpoint-trigger recv flag is set, quit
if (pActiveInfo->allUpstreamTriggerRecv) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d",
pTask->id.idStr, vgId, ref);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
taosThreadMutexUnlock(&pTask->lock);
taosThreadMutexLock(&pActiveInfo->lock);

View File

@ -615,7 +615,7 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB
* appropriate batch of blocks should be handled in 5 to 10 sec.
*/
static int32_t doStreamExecTask(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
const char* id = pTask->id.idStr;
// merge multiple input data if possible in the input queue.
stDebug("s-task:%s start to extract data block from inputQ", id);
@ -699,20 +699,15 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
if (type != STREAM_INPUT__CHECKPOINT) {
doStreamTaskExecImpl(pTask, pInput);
}
streamFreeQitem(pInput);
// todo other thread may change the status
streamFreeQitem(pInput);
} else { // todo other thread may change the status
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
if (type == STREAM_INPUT__CHECKPOINT) {
// todo add lock
taosThreadMutexLock(&pTask->lock);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state == TASK_STATUS__CK) {
stDebug("s-task:%s checkpoint block received, set status:%s", id, pState->name);
streamTaskBuildCheckpoint(pTask);
} else {
// todo refactor
} else { // todo refactor
int32_t code = 0;
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
code = streamTaskSendCheckpointSourceRsp(pTask);
@ -727,6 +722,8 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
}
}
taosThreadMutexUnlock(&pTask->lock);
streamFreeQitem(pInput);
return 0;
}
}

View File

@ -372,6 +372,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->expandFunc = expandFunc;
pMeta->stage = stage;
pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT;
pMeta->updateInfo.transId = -1;
pMeta->startInfo.completeFn = fn;
pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
@ -1741,3 +1742,13 @@ void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SSt
id, vgId, transId, el);
}
}
void streamMetaClearUpdateTaskList(SStreamMeta* pMeta) {
taosHashClear(pMeta->updateInfo.pTasks);
pMeta->updateInfo.transId = -1;
}
void streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId) {
taosHashClear(pMeta->updateInfo.pTasks);
pMeta->updateInfo.transId = transId;
}

View File

@ -731,35 +731,50 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI
return code;
}
int32_t streamBuildAndSendCheckpointUpdateMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, STaskId* pHTaskId,
SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask) {
SVUpdateCheckpointInfoReq* pReq = rpcMallocCont(sizeof(SVUpdateCheckpointInfoReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask) {
int32_t code;
int32_t tlen = 0;
int32_t vgId = pTask->pMeta->vgId;
const char* id = pTask->id.idStr;
SActiveCheckpointInfo* pActive = pCheckpointInfo->pActiveInfo;
SCheckpointReport req = {.streamId = pTask->id.streamId,
.taskId = pTask->id.taskId,
.nodeId = vgId,
.dropHTask = dropRelHTask,
.transId = pActive->transId,
.checkpointId = pActive->activeId,
.checkpointVer = pCheckpointInfo->processedVer,
.checkpointTs = pCheckpointInfo->startTs};
tEncodeSize(tEncodeStreamTaskChkptReport, &req, tlen, code);
if (code < 0) {
stError("s-task:%s vgId:%d encode stream task checkpoint-report failed, code:%s", id, vgId, tstrerror(code));
return -1;
}
pReq->head.vgId = vgId;
pReq->taskId = pTaskId->taskId;
pReq->streamId = pTaskId->streamId;
pReq->dropRelHTask = dropRelHTask;
pReq->hStreamId = pHTaskId->streamId;
pReq->hTaskId = pHTaskId->taskId;
pReq->transId = pCheckpointInfo->pActiveInfo->transId;
pReq->checkpointId = pCheckpointInfo->pActiveInfo->activeId;
pReq->checkpointVer = pCheckpointInfo->processedVer;
pReq->checkpointTs = pCheckpointInfo->startTs;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_UPDATE_CHKPT, .pCont = pReq, .contLen = sizeof(SVUpdateCheckpointInfoReq)};
int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg);
if (code != TSDB_CODE_SUCCESS) {
stError("vgId:%d task:0x%x failed to send update checkpoint info msg, code:%s", vgId, pTaskId->taskId, tstrerror(code));
} else {
stDebug("vgId:%d task:0x%x build and send update checkpoint info msg msg", vgId, pTaskId->taskId);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId,
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return -1;
}
return code;
SEncoder encoder;
tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeStreamTaskChkptReport(&encoder, &req)) < 0) {
rpcFreeCont(buf);
stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId, tstrerror(code));
return -1;
}
tEncoderClear(&encoder);
SRpcMsg msg = {0};
initRpcMsg(&msg, TDMT_MND_STREAM_CHKPT_REPORT, buf, tlen);
stDebug("s-task:%s vgId:%d build and send task checkpoint-report to mnode", id, vgId);
tmsgSendReq(&pTask->info.mnodeEpset, &msg);
return 0;
}
STaskId streamTaskGetTaskId(const SStreamTask* pTask) {

View File

@ -597,3 +597,31 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->checkpointVer) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->checkpointTs) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->transId) < 0) return -1;
if (tEncodeI8(pEncoder, pReq->dropHTask) < 0) return -1;
tEndEncode(pEncoder);
return 0;
}
int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->checkpointVer) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->checkpointTs) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->transId) < 0) return -1;
if (tDecodeI8(pDecoder, &pReq->dropHTask) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}

View File

@ -86,8 +86,9 @@ static void taosDeleteTimer(void *tharg) {
static TdThread timerThread;
static timer_t timerId;
static volatile bool stopTimer = false;
static void *taosProcessAlarmSignal(void *tharg) {
// Block the signal
static void *taosProcessAlarmSignal(void *tharg) {
// Block the signal
sigset_t sigset;
sigemptyset(&sigset);
sigaddset(&sigset, SIGALRM);

View File

@ -66,6 +66,7 @@ typedef struct {
int32_t lines;
int32_t flag;
int32_t openInProgress;
int64_t lastKeepFileSec;
pid_t pid;
char logName[LOG_FILE_NAME_LEN];
SLogBuff *logHandle;
@ -267,19 +268,33 @@ static void taosUnLockLogFile(TdFilePtr pFile) {
}
}
static void taosKeepOldLog(char *oldName) {
if (tsLogKeepDays == 0) return;
static void taosReserveOldLog(char *oldName, char *keepName) {
if (tsLogKeepDays <= 0) {
keepName[0] = 0;
return;
}
int32_t code = 0;
int64_t fileSec = taosGetTimestampSec();
char fileName[LOG_FILE_NAME_LEN + 20];
snprintf(fileName, LOG_FILE_NAME_LEN + 20, "%s.%" PRId64, tsLogObj.logName, fileSec);
if (tsLogObj.lastKeepFileSec < fileSec) {
tsLogObj.lastKeepFileSec = fileSec;
} else {
fileSec = ++tsLogObj.lastKeepFileSec;
}
snprintf(keepName, LOG_FILE_NAME_LEN + 20, "%s.%" PRId64, tsLogObj.logName, fileSec);
if ((code = taosRenameFile(oldName, keepName))) {
keepName[0] = 0;
uError("failed to rename file:%s to %s since %s", oldName, keepName, tstrerror(code));
}
}
(void)taosRenameFile(oldName, fileName);
char compressFileName[LOG_FILE_NAME_LEN + 20];
snprintf(compressFileName, LOG_FILE_NAME_LEN + 20, "%s.%" PRId64 ".gz", tsLogObj.logName, fileSec);
if (taosCompressFile(fileName, compressFileName) == 0) {
(void)taosRemoveFile(fileName);
static void taosKeepOldLog(char *oldName) {
if (oldName[0] != 0) {
char compressFileName[LOG_FILE_NAME_LEN + 20];
snprintf(compressFileName, LOG_FILE_NAME_LEN + 20, "%s.gz", oldName);
if (taosCompressFile(oldName, compressFileName) == 0) {
(void)taosRemoveFile(oldName);
}
}
if (tsLogKeepDays > 0) {
@ -316,13 +331,13 @@ static OldFileKeeper *taosOpenNewFile() {
tsLogObj.logHandle->pFile = pFile;
tsLogObj.lines = 0;
tsLogObj.openInProgress = 0;
OldFileKeeper* oldFileKeeper = taosMemoryMalloc(sizeof(OldFileKeeper));
OldFileKeeper *oldFileKeeper = taosMemoryMalloc(sizeof(OldFileKeeper));
if (oldFileKeeper == NULL) {
uError("create old log keep info faild! mem is not enough.");
return NULL;
}
oldFileKeeper->pOldFile = pOldFile;
memcpy(oldFileKeeper->keepName, keepName, LOG_FILE_NAME_LEN + 20);
taosReserveOldLog(keepName, oldFileKeeper->keepName);
uInfo(" new log file:%d is opened", tsLogObj.flag);
uInfo("==================================");

View File

@ -1485,6 +1485,7 @@
,,y,script,./test.sh -f tsim/view/view.sim
,,y,script,./test.sh -f tsim/query/cache_last.sim
,,y,script,./test.sh -f tsim/query/const.sim
,,y,script,./test.sh -f tsim/query/nestedJoinView.sim
#develop test

View File

@ -0,0 +1,19 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
sql create database test;
sql use test;
sql CREATE TABLE `resource_info` ( job_id_ts TIMESTAMP , role VARCHAR(20) primary key, start_time TIMESTAMP, ip VARCHAR(15), cpu FLOAT, memory FLOAT, io_write FLOAT, io_read FLOAT, net_write FLOAT, net_read FLOAT) TAGS ( end_time TIMESTAMP);
sql CREATE STABLE `test_results` ( `job_id_ts` TIMESTAMP , `end_time` VARCHAR(40) PRIMARY KEY, `job_id` BIGINT, `time_cost` FLOAT, `write_speed` FLOAT, `qps` FLOAT, `min_delay` FLOAT, `p90_delay` FLOAT, `p95_delay` FLOAT, `p99_delay` FLOAT, `max_delay` FLOAT, `avg_delay` FLOAT, `hostname` VARCHAR(15), `tdengine_commit_id` VARCHAR(50), `tdinternal_commit_id` VARCHAR(50), `load_type` VARCHAR(50), `cpu` FLOAT, `memory` FLOAT, `io_write` FLOAT, `io_read` FLOAT) TAGS ( `branch` VARCHAR(50), `scenario` VARCHAR(50), `test_case` VARCHAR(1000), `env_id` INT, `type` VARCHAR(50));
sql CREATE TABLE `job_info` ( `start_time` TIMESTAMP , `finish_time` TIMESTAMP , `job_id` INT, `job_status` VARCHAR(20), `test_type` VARCHAR(50), `environment` INT, `version` VARCHAR(20), `tdengine_commit_id` VARCHAR(50), `tdinternal_commit_id` VARCHAR(50), `type` VARCHAR(50), `scenario` VARCHAR(50), `note` VARCHAR(500), `version_number` VARCHAR(20));
sql create view abc as select * from ( select a.job_id, a.start_time as job_start_time, a.finish_time as job_end_time, a.job_status, a.test_type, a.environment, case when a.version_number <> null then a.version else CONCAT(a.version,'_',a.version_number) end as version_info, a.tdengine_commit_id, a.tdinternal_commit_id, a.type, a.scenario, a.note, a.version_number, b.end_time as tc_end_time, b.time_cost, b.write_speed, b.qps, b.min_delay, b.p90_delay, b.p95_delay, b.p99_delay, b.`max_delay`, b.avg_delay, b.hostname, b.load_type, b.scenario, b.test_case, b.type from job_info a, test_results b where a.start_time=b.job_id_ts and a.job_status='finished') s1 inner join resource_info s2 on s1.job_start_time=s2.job_id_ts and s1.job_id=2 and s1.tc_end_time=s2.end_time;
sql select * from abc;
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -1,66 +1,80 @@
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "localhost",
"port": 6030,
"rest_port": 6041,
"user": "root",
"password": "taosdata",
"thread_count": 100,
"create_table_thread_count": 24,
"result_file": "taosBenchmark_result.log",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
"num_of_records_per_req": 1000000,
"max_sql_len": 1024000,
"databases": [
{
"dbinfo": {
"name": "db4096",
"drop": "yes",
"replica": 1,
"duration": 10,
"precision": "ms",
"keep": 3650,
"comp": 2,
"vgroups": 2,
"buffer": 1000
},
"super_tables": [
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "localhost",
"port": 6030,
"rest_port": 6041,
"user": "root",
"password": "taosdata",
"thread_count": 100,
"create_table_thread_count": 24,
"result_file": "taosBenchmark_result.log",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
"num_of_records_per_req": 1000000,
"max_sql_len": 1024000,
"databases": [
{
"name": "stb0",
"child_table_exists": "no",
"childtable_count":2,
"childtable_prefix": "ctb0",
"escape_character": "no",
"auto_create_table": "no",
"batch_create_tbl_num": 500,
"data_source": "rand",
"insert_mode": "taosc",
"rollup": null,
"interlace_rows": 0,
"line_protocol": null,
"tcp_transfer": "no",
"insert_rows": 10,
"childtable_limit": 0,
"childtable_offset": 0,
"rows_per_tbl": 0,
"max_sql_len": 1048576,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1000,
"start_timestamp": "2022-10-22 17:20:36",
"sample_format": "csv",
"sample_file": "./sample.csv",
"tags_file": "",
"columns": [{ "type": "INT","count": 4093}],
"tags": [{"type": "TINYINT", "count": 1},{"type": "NCHAR","count": 1}]
"dbinfo": {
"name": "db4096",
"drop": "yes",
"replica": 1,
"duration": 10,
"precision": "ms",
"keep": 3650,
"comp": 2,
"vgroups": 2,
"buffer": 1000
},
"super_tables": [
{
"name": "stb0",
"child_table_exists": "no",
"childtable_count": 2,
"childtable_prefix": "ctb0",
"escape_character": "no",
"auto_create_table": "no",
"batch_create_tbl_num": 500,
"data_source": "rand",
"insert_mode": "taosc",
"rollup": null,
"interlace_rows": 0,
"line_protocol": null,
"tcp_transfer": "no",
"insert_rows": 10,
"childtable_limit": 0,
"childtable_offset": 0,
"rows_per_tbl": 0,
"max_sql_len": 1048576,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1000,
"start_timestamp": "2022-10-22 17:20:36",
"sample_format": "csv",
"sample_file": "./sample.csv",
"tags_file": "",
"columns": [
{
"type": "INT",
"count": 4093
}
],
"tags": [
{
"type": "TINYINT",
"count": 1
},
{
"type": "NCHAR",
"count": 1
}
]
}
]
}
]
}
],
"prepare_rand": 10000,
"chinese": "no",
"streams": false,
"test_log": "/root/testlog/"
],
"prepare_rand": 10000,
"chinese": "no",
"streams": false,
"test_log": "/root/testlog/"
}

View File

@ -1270,15 +1270,14 @@ class TDTestCase:
def test_drop_tsma(self):
function_name = sys._getframe().f_code.co_name
tdLog.debug(f'-----{function_name}------')
self.create_tsma('tsma1', 'test', 'meters', [
'avg(c1)', 'avg(c2)'], '5m')
self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m')
self.create_recursive_tsma('tsma1', 'tsma2', 'test', '15m', 'meters')
# drop recursive tsma first
tdSql.error('drop tsma test.tsma1', -2147482491)
tdSql.execute('drop tsma test.tsma2', queryTimes=1)
tdSql.execute('drop tsma test.tsma1', queryTimes=1)
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo')
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-chkpt-u')
tdSql.execute('drop database test', queryTimes=1)
self.init_data()
@ -1319,7 +1318,7 @@ class TDTestCase:
'create tsma tsma1 on nsdb.meters function(avg(c1), avg(c2), avg(t3)) interval(5m)', -2147471096)
tdSql.execute('alter table nsdb.meters drop tag t3', queryTimes=1)
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo')
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-chkpt-u')
tdSql.execute('drop database nsdb')
# drop norm table
@ -1346,7 +1345,7 @@ class TDTestCase:
# test drop stream
tdSql.error('drop stream tsma1', -2147471088) ## TSMA must be dropped first
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo')
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-chkpt-u')
tdSql.execute('drop database test', queryTimes=1)
self.init_data()
@ -1449,7 +1448,7 @@ class TDTestCase:
tdSql.error(
'create tsma tsma1 on test.meters function(avg(c1), avg(c2)) interval(2h)', -2147471097)
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo')
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-chkpt-u')
tdSql.execute('drop database nsdb')
def test_create_tsma_on_norm_table(self):

View File

@ -42,7 +42,7 @@ class TDTestCase:
tdSql.execute("use test", queryTimes=100)
tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)")
tdLog.debug("========create stream and insert data ok========")
time.sleep(15)
time.sleep(20)
tdSql.query("select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s) order by groupid,_wstart")
rowCnt = tdSql.getRows()
@ -68,7 +68,7 @@ class TDTestCase:
# create stream
tdSql.execute("use db", queryTimes=100)
tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True)
time.sleep(5)
time.sleep(10)
sql = "select count(*) from sta"
# loop wait max 60s to check count is ok