task deploy and task exec
This commit is contained in:
parent
0ff5b63f4c
commit
7f4c88b756
|
@ -2306,6 +2306,10 @@ typedef struct {
|
|||
// TODO: other info needed by task
|
||||
} SStreamTaskExecReq;
|
||||
|
||||
typedef struct {
|
||||
int32_t reserved;
|
||||
} SStreamTaskExecRsp;
|
||||
|
||||
#pragma pack(pop)
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -200,6 +200,7 @@ enum {
|
|||
// Requests handled by SNODE
|
||||
TD_NEW_MSG_SEG(TDMT_SND_MSG)
|
||||
TD_DEF_MSG_TYPE(TDMT_SND_TASK_DEPLOY, "snode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_SND_TASK_EXEC, "snode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
|
||||
|
||||
#if defined(TD_MSG_NUMBER_)
|
||||
TDMT_MAX
|
||||
|
|
|
@ -402,10 +402,33 @@ static int32_t tDecodeCStrTo(SCoder* pDecoder, char* val) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tDecodeBinaryAlloc(SCoder* pDecoder, void** val, uint64_t* len) {
|
||||
if (tDecodeU64v(pDecoder, len) < 0) return -1;
|
||||
|
||||
if (TD_CODER_CHECK_CAPACITY_FAILED(pDecoder, *len)) return -1;
|
||||
*val = malloc(*len);
|
||||
if (*val == NULL) return -1;
|
||||
memcpy(*val, TD_CODER_CURRENT(pDecoder), *len);
|
||||
|
||||
TD_CODER_MOVE_POS(pDecoder, *len);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tDecodeCStrAndLenAlloc(SCoder* pDecoder, char** val, uint64_t* len) {
|
||||
if (tDecodeBinaryAlloc(pDecoder, (void**)val, len) < 0) return -1;
|
||||
(*len) -= 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tDecodeCStrAlloc(SCoder* pDecoder, char** val) {
|
||||
uint64_t len;
|
||||
return tDecodeCStrAndLenAlloc(pDecoder, val, &len);
|
||||
}
|
||||
|
||||
static FORCE_INLINE bool tDecodeIsEnd(SCoder* pCoder) { return (pCoder->size == pCoder->pos); }
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_UTIL_ENCODE_H_*/
|
||||
#endif /*_TD_UTIL_ENCODE_H_*/
|
||||
|
|
|
@ -2674,9 +2674,9 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
|
|||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
|
||||
if (tDecodeCStr(&decoder, (const char **)&pReq->sql) < 0) return -1;
|
||||
if (tDecodeCStr(&decoder, (const char **)&pReq->physicalPlan) < 0) return -1;
|
||||
if (tDecodeCStr(&decoder, (const char **)&pReq->logicalPlan) < 0) return -1;
|
||||
if (tDecodeCStrAlloc(&decoder, &pReq->sql) < 0) return -1;
|
||||
if (tDecodeCStrAlloc(&decoder, &pReq->physicalPlan) < 0) return -1;
|
||||
if (tDecodeCStrAlloc(&decoder, &pReq->logicalPlan) < 0) return -1;
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tCoderClear(&decoder);
|
||||
|
@ -2706,7 +2706,7 @@ int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) {
|
|||
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1;
|
||||
if (tDecodeCStr(pDecoder, (const char **)&pTask->qmsg) < 0) return -1;
|
||||
if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1;
|
||||
tEndDecode(pDecoder);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -39,8 +39,8 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
|
|||
if (tDecodeI64(pDecoder, &pObj->dbUid) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
|
||||
if (tDecodeCStr(pDecoder, (const char **)&pObj->sql) < 0) return -1;
|
||||
if (tDecodeCStr(pDecoder, (const char **)&pObj->logicalPlan) < 0) return -1;
|
||||
if (tDecodeCStr(pDecoder, (const char **)&pObj->physicalPlan) < 0) return -1;
|
||||
if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
|
||||
if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;
|
||||
if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -50,8 +50,9 @@ typedef struct SSnode {
|
|||
SStreamMeta* sndMetaNew();
|
||||
void sndMetaDelete(SStreamMeta* pMeta);
|
||||
|
||||
int32_t sndMetaDeployTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||
int32_t sndMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
|
||||
int32_t sndMetaDeployTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||
SStreamTask* sndMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
|
||||
int32_t sndMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
|
||||
|
||||
int32_t sndDropTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
|
||||
|
||||
|
|
|
@ -68,6 +68,10 @@ int32_t sndMetaDeployTask(SStreamMeta *pMeta, SStreamTask *pTask) {
|
|||
return taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), pTask, sizeof(void *));
|
||||
}
|
||||
|
||||
SStreamTask *sndMetaGetTask(SStreamMeta *pMeta, int32_t taskId) {
|
||||
return taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||
}
|
||||
|
||||
int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) {
|
||||
SStreamTask *pTask = taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||
if (pTask == NULL) {
|
||||
|
@ -79,6 +83,16 @@ int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) {
|
|||
return taosHashRemove(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||
}
|
||||
|
||||
static int32_t sndProcessTaskExecReq(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||
SMsgHead *pHead = pMsg->pCont;
|
||||
int32_t taskId = pHead->streamTaskId;
|
||||
SStreamTask *pTask = sndMetaGetTask(pSnode->pMeta, taskId);
|
||||
if (pTask == NULL) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||
// stream deploy
|
||||
// stream stop/resume
|
||||
|
@ -95,13 +109,20 @@ int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
|||
tCoderClear(&decoder);
|
||||
|
||||
sndMetaDeployTask(pSnode->pMeta, pTask);
|
||||
} else if (pMsg->msgType == TDMT_SND_TASK_EXEC) {
|
||||
sndProcessTaskExecReq(pSnode, pMsg);
|
||||
} else {
|
||||
//
|
||||
ASSERT(0);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||
// operator exec
|
||||
if (pMsg->msgType == TDMT_SND_TASK_EXEC) {
|
||||
sndProcessTaskExecReq(pSnode, pMsg);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue