From 1c40afc67c31676694dd1ec9c9550ec641ec4c6d Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 17 Mar 2022 17:48:21 +0800 Subject: [PATCH] add tmq_get_row --- include/client/taos.h | 50 ++++++++------ include/common/tdatablock.h | 23 +++++-- include/common/tmsg.h | 27 +++++++- include/common/tmsgdef.h | 1 + source/client/src/tmq.c | 36 +++++++++- source/dnode/mnode/impl/src/mndDef.c | 34 ++++++++++ source/dnode/mnode/impl/src/mndScheduler.c | 78 ++++++++++++++++++++++ source/dnode/mnode/impl/src/mndStream.c | 12 ++-- source/dnode/vnode/inc/tq.h | 1 + source/dnode/vnode/src/inc/tqInt.h | 1 + source/dnode/vnode/src/tq/tq.c | 17 +++++ source/dnode/vnode/src/vnd/vnodeWrite.c | 11 ++- 12 files changed, 252 insertions(+), 39 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 66e3c491c7..a102629efa 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -31,27 +31,27 @@ typedef void TAOS_SUB; typedef void **TAOS_ROW; // Data type definition -#define TSDB_DATA_TYPE_NULL 0 // 1 bytes -#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes -#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte -#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes -#define TSDB_DATA_TYPE_INT 4 // 4 bytes -#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes -#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes -#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes -#define TSDB_DATA_TYPE_VARCHAR 8 // string, alias for varchar -#define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes -#define TSDB_DATA_TYPE_NCHAR 10 // unicode string -#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte -#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes -#define TSDB_DATA_TYPE_UINT 13 // 4 bytes -#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes -#define TSDB_DATA_TYPE_JSON 15 // json string -#define TSDB_DATA_TYPE_VARBINARY 16 // binary -#define TSDB_DATA_TYPE_DECIMAL 17 // decimal -#define TSDB_DATA_TYPE_BLOB 18 // binary +#define TSDB_DATA_TYPE_NULL 0 // 1 bytes +#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes +#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte +#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes +#define TSDB_DATA_TYPE_INT 4 // 4 bytes +#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes +#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes +#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes +#define TSDB_DATA_TYPE_VARCHAR 8 // string, alias for varchar +#define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes +#define TSDB_DATA_TYPE_NCHAR 10 // unicode string +#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte +#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes +#define TSDB_DATA_TYPE_UINT 13 // 4 bytes +#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes +#define TSDB_DATA_TYPE_JSON 15 // json string +#define TSDB_DATA_TYPE_VARBINARY 16 // binary +#define TSDB_DATA_TYPE_DECIMAL 17 // decimal +#define TSDB_DATA_TYPE_BLOB 18 // binary #define TSDB_DATA_TYPE_MEDIUMBLOB 19 -#define TSDB_DATA_TYPE_BINARY TSDB_DATA_TYPE_VARCHAR // string +#define TSDB_DATA_TYPE_BINARY TSDB_DATA_TYPE_VARCHAR // string typedef enum { TSDB_OPTION_LOCALE, @@ -257,9 +257,15 @@ DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_co void tmqShowMsg(tmq_message_t *tmq_message); int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message); -typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code); +/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */ -DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT* stmt); +DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message); +DLL_EXPORT char *tmq_get_topic_name(tmq_message_t *message); + +/* ---------------------- OTHER ---------------------------- */ +typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code); + +DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt); #ifdef __cplusplus } diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 7e60013aa1..17e019d977 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -52,6 +52,21 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet); BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \ } while (0) +static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, uint32_t row) { + if (!pColumnInfoData->hasNull) { + return false; + } + if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { + return pColumnInfoData->varmeta.offset[row] == -1; + } else { + if (pColumnInfoData->nullbitmap == NULL) { + return false; + } + + return colDataIsNull_f(pColumnInfoData->nullbitmap, row); + } +} + static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, uint32_t row, SColumnDataAgg* pColAgg) { if (!pColumnInfoData->hasNull) { @@ -79,10 +94,10 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u } } -#define BitmapLen(_n) (((_n) + ((1<> NBIT) +#define BitmapLen(_n) (((_n) + ((1 << NBIT) - 1)) >> NBIT) - -#define colDataGetData(p1_, r_) \ +// SColumnInfoData, rowNumber +#define colDataGetData(p1_, r_) \ ((IS_VAR_DATA_TYPE((p1_)->info.type)) ? ((p1_)->pData + (p1_)->varmeta.offset[(r_)]) \ : ((p1_)->pData + ((r_) * (p1_)->info.bytes))) @@ -126,4 +141,4 @@ void* blockDataDestroy(SSDataBlock* pBlock); } #endif -#endif /*_TD_COMMON_EP_H_*/ +#endif /*_TD_COMMON_EP_H_*/ diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b449cad2a1..551035a065 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -419,7 +419,7 @@ typedef struct { }; } SColumnFilterList; /* - * for client side struct, we only need the column id, type, bytes are not necessary + * for client side struct, only column id, type, bytes are necessary * But for data in vnode side, we need all the following information. */ typedef struct { @@ -2173,13 +2173,36 @@ typedef struct { SArray* topics; // SArray } SMqCMGetSubEpRsp; -struct tmq_message_t { +typedef struct { SMqRspHead head; union { SMqPollRsp consumeRsp; SMqCMGetSubEpRsp getEpRsp; }; void* extra; +} SMqMsgWrapper; + +typedef struct { + int32_t curBlock; + int32_t curRow; + void** uData; +} SMqRowIter; + +struct tmq_message_t_v1 { + SMqPollRsp rsp; + SMqRowIter iter; +}; + +struct tmq_message_t { + SMqRspHead head; + union { + SMqPollRsp consumeRsp; + SMqCMGetSubEpRsp getEpRsp; + }; + void* extra; + int32_t curBlock; + int32_t curRow; + void** uData; }; static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); } diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 6a07887721..a596794b3d 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -189,6 +189,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp) + TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 0ddd53be08..ee13527b96 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -700,6 +700,10 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { } memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); tDecodeSMqPollRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp); + pRsp->curBlock = 0; + pRsp->curRow = 0; + // TODO: alloc mem + /*pRsp->*/ /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ if (pRsp->consumeRsp.numOfTopics == 0) { /*printf("no data\n");*/ @@ -758,9 +762,9 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { goto END; } - // tmq's epoch is monotomically increase, + // tmq's epoch is monotonically increase, // so it's safe to discard any old epoch msg. - // epoch will only increase when received newer epoch ep msg + // Epoch will only increase when received newer epoch ep msg SMqRspHead* head = pMsg->pData; int32_t epoch = atomic_load_32(&tmq->epoch); if (head->epoch <= epoch) { @@ -1282,6 +1286,34 @@ const char* tmq_err2str(tmq_resp_err_t err) { return "fail"; } +TAOS_ROW tmq_get_row(tmq_message_t* message) { + SMqPollRsp* rsp = &message->consumeRsp; + while (1) { + if (message->curBlock < taosArrayGetSize(rsp->pBlockData)) { + SSDataBlock* pBlock = taosArrayGet(rsp->pBlockData, message->curBlock); + if (message->curRow < pBlock->info.rows) { + for (int i = 0; i < pBlock->info.numOfCols; i++) { + SColumnInfoData* pData = taosArrayGet(pBlock->pDataBlock, i); + if (colDataIsNull_s(pData, message->curRow)) + message->uData[i] = NULL; + else { + message->uData[i] = colDataGetData(pData, message->curRow); + } + } + message->curRow++; + return message->uData; + } else { + message->curBlock++; + message->curRow = 0; + continue; + } + } + return NULL; + } +} + +char* tmq_get_topic_name(tmq_message_t* message) { return "not implemented yet"; } + #if 0 tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) { tmq_t* pTmq = malloc(sizeof(tmq_t)); diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index cc5bf843ce..f81dead325 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -27,6 +27,22 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1; + // TODO encode tasks + if (pObj->tasks) { + int32_t sz = taosArrayGetSize(pObj->tasks); + tEncodeI32(pEncoder, sz); + for (int32_t i = 0; i < sz; i++) { + SArray *pArray = taosArrayGet(pObj->tasks, i); + int32_t innerSz = taosArrayGetSize(pArray); + tEncodeI32(pEncoder, innerSz); + for (int32_t j = 0; j < innerSz; j++) { + SStreamTask *pTask = taosArrayGet(pArray, j); + tEncodeSStreamTask(pEncoder, pTask); + } + } + } else { + tEncodeI32(pEncoder, 0); + } return pEncoder->pos; } @@ -42,5 +58,23 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1; + int32_t sz; + if (tDecodeI32(pDecoder, &sz) < 0) return -1; + if (sz != 0) { + pObj->tasks = taosArrayInit(sz, sizeof(SArray)); + for (int32_t i = 0; i < sz; i++) { + int32_t innerSz; + if (tDecodeI32(pDecoder, &innerSz) < 0) return -1; + SArray *pArray = taosArrayInit(innerSz, sizeof(SStreamTask)); + for (int32_t j = 0; j < innerSz; j++) { + SStreamTask task; + if (tDecodeSStreamTask(pDecoder, &task) < 0) return -1; + taosArrayPush(pArray, &task); + } + taosArrayPush(pObj->tasks, pArray); + } + } else { + pObj->tasks = NULL; + } return 0; } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 16b1ba8a5c..faef757e76 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -77,6 +77,32 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { return -1; } taosArrayPush(taskOneLevel, pTask); + + SCoder encoder; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); + tEncodeSStreamTask(&encoder, pTask); + int32_t tlen = sizeof(SMsgHead) + encoder.pos; + tCoderClear(&encoder); + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + ((SMsgHead*)buf)->streamTaskId = pTask->taskId; + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER); + tEncodeSStreamTask(&encoder, pTask); + tCoderClear(&encoder); + + STransAction action = {0}; + action.epSet = plan->execNode.epSet; + action.pCont = buf; + action.contLen = tlen; + action.msgType = TDMT_VND_TASK_DEPLOY; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + rpcFreeCont(buf); + return -1; + } } } else if (plan->subplanType == SUBPLAN_TYPE_SCAN) { // duplicatable @@ -101,6 +127,32 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { } taosArrayPush(taskOneLevel, pTask); + + SCoder encoder; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); + tEncodeSStreamTask(&encoder, pTask); + int32_t tlen = sizeof(SMsgHead) + encoder.pos; + tCoderClear(&encoder); + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + ((SMsgHead*)buf)->streamTaskId = pTask->taskId; + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER); + tEncodeSStreamTask(&encoder, pTask); + tCoderClear(&encoder); + + STransAction action = {0}; + action.epSet = plan->execNode.epSet; + action.pCont = buf; + action.contLen = tlen; + action.msgType = TDMT_SND_TASK_DEPLOY; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + rpcFreeCont(buf); + return -1; + } } } else { // not duplicatable @@ -117,6 +169,32 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { return -1; } taosArrayPush(taskOneLevel, pTask); + + SCoder encoder; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); + tEncodeSStreamTask(&encoder, pTask); + int32_t tlen = sizeof(SMsgHead) + encoder.pos; + tCoderClear(&encoder); + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + ((SMsgHead*)buf)->streamTaskId = pTask->taskId; + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER); + tEncodeSStreamTask(&encoder, pTask); + tCoderClear(&encoder); + + STransAction action = {0}; + action.epSet = plan->execNode.epSet; + action.pCont = buf; + action.contLen = tlen; + action.msgType = TDMT_SND_TASK_DEPLOY; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + rpcFreeCont(buf); + return -1; + } } taosArrayPush(pStream->tasks, taskOneLevel); } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 67011dfe8a..af0f354c11 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -230,6 +230,12 @@ static int32_t mndCreateStream(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateStreamR } mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name); + if (mndScheduleStream(pMnode, pTrans, &streamObj) < 0) { + mError("stream:%ld, schedule stream since %s", streamObj.uid, terrstr()); + mndTransDrop(pTrans); + return -1; + } + SSdbRaw *pRedoRaw = mndStreamActionEncode(&streamObj); if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); @@ -238,12 +244,6 @@ static int32_t mndCreateStream(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateStreamR } sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY); - if (mndScheduleStream(pMnode, pTrans, &streamObj) < 0) { - mError("stream:%ld, schedule stream since %s", streamObj.uid, terrstr()); - mndTransDrop(pTrans); - return -1; - } - if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index b4d45e83fb..8d8ed2e427 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -55,6 +55,7 @@ int tqCommit(STQ*); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessSetConnReq(STQ* pTq, char* msg); int32_t tqProcessRebReq(STQ* pTq, char* msg); +int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/inc/tqInt.h b/source/dnode/vnode/src/inc/tqInt.h index fc6a3699d5..9b21cf92ce 100644 --- a/source/dnode/vnode/src/inc/tqInt.h +++ b/source/dnode/vnode/src/inc/tqInt.h @@ -161,6 +161,7 @@ struct STQ { STqMemRef tqMemRef; STqMetaStore* tqMeta; STqPushMgr* tqPushMgr; + SHashObj* pStreamTasks; SWal* pWal; SMeta* pVnodeMeta; }; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index d15481b4aa..8cdc250e8d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -55,6 +55,8 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig, S return NULL; } + pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + return pTq; } @@ -416,3 +418,18 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { terrno = TSDB_CODE_SUCCESS; return 0; } + +int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { + SStreamTask* pTask = malloc(sizeof(SStreamTask)); + if (pTask == NULL) { + return -1; + } + SCoder decoder; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, (uint8_t*)msg, msgLen, TD_DECODER); + tDecodeSStreamTask(&decoder, pTask); + tCoderClear(&decoder); + + taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask)); + + return 0; +} diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index aba8100478..218b53c2ab 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -41,7 +41,7 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { return 0; } -int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { +int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { void *ptr = NULL; if (pVnode->config.streamMode == 0) { @@ -63,7 +63,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { switch (pMsg->msgType) { case TDMT_VND_CREATE_STB: { - SVCreateTbReq vCreateTbReq = {0}; + SVCreateTbReq vCreateTbReq = {0}; tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq); if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) { // TODO: handle error @@ -100,7 +100,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { break; } case TDMT_VND_ALTER_STB: { - SVCreateTbReq vAlterTbReq = {0}; + SVCreateTbReq vAlterTbReq = {0}; vTrace("vgId:%d, process alter stb req", pVnode->vgId); tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vAlterTbReq); free(vAlterTbReq.stbCfg.pSchema); @@ -132,6 +132,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { } } break; + case TDMT_VND_TASK_DEPLOY: { + if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), + pMsg->contLen - sizeof(SMsgHead)) < 0) { + } + } break; case TDMT_VND_CREATE_SMA: { // timeRangeSMA SSmaCfg vCreateSmaReq = {0}; if (tDeserializeSVCreateTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateSmaReq) == NULL) {