diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index d0f88a660c..b37001bde9 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -175,11 +175,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_DISCONNECT, "vnode-mq-disconnect", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CONN, "vnode-mq-set-conn", SMqSetCVgReq, SMqSetCVgRsp) - TD_DEF_MSG_TYPE(TDMT_VND_MQ_REB, "vnode-mq-mv-rebalance", SMqMVRebReq, SMqMVRebRsp) - TD_DEF_MSG_TYPE(TDMT_VND_MQ_CANCEL_CONN, "vnode-mq-mv-cancel-conn", SMqCancelConnReq, SMqCancelConnRsp) TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp) - TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CUR, "vnode-mq-set-cur", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_RES_READY, "vnode-res-ready", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_TASK, "vnode-cancel-task", NULL, NULL) diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 411eff5374..159e14a3d5 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -218,9 +218,6 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) { dmSetMsgHandle(pWrapper, TDMT_MND_GET_INDEX, mmProcessReadMsg, DEFAULT_HANDLE); // Requests handled by VNODE - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CANCEL_CONN_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 65180f84dd..4cc1b8527c 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -276,10 +276,6 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) { dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, vmProcessWriteMsg, DEFAULT_HANDLE); - // dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, vmProcessWriteMsg, DEFAULT_HANDLE); - // dmSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, vmProcessWriteMsg, DEFAULT_HANDLE); - // dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CANCEL_CONN, vmProcessWriteMsg, DEFAULT_HANDLE); - // dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessFetchMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, vmProcessWriteMsg, DEFAULT_HANDLE); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 2250aab35c..6e004a89fc 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -83,7 +83,6 @@ typedef struct STqOffsetStore STqOffsetStore; struct STqReadHandle { int64_t ver; - int64_t tbUid; SHashObj* tbIdHash; const SSubmitReq* pMsg; SSubmitBlk* pBlock; @@ -218,20 +217,6 @@ typedef struct { SArray* topics; // SArray } STqConsumer; -enum { - TQ_PUSHER_TYPE__CLIENT = 1, - TQ_PUSHER_TYPE__STREAM, -}; - -typedef struct { - int8_t type; - int8_t reserved[3]; - int32_t ttl; - int64_t consumerId; - SRpcMsg* pMsg; - // SMqPollRsp* rsp; -} STqClientPusher; - typedef struct { int8_t type; int8_t nodeType; @@ -241,10 +226,6 @@ typedef struct { // TODO sync function } STqStreamPusher; -typedef struct { - SHashObj* pHash; // -} STqPushMgr; - typedef struct { int8_t inited; tmr_h timer; @@ -260,14 +241,11 @@ void tqCleanUp(); STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal); void tqClose(STQ*); // required by vnode -int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version); +int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqCommit(STQ*); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId); int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); -// int32_t tqProcessSetConnReq(STQ* pTq, char* msg); -// int32_t tqProcessRebReq(STQ* pTq, char* msg); -// int32_t tqProcessCancelConnReq(STQ* pTq, char* msg); int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId); int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId); @@ -308,16 +286,6 @@ int32_t tqOffsetCommit(STqOffsetStore* pStore, const char* subscribeKey, int64_t int32_t tqOffsetPersist(STqOffsetStore* pStore, const char* subscribeKey); int32_t tqOffsetPersistAll(STqOffsetStore* pStore); -// tqPush -int32_t tqPushMgrInit(); -void tqPushMgrCleanUp(); - -STqPushMgr* tqPushMgrOpen(); -void tqPushMgrClose(STqPushMgr* pushMgr); - -STqClientPusher* tqAddClientPusher(STqPushMgr* pushMgr, SRpcMsg* pMsg, int64_t consumerId, int64_t ttl); -STqStreamPusher* tqAddStreamPusher(STqPushMgr* pushMgr, int64_t streamId, SEpSet* pEpSet); - #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index d0018d2cac..0b3feb5010 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -15,9 +15,12 @@ #include "vnodeInt.h" -int32_t tqInit() { return tqPushMgrInit(); } +int32_t tqInit() { + // + return 0; +} -void tqCleanUp() { tqPushMgrCleanUp(); } +void tqCleanUp() {} STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { STQ* pTq = taosMemoryMalloc(sizeof(STQ)); @@ -184,7 +187,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ return 0; } -int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version) { +int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { if (msgType != TDMT_VND_SUBMIT) return 0; void* data = taosMemoryMalloc(msgLen); @@ -194,7 +197,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t versi memcpy(data, msg, msgLen); if (msgType == TDMT_VND_SUBMIT) { - if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg, version) != 0) { + if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg, ver) != 0) { return -1; } } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 4384255f89..f2f48bbc8a 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -12,73 +12,3 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ - -#include "vnodeInt.h" - -int32_t tqPushMgrInit() { - // - int8_t old = atomic_val_compare_exchange_8(&tqPushMgmt.inited, 0, 1); - if (old == 1) return 0; - - tqPushMgmt.timer = taosTmrInit(0, 0, 0, "TQ"); - return 0; -} - -void tqPushMgrCleanUp() { - int8_t old = atomic_val_compare_exchange_8(&tqPushMgmt.inited, 1, 0); - if (old == 0) return; - taosTmrStop(tqPushMgmt.timer); - taosTmrCleanUp(tqPushMgmt.timer); -} - -STqPushMgr* tqPushMgrOpen() { - STqPushMgr* mgr = taosMemoryMalloc(sizeof(STqPushMgr)); - if (mgr == NULL) { - return NULL; - } - mgr->pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); - return mgr; -} - -void tqPushMgrClose(STqPushMgr* pushMgr) { - taosHashCleanup(pushMgr->pHash); - taosMemoryFree(pushMgr); -} - -STqClientPusher* tqAddClientPusher(STqPushMgr* pushMgr, SRpcMsg* pMsg, int64_t consumerId, int64_t ttl) { - STqClientPusher* clientPusher = taosMemoryMalloc(sizeof(STqClientPusher)); - if (clientPusher == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - clientPusher->type = TQ_PUSHER_TYPE__CLIENT; - clientPusher->pMsg = pMsg; - clientPusher->consumerId = consumerId; - clientPusher->ttl = ttl; - if (taosHashPut(pushMgr->pHash, &consumerId, sizeof(int64_t), &clientPusher, sizeof(void*)) < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - taosMemoryFree(clientPusher); - // TODO send rsp back - return NULL; - } - return clientPusher; -} - -STqStreamPusher* tqAddStreamPusher(STqPushMgr* pushMgr, int64_t streamId, SEpSet* pEpSet) { - STqStreamPusher* streamPusher = taosMemoryMalloc(sizeof(STqStreamPusher)); - if (streamPusher == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - streamPusher->type = TQ_PUSHER_TYPE__STREAM; - streamPusher->nodeType = 0; - streamPusher->streamId = streamId; - /*memcpy(&streamPusher->epSet, pEpSet, sizeof(SEpSet));*/ - - if (taosHashPut(pushMgr->pHash, &streamId, sizeof(int64_t), &streamPusher, sizeof(void*)) < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - taosMemoryFree(streamPusher); - return NULL; - } - return streamPusher; -} diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index e0c37f7eb7..aa90a04478 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -90,21 +90,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg // TODO: handle error } break; -#if 0 - case TDMT_VND_MQ_SET_CONN: { - if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { - // TODO: handle error - } - } break; - case TDMT_VND_MQ_REB: { - if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { - } - } break; - case TDMT_VND_MQ_CANCEL_CONN: { - if (tqProcessCancelConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { - } - } break; -#endif case TDMT_VND_TASK_DEPLOY: { if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead)) < 0) {