fix stream serialization

This commit is contained in:
Liu Jicong 2022-03-28 14:16:58 +08:00
parent e0f192046c
commit 90e4cab2af
8 changed files with 38 additions and 20 deletions

View File

@ -192,7 +192,6 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_EXEC, "vnode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_PIPE_EXEC, "vnode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_PIPE_EXEC, "vnode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_MERGE_EXEC, "vnode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_MERGE_EXEC, "vnode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_WRITE_EXEC, "vnode-task-write-exec", SStreamTaskExecReq, SStreamTaskExecRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_WRITE_EXEC, "vnode-task-write-exec", SStreamTaskExecReq, SStreamTaskExecRsp)

View File

@ -279,7 +279,6 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, (NodeMsgFp)vmProcessMergeMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, (NodeMsgFp)vmProcessMergeMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);

View File

@ -36,11 +36,11 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeI32(pEncoder, sz) < 0) return -1; if (tEncodeI32(pEncoder, sz) < 0) return -1;
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SArray *pArray = taosArrayGet(pObj->tasks, i); SArray *pArray = taosArrayGetP(pObj->tasks, i);
int32_t innerSz = taosArrayGetSize(pArray); int32_t innerSz = taosArrayGetSize(pArray);
if (tEncodeI32(pEncoder, innerSz) < 0) return -1; if (tEncodeI32(pEncoder, innerSz) < 0) return -1;
for (int32_t j = 0; j < innerSz; j++) { for (int32_t j = 0; j < innerSz; j++) {
SStreamTask *pTask = taosArrayGet(pArray, j); SStreamTask *pTask = taosArrayGetP(pArray, j);
if (tEncodeSStreamTask(pEncoder, pTask) < 0) return -1; if (tEncodeSStreamTask(pEncoder, pTask) < 0) return -1;
} }
} }
@ -76,17 +76,18 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
int32_t sz; int32_t sz;
if (tDecodeI32(pDecoder, &sz) < 0) return -1; if (tDecodeI32(pDecoder, &sz) < 0) return -1;
if (sz != 0) { if (sz != 0) {
pObj->tasks = taosArrayInit(sz, sizeof(SArray)); pObj->tasks = taosArrayInit(sz, sizeof(void *));
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
int32_t innerSz; int32_t innerSz;
if (tDecodeI32(pDecoder, &innerSz) < 0) return -1; if (tDecodeI32(pDecoder, &innerSz) < 0) return -1;
SArray *pArray = taosArrayInit(innerSz, sizeof(SStreamTask)); SArray *pArray = taosArrayInit(innerSz, sizeof(void *));
for (int32_t j = 0; j < innerSz; j++) { for (int32_t j = 0; j < innerSz; j++) {
SStreamTask task; SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (tDecodeSStreamTask(pDecoder, &task) < 0) return -1; if (pTask == NULL) return -1;
taosArrayPush(pArray, &task); if (tDecodeSStreamTask(pDecoder, pTask) < 0) return -1;
taosArrayPush(pArray, &pTask);
} }
taosArrayPush(pObj->tasks, pArray); taosArrayPush(pObj->tasks, &pArray);
} }
} }

View File

@ -198,7 +198,7 @@ int tqCommit(STQ*);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessSetConnReq(STQ* pTq, char* msg); int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
int32_t tqProcessRebReq(STQ* pTq, char* msg); int32_t tqProcessRebReq(STQ* pTq, char* msg);
int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg); int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen); int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen);

View File

@ -497,11 +497,9 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) {
return 0; return 0;
} }
int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg) { int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen) {
char* msgstr = POINTER_SHIFT(msg->pCont, sizeof(SMsgHead));
SStreamTaskExecReq req; SStreamTaskExecReq req;
tDecodeSStreamTaskExecReq(msgstr, &req); tDecodeSStreamTaskExecReq(msg, &req);
int32_t taskId = req.taskId; int32_t taskId = req.taskId;
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));

View File

@ -43,6 +43,8 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in fetch queue is processing"); vTrace("message in fetch queue is processing");
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_FETCH: case TDMT_VND_FETCH:
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg); return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg);
@ -65,10 +67,9 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return vnodeGetTableMeta(pVnode, pMsg); return vnodeGetTableMeta(pVnode, pMsg);
case TDMT_VND_CONSUME: case TDMT_VND_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg); return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TASK_EXEC:
case TDMT_VND_TASK_PIPE_EXEC: case TDMT_VND_TASK_PIPE_EXEC:
case TDMT_VND_TASK_MERGE_EXEC: case TDMT_VND_TASK_MERGE_EXEC:
return tqProcessTaskExec(pVnode->pTq, pMsg); return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen);
case TDMT_VND_STREAM_TRIGGER: case TDMT_VND_STREAM_TRIGGER:
return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen); return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen);
case TDMT_VND_QUERY_HEARTBEAT: case TDMT_VND_QUERY_HEARTBEAT:

View File

@ -167,6 +167,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
pMsg->contLen - sizeof(SMsgHead)) < 0) { pMsg->contLen - sizeof(SMsgHead)) < 0) {
} }
} break; } break;
case TDMT_VND_TASK_WRITE_EXEC: {
if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
}
} break;
case TDMT_VND_CREATE_SMA: { // timeRangeSMA case TDMT_VND_CREATE_SMA: { // timeRangeSMA
#if 0 #if 0
SSmaCfg vCreateSmaReq = {0}; SSmaCfg vCreateSmaReq = {0};

View File

@ -205,9 +205,16 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
} }
if (pTask->sinkType != TASK_SINK__NONE) { if (pTask->sinkType == TASK_SINK__TABLE) {
// TODO: wrap
if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1; if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SMA) {
if (tEncodeI8(pEncoder, pTask->smaSink.reserved) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__FETCH) {
if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SHOW) {
if (tEncodeI8(pEncoder, pTask->showSink.reserved) < 0) return -1;
} else {
ASSERT(pTask->sinkType == TASK_SINK__NONE);
} }
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
@ -244,8 +251,16 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) {
if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
} }
if (pTask->sinkType != TASK_SINK__NONE) { if (pTask->sinkType == TASK_SINK__TABLE) {
if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SMA) {
if (tDecodeI8(pDecoder, &pTask->smaSink.reserved) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__FETCH) {
if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SHOW) {
if (tDecodeI8(pDecoder, &pTask->showSink.reserved) < 0) return -1;
} else {
ASSERT(pTask->sinkType == TASK_SINK__NONE);
} }
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {