Merge branch '3.0' into merge/mainto3.0

This commit is contained in:
Simon Guan 2025-03-04 11:32:29 +08:00
commit b74b038f4f
20 changed files with 168 additions and 8 deletions

View File

@ -2,7 +2,7 @@
# taosws-rs
ExternalProject_Add(taosws-rs
GIT_REPOSITORY https://github.com/taosdata/taos-connector-rust.git
GIT_TAG 3.0
GIT_TAG main
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -17,7 +17,6 @@
#define TDENGINE_STREAMMSG_H
#include "tmsg.h"
//#include "trpc.h"
#ifdef __cplusplus
extern "C" {
@ -256,6 +255,14 @@ typedef struct {
int32_t tEncodeStreamTaskRunReq(SEncoder* pEncoder, const SStreamTaskRunReq* pReq);
int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq);
typedef struct {
SMsgHead head;
int64_t streamId;
} SStreamTaskStopReq;
int32_t tEncodeStreamTaskStopReq(SEncoder* pEncoder, const SStreamTaskStopReq* pReq);
int32_t tDecodeStreamTaskStopReq(SDecoder* pDecoder, SStreamTaskStopReq* pReq);
#ifdef __cplusplus
}
#endif

View File

@ -401,6 +401,7 @@
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_CHECK, "vnode-stream-task-check", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_UNUSED, "vnd-stream-unused", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_GET_STREAM_PROGRESS, "vnd-stream-progress", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_ALL_STOP, "vnd-stream-allstop", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_VND_STREAM_MSG)
TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG) //8 << 8

View File

@ -39,6 +39,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta);
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta);
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* msg);
int32_t tqStreamTaskProcessAllTaskStopReq(SStreamMeta* pMeta, SMsgCb* pMsgCb, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg);

View File

@ -866,3 +866,28 @@ int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq) {
_exit:
return code;
}
int32_t tEncodeStreamTaskStopReq(SEncoder* pEncoder, const SStreamTaskStopReq* pReq) {
int32_t code = 0;
int32_t lino;
TAOS_CHECK_EXIT(tStartEncode(pEncoder));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
tEndEncode(pEncoder);
_exit:
return code;
}
int32_t tDecodeStreamTaskStopReq(SDecoder* pDecoder, SStreamTaskStopReq* pReq) {
int32_t code = 0;
int32_t lino;
TAOS_CHECK_EXIT(tStartDecode(pDecoder));
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
tEndDecode(pDecoder);
_exit:
return code;
}

View File

@ -258,6 +258,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_ALL_STOP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -96,6 +96,7 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_ALL_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;

View File

@ -1014,6 +1014,9 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_ALL_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;

View File

@ -151,6 +151,8 @@ int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask *p
int32_t mndStreamSetRestartAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId,
int8_t mndTrigger);
int32_t mndStreamSetStopStreamTasksActions(SMnode* pMnode, STrans *pTrans, uint64_t dbUid);
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList);
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq);
int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId,

View File

@ -1714,6 +1714,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
#endif
TAOS_CHECK_GOTO(mndDropSmasByDb(pMnode, pTrans, pDb), NULL, _OVER);
TAOS_CHECK_GOTO(mndDropIdxsByDb(pMnode, pTrans, pDb), NULL, _OVER);
TAOS_CHECK_GOTO(mndStreamSetStopStreamTasksActions(pMnode, pTrans, pDb->uid), NULL, _OVER);
TAOS_CHECK_GOTO(mndSetDropDbRedoActions(pMnode, pTrans, pDb), NULL, _OVER);
TAOS_CHECK_GOTO(mndUserRemoveDb(pMnode, pTrans, pDb->name), NULL, _OVER);

View File

@ -116,6 +116,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_STREAM_DROP_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_ALL_STOP_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_DROP_ORPHANTASKS, mndProcessDropOrphanTaskReq);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_TASK_RESET, mndProcessResetStatusReq);

View File

@ -666,3 +666,75 @@ int32_t mndStreamSetRestartAction(SMnode* pMnode, STrans *pTrans, SStreamObj* pS
return 0;
}
static int32_t doSetStopAllTasksAction(SMnode* pMnode, STrans* pTrans, SVgObj* pVgObj) {
void *pBuf = NULL;
int32_t len = 0;
int32_t code = 0;
SEncoder encoder;
SStreamTaskStopReq req = {.streamId = -1};
tEncodeSize(tEncodeStreamTaskStopReq, &req, len, code);
if (code < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
int32_t tlen = sizeof(SMsgHead) + len;
pBuf = taosMemoryMalloc(tlen);
if (pBuf == NULL) {
return terrno;
}
void *abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
tEncoderInit(&encoder, abuf, tlen);
code = tEncodeStreamTaskStopReq(&encoder, &req);
if (code == -1) {
tEncoderClear(&encoder);
taosMemoryFree(pBuf);
return code;
}
SMsgHead *pMsgHead = (SMsgHead *)pBuf;
pMsgHead->contLen = htonl(tlen);
pMsgHead->vgId = htonl(pVgObj->vgId);
tEncoderClear(&encoder);
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
code = setTransAction(pTrans, pBuf, tlen, TDMT_VND_STREAM_ALL_STOP, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != TSDB_CODE_SUCCESS) {
mError("failed to create stop all streams trans, code:%s", tstrerror(code));
taosMemoryFree(pBuf);
}
return 0;
}
int32_t mndStreamSetStopStreamTasksActions(SMnode* pMnode, STrans *pTrans, uint64_t dbUid) {
int32_t code = 0;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SVgObj *pVgroup = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid == dbUid) {
if ((code = doSetStopAllTasksAction(pMnode, pTrans, pVgroup)) != 0) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
TAOS_RETURN(code);
}
}
sdbRelease(pSdb, pVgroup);
}
TAOS_RETURN(code);
return 0;
}

View File

@ -160,6 +160,8 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true, true);
case TDMT_VND_STREAM_TASK_RESET:
return tqStreamTaskProcessTaskResetReq(pSnode->pMeta, pMsg->pCont);
case TDMT_VND_STREAM_ALL_STOP:
return tqStreamTaskProcessAllTaskStopReq(pSnode->pMeta, &pSnode->msgCb, pMsg);
case TDMT_STREAM_TASK_PAUSE:
return tqStreamTaskProcessTaskPauseReq(pSnode->pMeta, pMsg->pCont);
case TDMT_STREAM_TASK_RESUME:

View File

@ -244,12 +244,14 @@ int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
void tqUnregisterPushHandle(STQ* pTq, void* pHandle);
void tqScanWalAsync(STQ* pTq);
int32_t tqStopStreamTasksAsync(STQ* pTq);
int32_t tqStopStreamAllTasksAsync(SStreamMeta* pMeta, SMsgCb* pMsgCb);
int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp);
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveTriggerRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessAllTaskStopReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg);

View File

@ -1380,6 +1380,10 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg->pCont);
}
int32_t tqProcessAllTaskStopReq(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamTaskProcessAllTaskStopReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, pMsg);
}
int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode);

View File

@ -216,10 +216,8 @@ void tqScanWalAsync(STQ* pTq) {
}
}
int32_t tqStopStreamTasksAsync(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = pMeta->vgId;
return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_STOP_ALL_TASKS);
int32_t tqStopStreamAllTasksAsync(SStreamMeta* pMeta, SMsgCb* pMsgCb) {
return streamTaskSchedTask(pMsgCb, pMeta->vgId, 0, 0, STREAM_EXEC_T_STOP_ALL_TASKS);
}
int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {

View File

@ -1005,6 +1005,39 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
return TSDB_CODE_SUCCESS;
}
int32_t tqStreamTaskProcessAllTaskStopReq(SStreamMeta* pMeta, SMsgCb* pMsgCb, SRpcMsg* pMsg) {
int32_t code = 0;
int32_t vgId = pMeta->vgId;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SDecoder decoder;
SStreamTaskStopReq req = {0};
tDecoderInit(&decoder, (uint8_t*)msg, len);
if ((code = tDecodeStreamTaskStopReq(&decoder, &req)) < 0) {
tqError("vgId:%d failed to decode stop all streams, code:%s", pMeta->vgId, tstrerror(code));
tDecoderClear(&decoder);
return TSDB_CODE_SUCCESS;
}
tDecoderClear(&decoder);
// stop all stream tasks, only invoked when trying to drop db
if (req.streamId <= 0) {
tqDebug("vgId:%d recv msg to stop all tasks in sync before dropping vnode", vgId);
code = streamMetaStopAllTasks(pMeta);
if (code) {
tqError("vgId:%d failed to stop all tasks, code:%s", vgId, tstrerror(code));
}
} else { // stop only one stream tasks
}
// always return success
return TSDB_CODE_SUCCESS;
}
int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SRetrieveChkptTriggerReq req = {0};
SStreamTask* pTask = NULL;

View File

@ -750,6 +750,12 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
goto _err;
}
} break;
case TDMT_VND_STREAM_ALL_STOP: {
if (pVnode->restored && vnodeIsLeader(pVnode) && (code = tqProcessAllTaskStopReq(pVnode->pTq, pMsg)) < 0) {
goto _err;
}
} break;
case TDMT_VND_ALTER_CONFIRM:
needCommit = pVnode->config.hashChange;

View File

@ -624,7 +624,7 @@ static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
if (pVnode->pTq) {
tqUpdateNodeStage(pVnode->pTq, false);
if (tqStopStreamTasksAsync(pVnode->pTq) != 0) {
if (tqStopStreamAllTasksAsync(pVnode->pTq->pStreamMeta, &pVnode->msgCb) != 0) {
vError("vgId:%d, failed to stop stream tasks", pVnode->config.vgId);
}
}

View File

@ -409,7 +409,7 @@
(void)tqProcessTaskConsenChkptIdReq
(void)tqProcessTaskResetReq
(void)tqScanWalAsync
(void)tqStopStreamTasksAsync
(void)tqStopStreamAllTasksAsync
(void)tqUpdateTbUidList
(void)transAcquireExHandle
(void)transAsyncSend