remove unused msg
This commit is contained in:
parent
b816d7091e
commit
aade540f05
|
@ -175,11 +175,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_DISCONNECT, "vnode-mq-disconnect", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_MQ_DISCONNECT, "vnode-mq-disconnect", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CONN, "vnode-mq-set-conn", SMqSetCVgReq, SMqSetCVgRsp)
|
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_REB, "vnode-mq-mv-rebalance", SMqMVRebReq, SMqMVRebRsp)
|
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CANCEL_CONN, "vnode-mq-mv-cancel-conn", SMqCancelConnReq, SMqCancelConnRsp)
|
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
|
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CUR, "vnode-mq-set-cur", NULL, NULL)
|
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_RES_READY, "vnode-res-ready", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_RES_READY, "vnode-res-ready", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_TASK, "vnode-cancel-task", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_TASK, "vnode-cancel-task", NULL, NULL)
|
||||||
|
|
|
@ -218,9 +218,6 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) {
|
||||||
dmSetMsgHandle(pWrapper, TDMT_MND_GET_INDEX, mmProcessReadMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_MND_GET_INDEX, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||||
|
|
||||||
// Requests handled by VNODE
|
// Requests handled by VNODE
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CANCEL_CONN_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
|
|
|
@ -276,10 +276,6 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
// dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, vmProcessWriteMsg, DEFAULT_HANDLE);
|
|
||||||
// dmSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, vmProcessWriteMsg, DEFAULT_HANDLE);
|
|
||||||
// dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CANCEL_CONN, vmProcessWriteMsg, DEFAULT_HANDLE);
|
|
||||||
// dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessFetchMsg, DEFAULT_HANDLE);
|
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
|
|
|
@ -83,7 +83,6 @@ typedef struct STqOffsetStore STqOffsetStore;
|
||||||
|
|
||||||
struct STqReadHandle {
|
struct STqReadHandle {
|
||||||
int64_t ver;
|
int64_t ver;
|
||||||
int64_t tbUid;
|
|
||||||
SHashObj* tbIdHash;
|
SHashObj* tbIdHash;
|
||||||
const SSubmitReq* pMsg;
|
const SSubmitReq* pMsg;
|
||||||
SSubmitBlk* pBlock;
|
SSubmitBlk* pBlock;
|
||||||
|
@ -218,20 +217,6 @@ typedef struct {
|
||||||
SArray* topics; // SArray<STqTopic>
|
SArray* topics; // SArray<STqTopic>
|
||||||
} STqConsumer;
|
} STqConsumer;
|
||||||
|
|
||||||
enum {
|
|
||||||
TQ_PUSHER_TYPE__CLIENT = 1,
|
|
||||||
TQ_PUSHER_TYPE__STREAM,
|
|
||||||
};
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int8_t type;
|
|
||||||
int8_t reserved[3];
|
|
||||||
int32_t ttl;
|
|
||||||
int64_t consumerId;
|
|
||||||
SRpcMsg* pMsg;
|
|
||||||
// SMqPollRsp* rsp;
|
|
||||||
} STqClientPusher;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t type;
|
int8_t type;
|
||||||
int8_t nodeType;
|
int8_t nodeType;
|
||||||
|
@ -241,10 +226,6 @@ typedef struct {
|
||||||
// TODO sync function
|
// TODO sync function
|
||||||
} STqStreamPusher;
|
} STqStreamPusher;
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SHashObj* pHash; // <id, STqPush*>
|
|
||||||
} STqPushMgr;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t inited;
|
int8_t inited;
|
||||||
tmr_h timer;
|
tmr_h timer;
|
||||||
|
@ -260,14 +241,11 @@ void tqCleanUp();
|
||||||
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal);
|
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal);
|
||||||
void tqClose(STQ*);
|
void tqClose(STQ*);
|
||||||
// required by vnode
|
// required by vnode
|
||||||
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version);
|
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
|
||||||
int tqCommit(STQ*);
|
int tqCommit(STQ*);
|
||||||
|
|
||||||
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
|
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
|
||||||
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
|
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||||
// int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
|
|
||||||
// int32_t tqProcessRebReq(STQ* pTq, char* msg);
|
|
||||||
// int32_t tqProcessCancelConnReq(STQ* pTq, char* msg);
|
|
||||||
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
|
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
|
||||||
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
|
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId);
|
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId);
|
||||||
|
@ -308,16 +286,6 @@ int32_t tqOffsetCommit(STqOffsetStore* pStore, const char* subscribeKey, int64_t
|
||||||
int32_t tqOffsetPersist(STqOffsetStore* pStore, const char* subscribeKey);
|
int32_t tqOffsetPersist(STqOffsetStore* pStore, const char* subscribeKey);
|
||||||
int32_t tqOffsetPersistAll(STqOffsetStore* pStore);
|
int32_t tqOffsetPersistAll(STqOffsetStore* pStore);
|
||||||
|
|
||||||
// tqPush
|
|
||||||
int32_t tqPushMgrInit();
|
|
||||||
void tqPushMgrCleanUp();
|
|
||||||
|
|
||||||
STqPushMgr* tqPushMgrOpen();
|
|
||||||
void tqPushMgrClose(STqPushMgr* pushMgr);
|
|
||||||
|
|
||||||
STqClientPusher* tqAddClientPusher(STqPushMgr* pushMgr, SRpcMsg* pMsg, int64_t consumerId, int64_t ttl);
|
|
||||||
STqStreamPusher* tqAddStreamPusher(STqPushMgr* pushMgr, int64_t streamId, SEpSet* pEpSet);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -15,9 +15,12 @@
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
int32_t tqInit() { return tqPushMgrInit(); }
|
int32_t tqInit() {
|
||||||
|
//
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
void tqCleanUp() { tqPushMgrCleanUp(); }
|
void tqCleanUp() {}
|
||||||
|
|
||||||
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
|
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
|
||||||
STQ* pTq = taosMemoryMalloc(sizeof(STQ));
|
STQ* pTq = taosMemoryMalloc(sizeof(STQ));
|
||||||
|
@ -184,7 +187,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version) {
|
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
|
||||||
if (msgType != TDMT_VND_SUBMIT) return 0;
|
if (msgType != TDMT_VND_SUBMIT) return 0;
|
||||||
|
|
||||||
void* data = taosMemoryMalloc(msgLen);
|
void* data = taosMemoryMalloc(msgLen);
|
||||||
|
@ -194,7 +197,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t versi
|
||||||
memcpy(data, msg, msgLen);
|
memcpy(data, msg, msgLen);
|
||||||
|
|
||||||
if (msgType == TDMT_VND_SUBMIT) {
|
if (msgType == TDMT_VND_SUBMIT) {
|
||||||
if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg, version) != 0) {
|
if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg, ver) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,73 +12,3 @@
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
|
||||||
|
|
||||||
int32_t tqPushMgrInit() {
|
|
||||||
//
|
|
||||||
int8_t old = atomic_val_compare_exchange_8(&tqPushMgmt.inited, 0, 1);
|
|
||||||
if (old == 1) return 0;
|
|
||||||
|
|
||||||
tqPushMgmt.timer = taosTmrInit(0, 0, 0, "TQ");
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void tqPushMgrCleanUp() {
|
|
||||||
int8_t old = atomic_val_compare_exchange_8(&tqPushMgmt.inited, 1, 0);
|
|
||||||
if (old == 0) return;
|
|
||||||
taosTmrStop(tqPushMgmt.timer);
|
|
||||||
taosTmrCleanUp(tqPushMgmt.timer);
|
|
||||||
}
|
|
||||||
|
|
||||||
STqPushMgr* tqPushMgrOpen() {
|
|
||||||
STqPushMgr* mgr = taosMemoryMalloc(sizeof(STqPushMgr));
|
|
||||||
if (mgr == NULL) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
mgr->pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
|
||||||
return mgr;
|
|
||||||
}
|
|
||||||
|
|
||||||
void tqPushMgrClose(STqPushMgr* pushMgr) {
|
|
||||||
taosHashCleanup(pushMgr->pHash);
|
|
||||||
taosMemoryFree(pushMgr);
|
|
||||||
}
|
|
||||||
|
|
||||||
STqClientPusher* tqAddClientPusher(STqPushMgr* pushMgr, SRpcMsg* pMsg, int64_t consumerId, int64_t ttl) {
|
|
||||||
STqClientPusher* clientPusher = taosMemoryMalloc(sizeof(STqClientPusher));
|
|
||||||
if (clientPusher == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
clientPusher->type = TQ_PUSHER_TYPE__CLIENT;
|
|
||||||
clientPusher->pMsg = pMsg;
|
|
||||||
clientPusher->consumerId = consumerId;
|
|
||||||
clientPusher->ttl = ttl;
|
|
||||||
if (taosHashPut(pushMgr->pHash, &consumerId, sizeof(int64_t), &clientPusher, sizeof(void*)) < 0) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
taosMemoryFree(clientPusher);
|
|
||||||
// TODO send rsp back
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
return clientPusher;
|
|
||||||
}
|
|
||||||
|
|
||||||
STqStreamPusher* tqAddStreamPusher(STqPushMgr* pushMgr, int64_t streamId, SEpSet* pEpSet) {
|
|
||||||
STqStreamPusher* streamPusher = taosMemoryMalloc(sizeof(STqStreamPusher));
|
|
||||||
if (streamPusher == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
streamPusher->type = TQ_PUSHER_TYPE__STREAM;
|
|
||||||
streamPusher->nodeType = 0;
|
|
||||||
streamPusher->streamId = streamId;
|
|
||||||
/*memcpy(&streamPusher->epSet, pEpSet, sizeof(SEpSet));*/
|
|
||||||
|
|
||||||
if (taosHashPut(pushMgr->pHash, &streamId, sizeof(int64_t), &streamPusher, sizeof(void*)) < 0) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
taosMemoryFree(streamPusher);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
return streamPusher;
|
|
||||||
}
|
|
||||||
|
|
|
@ -90,21 +90,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
#if 0
|
|
||||||
case TDMT_VND_MQ_SET_CONN: {
|
|
||||||
if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
|
|
||||||
// TODO: handle error
|
|
||||||
}
|
|
||||||
} break;
|
|
||||||
case TDMT_VND_MQ_REB: {
|
|
||||||
if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
|
|
||||||
}
|
|
||||||
} break;
|
|
||||||
case TDMT_VND_MQ_CANCEL_CONN: {
|
|
||||||
if (tqProcessCancelConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
|
|
||||||
}
|
|
||||||
} break;
|
|
||||||
#endif
|
|
||||||
case TDMT_VND_TASK_DEPLOY: {
|
case TDMT_VND_TASK_DEPLOY: {
|
||||||
if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
||||||
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||||
|
|
Loading…
Reference in New Issue