diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 22cedcd571..a837dd9708 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2291,6 +2291,10 @@ typedef struct { // TODO: other info needed by task } SStreamTaskExecReq; +typedef struct { + int32_t reserved; +} SStreamTaskExecRsp; + #pragma pack(pop) #ifdef __cplusplus diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index b30a325d7c..6a07887721 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -200,6 +200,7 @@ enum { // Requests handled by SNODE TD_NEW_MSG_SEG(TDMT_SND_MSG) TD_DEF_MSG_TYPE(TDMT_SND_TASK_DEPLOY, "snode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) + TD_DEF_MSG_TYPE(TDMT_SND_TASK_EXEC, "snode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp) #if defined(TD_MSG_NUMBER_) TDMT_MAX diff --git a/include/util/tencode.h b/include/util/tencode.h index c058eebb50..cbbd55c8a3 100644 --- a/include/util/tencode.h +++ b/include/util/tencode.h @@ -402,10 +402,33 @@ static int32_t tDecodeCStrTo(SCoder* pDecoder, char* val) { return 0; } +static FORCE_INLINE int32_t tDecodeBinaryAlloc(SCoder* pDecoder, void** val, uint64_t* len) { + if (tDecodeU64v(pDecoder, len) < 0) return -1; + + if (TD_CODER_CHECK_CAPACITY_FAILED(pDecoder, *len)) return -1; + *val = malloc(*len); + if (*val == NULL) return -1; + memcpy(*val, TD_CODER_CURRENT(pDecoder), *len); + + TD_CODER_MOVE_POS(pDecoder, *len); + return 0; +} + +static FORCE_INLINE int32_t tDecodeCStrAndLenAlloc(SCoder* pDecoder, char** val, uint64_t* len) { + if (tDecodeBinaryAlloc(pDecoder, (void**)val, len) < 0) return -1; + (*len) -= 1; + return 0; +} + +static FORCE_INLINE int32_t tDecodeCStrAlloc(SCoder* pDecoder, char** val) { + uint64_t len; + return tDecodeCStrAndLenAlloc(pDecoder, val, &len); +} + static FORCE_INLINE bool tDecodeIsEnd(SCoder* pCoder) { return (pCoder->size == pCoder->pos); } #ifdef __cplusplus } #endif -#endif /*_TD_UTIL_ENCODE_H_*/ \ No newline at end of file +#endif /*_TD_UTIL_ENCODE_H_*/ diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 469090c05b..794e5a8542 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2674,9 +2674,9 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea if (tStartDecode(&decoder) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1; - if (tDecodeCStr(&decoder, (const char **)&pReq->sql) < 0) return -1; - if (tDecodeCStr(&decoder, (const char **)&pReq->physicalPlan) < 0) return -1; - if (tDecodeCStr(&decoder, (const char **)&pReq->logicalPlan) < 0) return -1; + if (tDecodeCStrAlloc(&decoder, &pReq->sql) < 0) return -1; + if (tDecodeCStrAlloc(&decoder, &pReq->physicalPlan) < 0) return -1; + if (tDecodeCStrAlloc(&decoder, &pReq->logicalPlan) < 0) return -1; tEndDecode(&decoder); tCoderClear(&decoder); @@ -2706,7 +2706,7 @@ int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) { if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1; - if (tDecodeCStr(pDecoder, (const char **)&pTask->qmsg) < 0) return -1; + if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1; tEndDecode(pDecoder); return 0; } diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 6e8d9aa79f..cc5bf843ce 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -39,8 +39,8 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { if (tDecodeI64(pDecoder, &pObj->dbUid) < 0) return -1; if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1; - if (tDecodeCStr(pDecoder, (const char **)&pObj->sql) < 0) return -1; - if (tDecodeCStr(pDecoder, (const char **)&pObj->logicalPlan) < 0) return -1; - if (tDecodeCStr(pDecoder, (const char **)&pObj->physicalPlan) < 0) return -1; + if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1; + if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1; + if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1; return 0; } diff --git a/source/dnode/snode/inc/sndInt.h b/source/dnode/snode/inc/sndInt.h index e5f6c3c266..d1122fc4ec 100644 --- a/source/dnode/snode/inc/sndInt.h +++ b/source/dnode/snode/inc/sndInt.h @@ -50,8 +50,9 @@ typedef struct SSnode { SStreamMeta* sndMetaNew(); void sndMetaDelete(SStreamMeta* pMeta); -int32_t sndMetaDeployTask(SStreamMeta* pMeta, SStreamTask* pTask); -int32_t sndMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); +int32_t sndMetaDeployTask(SStreamMeta* pMeta, SStreamTask* pTask); +SStreamTask* sndMetaGetTask(SStreamMeta* pMeta, int32_t taskId); +int32_t sndMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); int32_t sndDropTaskOfStream(SStreamMeta* pMeta, int64_t streamId); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 80e33bd971..2ecaeb00e9 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -68,6 +68,10 @@ int32_t sndMetaDeployTask(SStreamMeta *pMeta, SStreamTask *pTask) { return taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), pTask, sizeof(void *)); } +SStreamTask *sndMetaGetTask(SStreamMeta *pMeta, int32_t taskId) { + return taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t)); +} + int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) { SStreamTask *pTask = taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t)); if (pTask == NULL) { @@ -79,6 +83,16 @@ int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) { return taosHashRemove(pMeta->pHash, &taskId, sizeof(int32_t)); } +static int32_t sndProcessTaskExecReq(SSnode *pSnode, SRpcMsg *pMsg) { + SMsgHead *pHead = pMsg->pCont; + int32_t taskId = pHead->streamTaskId; + SStreamTask *pTask = sndMetaGetTask(pSnode->pMeta, taskId); + if (pTask == NULL) { + return -1; + } + return 0; +} + int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { // stream deploy // stream stop/resume @@ -95,13 +109,20 @@ int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { tCoderClear(&decoder); sndMetaDeployTask(pSnode->pMeta, pTask); + } else if (pMsg->msgType == TDMT_SND_TASK_EXEC) { + sndProcessTaskExecReq(pSnode, pMsg); } else { - // + ASSERT(0); } return 0; } int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) { // operator exec + if (pMsg->msgType == TDMT_SND_TASK_EXEC) { + sndProcessTaskExecReq(pSnode, pMsg); + } else { + ASSERT(0); + } return 0; }