refactor(tmq): use tdb to store check info
This commit is contained in:
parent
27e2ba79c0
commit
357b21cfeb
|
@ -2555,10 +2555,14 @@ typedef struct {
|
||||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||||
int64_t ntbUid;
|
int64_t ntbUid;
|
||||||
SArray* colIdList; // SArray<int16_t>
|
SArray* colIdList; // SArray<int16_t>
|
||||||
} SCheckAlterInfo;
|
} STqCheckInfo;
|
||||||
|
|
||||||
int32_t tEncodeSCheckAlterInfo(SEncoder* pEncoder, const SCheckAlterInfo* pInfo);
|
int32_t tEncodeSTqCheckInfo(SEncoder* pEncoder, const STqCheckInfo* pInfo);
|
||||||
int32_t tDecodeSCheckAlterInfo(SDecoder* pDecoder, SCheckAlterInfo* pInfo);
|
int32_t tDecodeSTqCheckInfo(SDecoder* pDecoder, STqCheckInfo* pInfo);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||||
|
} STqDelCheckInfoReq;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
|
|
|
@ -188,7 +188,8 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
|
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp)
|
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_COMMIT_OFFSET, "vnode-commit-offset", STqOffset, STqOffset)
|
TD_DEF_MSG_TYPE(TDMT_VND_MQ_COMMIT_OFFSET, "vnode-commit-offset", STqOffset, STqOffset)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_CHECK_ALTER_INFO, "vnode-alter-check-info", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_ADD_CHECK_INFO, "vnode-add-check-info", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_VND_DELETE_CHECK_INFO, "vnode-delete-check-info", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL)
|
||||||
|
|
|
@ -515,7 +515,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
void streamMetaClose(SStreamMeta* streamMeta);
|
void streamMetaClose(SStreamMeta* streamMeta);
|
||||||
|
|
||||||
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||||
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen);
|
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen);
|
||||||
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
|
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
|
||||||
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
|
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
|
||||||
|
|
||||||
|
|
|
@ -4262,7 +4262,6 @@ int32_t tDeserializeSServerStatusRsp(void *buf, int32_t bufLen, SServerStatusRsp
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeSMqOffset(SEncoder *encoder, const SMqOffset *pOffset) {
|
int32_t tEncodeSMqOffset(SEncoder *encoder, const SMqOffset *pOffset) {
|
||||||
if (tEncodeI32(encoder, pOffset->vgId) < 0) return -1;
|
if (tEncodeI32(encoder, pOffset->vgId) < 0) return -1;
|
||||||
if (tEncodeI64(encoder, pOffset->offset) < 0) return -1;
|
if (tEncodeI64(encoder, pOffset->offset) < 0) return -1;
|
||||||
|
@ -4300,7 +4299,6 @@ int32_t tDecodeSMqCMCommitOffsetReq(SDecoder *decoder, SMqCMCommitOffsetReq *pRe
|
||||||
tEndDecode(decoder);
|
tEndDecode(decoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tSerializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) {
|
int32_t tSerializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) {
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
tEncoderInit(&encoder, buf, bufLen);
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
@ -5590,7 +5588,6 @@ int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 1
|
|
||||||
int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
|
int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
|
||||||
if (pVal->type == TMQ_OFFSET__RESET_NONE) {
|
if (pVal->type == TMQ_OFFSET__RESET_NONE) {
|
||||||
snprintf(buf, maxLen, "offset(reset to none)");
|
snprintf(buf, maxLen, "offset(reset to none)");
|
||||||
|
@ -5609,7 +5606,6 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
|
bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
|
||||||
if (pLeft->type == pRight->type) {
|
if (pLeft->type == pRight->type) {
|
||||||
|
@ -5643,7 +5639,7 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeSCheckAlterInfo(SEncoder *pEncoder, const SCheckAlterInfo *pInfo) {
|
int32_t tEncodeSTqCheckInfo(SEncoder *pEncoder, const STqCheckInfo *pInfo) {
|
||||||
if (tEncodeCStr(pEncoder, pInfo->topic) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pInfo->topic) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pInfo->ntbUid) < 0) return -1;
|
if (tEncodeI64(pEncoder, pInfo->ntbUid) < 0) return -1;
|
||||||
int32_t sz = taosArrayGetSize(pInfo->colIdList);
|
int32_t sz = taosArrayGetSize(pInfo->colIdList);
|
||||||
|
@ -5655,7 +5651,7 @@ int32_t tEncodeSCheckAlterInfo(SEncoder *pEncoder, const SCheckAlterInfo *pInfo)
|
||||||
return pEncoder->pos;
|
return pEncoder->pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDecodeSCheckAlterInfo(SDecoder *pDecoder, SCheckAlterInfo *pInfo) {
|
int32_t tDecodeSTqCheckInfo(SDecoder *pDecoder, STqCheckInfo *pInfo) {
|
||||||
if (tDecodeCStrTo(pDecoder, pInfo->topic) < 0) return -1;
|
if (tDecodeCStrTo(pDecoder, pInfo->topic) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pInfo->ntbUid) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pInfo->ntbUid) < 0) return -1;
|
||||||
int32_t sz;
|
int32_t sz;
|
||||||
|
|
|
@ -225,7 +225,8 @@ SArray *mmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ADD_CHECK_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE_CHECK_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -361,7 +361,8 @@ SArray *vmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ADD_CHECK_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE_CHECK_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -15,10 +15,10 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndOffset.h"
|
#include "mndOffset.h"
|
||||||
#include "mndPrivilege.h"
|
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndMnode.h"
|
#include "mndMnode.h"
|
||||||
|
#include "mndPrivilege.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndStb.h"
|
#include "mndStb.h"
|
||||||
#include "mndTopic.h"
|
#include "mndTopic.h"
|
||||||
|
@ -305,7 +305,7 @@ int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
||||||
sdbRelease(pSdb, pOffset);
|
sdbRelease(pSdb, pOffset);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic) {
|
int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic) {
|
||||||
|
|
|
@ -57,7 +57,8 @@ int32_t mndInitTopic(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndTransProcessRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_CHECK_ALTER_INFO_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_ADD_CHECK_INFO_RSP, mndTransProcessRsp);
|
||||||
|
mndSetMsgHandle(pMnode, TDMT_VND_DELETE_CHECK_INFO_RSP, mndTransProcessRsp);
|
||||||
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);
|
||||||
|
@ -450,7 +451,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||||
|
|
||||||
if (topicObj.ntbUid != 0) {
|
if (topicObj.ntbUid != 0) {
|
||||||
SCheckAlterInfo info;
|
STqCheckInfo info;
|
||||||
memcpy(info.topic, topicObj.name, TSDB_TOPIC_FNAME_LEN);
|
memcpy(info.topic, topicObj.name, TSDB_TOPIC_FNAME_LEN);
|
||||||
info.ntbUid = topicObj.ntbUid;
|
info.ntbUid = topicObj.ntbUid;
|
||||||
info.colIdList = topicObj.ntbColIds;
|
info.colIdList = topicObj.ntbColIds;
|
||||||
|
@ -470,7 +471,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
// encoder check alter info
|
// encoder check alter info
|
||||||
int32_t len;
|
int32_t len;
|
||||||
int32_t code;
|
int32_t code;
|
||||||
tEncodeSize(tEncodeSCheckAlterInfo, &info, len, code);
|
tEncodeSize(tEncodeSTqCheckInfo, &info, len, code);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
|
@ -481,7 +482,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
SEncoder encoder;
|
SEncoder encoder;
|
||||||
tEncoderInit(&encoder, abuf, len);
|
tEncoderInit(&encoder, abuf, len);
|
||||||
if (tEncodeSCheckAlterInfo(&encoder, &info) < 0) {
|
if (tEncodeSTqCheckInfo(&encoder, &info) < 0) {
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -493,7 +494,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
action.pCont = buf;
|
action.pCont = buf;
|
||||||
action.contLen = sizeof(SMsgHead) + len;
|
action.contLen = sizeof(SMsgHead) + len;
|
||||||
action.msgType = TDMT_VND_CHECK_ALTER_INFO;
|
action.msgType = TDMT_VND_ADD_CHECK_INFO;
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
@ -659,12 +660,14 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
|
mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
|
||||||
|
|
||||||
|
#if 0
|
||||||
if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
|
if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
// TODO check if rebalancing
|
// TODO check if rebalancing
|
||||||
if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
|
if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
|
||||||
|
@ -675,6 +678,37 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pTopic->ntbUid != 0) {
|
||||||
|
// broadcast to all vnode
|
||||||
|
void *pIter = NULL;
|
||||||
|
SVgObj *pVgroup = NULL;
|
||||||
|
while (1) {
|
||||||
|
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN);
|
||||||
|
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
|
((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId);
|
||||||
|
memcpy(abuf, pTopic->name, TSDB_TOPIC_FNAME_LEN);
|
||||||
|
|
||||||
|
STransAction action = {0};
|
||||||
|
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
|
action.pCont = buf;
|
||||||
|
action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN;
|
||||||
|
action.msgType = TDMT_VND_DELETE_CHECK_INFO;
|
||||||
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
|
int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
|
|
||||||
|
|
|
@ -117,16 +117,15 @@ typedef struct {
|
||||||
struct STQ {
|
struct STQ {
|
||||||
SVnode* pVnode;
|
SVnode* pVnode;
|
||||||
char* path;
|
char* path;
|
||||||
SHashObj* pushMgr; // consumerId -> STqHandle*
|
SHashObj* pPushMgr; // consumerId -> STqHandle*
|
||||||
SHashObj* handles; // subKey -> STqHandle
|
SHashObj* pHandle; // subKey -> STqHandle
|
||||||
SHashObj* pAlterInfo; // topic -> SAlterCheckInfo
|
SHashObj* pCheckInfo; // topic -> SAlterCheckInfo
|
||||||
|
|
||||||
STqOffsetStore* pOffsetStore;
|
STqOffsetStore* pOffsetStore;
|
||||||
|
|
||||||
TDB* pMetaStore;
|
TDB* pMetaDB;
|
||||||
TTB* pExecStore;
|
TTB* pExecStore;
|
||||||
|
TTB* pCheckStore;
|
||||||
TTB* pAlterInfoStore;
|
|
||||||
|
|
||||||
SStreamMeta* pStreamMeta;
|
SStreamMeta* pStreamMeta;
|
||||||
};
|
};
|
||||||
|
@ -155,6 +154,9 @@ int32_t tqMetaClose(STQ* pTq);
|
||||||
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle);
|
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle);
|
||||||
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key);
|
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key);
|
||||||
int32_t tqMetaRestoreHandle(STQ* pTq);
|
int32_t tqMetaRestoreHandle(STQ* pTq);
|
||||||
|
int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen);
|
||||||
|
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key);
|
||||||
|
int32_t tqMetaRestoreCheckInfo(STQ* pTq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t size;
|
int32_t size;
|
||||||
|
|
|
@ -155,13 +155,16 @@ int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
|
||||||
int tqCommit(STQ*);
|
int tqCommit(STQ*);
|
||||||
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
|
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
|
||||||
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId);
|
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId);
|
||||||
int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen);
|
// tq-mq
|
||||||
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
|
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
|
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ver);
|
int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
||||||
|
int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
||||||
|
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen);
|
// tq-stream
|
||||||
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen);
|
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
||||||
|
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver);
|
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver);
|
||||||
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
|
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
|
||||||
|
|
|
@ -60,11 +60,11 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
|
||||||
pTq->path = strdup(path);
|
pTq->path = strdup(path);
|
||||||
pTq->pVnode = pVnode;
|
pTq->pVnode = pVnode;
|
||||||
|
|
||||||
pTq->handles = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||||
|
|
||||||
pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
|
pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
|
||||||
|
|
||||||
pTq->pAlterInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||||
|
|
||||||
if (tqMetaOpen(pTq) < 0) {
|
if (tqMetaOpen(pTq) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -85,9 +85,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
|
||||||
void tqClose(STQ* pTq) {
|
void tqClose(STQ* pTq) {
|
||||||
if (pTq) {
|
if (pTq) {
|
||||||
tqOffsetClose(pTq->pOffsetStore);
|
tqOffsetClose(pTq->pOffsetStore);
|
||||||
taosHashCleanup(pTq->handles);
|
taosHashCleanup(pTq->pHandle);
|
||||||
taosHashCleanup(pTq->pushMgr);
|
taosHashCleanup(pTq->pPushMgr);
|
||||||
taosHashCleanup(pTq->pAlterInfo);
|
taosHashCleanup(pTq->pCheckInfo);
|
||||||
taosMemoryFree(pTq->path);
|
taosMemoryFree(pTq->path);
|
||||||
tqMetaClose(pTq);
|
tqMetaClose(pTq);
|
||||||
streamMetaClose(pTq->pStreamMeta);
|
streamMetaClose(pTq->pStreamMeta);
|
||||||
|
@ -183,7 +183,12 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ver) {
|
static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
|
||||||
|
return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
|
||||||
|
pLeft->val.version <= pRight->val.version;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||||
STqOffset offset = {0};
|
STqOffset offset = {0};
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
tDecoderInit(&decoder, msg, msgLen);
|
tDecoderInit(&decoder, msg, msgLen);
|
||||||
|
@ -199,19 +204,24 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve
|
||||||
} else if (offset.val.type == TMQ_OFFSET__LOG) {
|
} else if (offset.val.type == TMQ_OFFSET__LOG) {
|
||||||
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
|
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
|
||||||
TD_VID(pTq->pVnode), offset.val.version);
|
TD_VID(pTq->pVnode), offset.val.version);
|
||||||
|
if (offset.val.version + 1 == version) {
|
||||||
|
offset.val.version += 1;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
/*STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);*/
|
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
|
||||||
/*if (pOffset != NULL) {*/
|
if (pOffset != NULL && tqOffsetLessOrEqual(&offset, pOffset)) {
|
||||||
/*if (pOffset->val.type == TMQ_OFFSET__LOG && pOffset->val.version < offset.val.version) {*/
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
|
if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (offset.val.type == TMQ_OFFSET__LOG) {
|
if (offset.val.type == TMQ_OFFSET__LOG) {
|
||||||
STqHandle* pHandle = taosHashGet(pTq->handles, offset.subKey, strlen(offset.subKey));
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
|
||||||
if (pHandle) {
|
if (pHandle) {
|
||||||
if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
|
if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -220,6 +230,8 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// rsp
|
||||||
|
|
||||||
/*}*/
|
/*}*/
|
||||||
/*}*/
|
/*}*/
|
||||||
|
|
||||||
|
@ -229,15 +241,15 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve
|
||||||
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
|
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = taosHashIterate(pTq->pAlterInfo, pIter);
|
pIter = taosHashIterate(pTq->pCheckInfo, pIter);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
SCheckAlterInfo* pCheck = (SCheckAlterInfo*)pIter;
|
STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
|
||||||
if (pCheck->ntbUid == tbUid) {
|
if (pCheck->ntbUid == tbUid) {
|
||||||
int32_t sz = taosArrayGetSize(pCheck->colIdList);
|
int32_t sz = taosArrayGetSize(pCheck->colIdList);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i);
|
int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i);
|
||||||
if (forbidColId == colId) {
|
if (forbidColId == colId) {
|
||||||
taosHashCancelIterate(pTq->pAlterInfo, pIter);
|
taosHashCancelIterate(pTq->pCheckInfo, pIter);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -289,7 +301,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SWalCkHead* pCkHead = NULL;
|
SWalCkHead* pCkHead = NULL;
|
||||||
|
|
||||||
// 1.find handle
|
// 1.find handle
|
||||||
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
||||||
/*ASSERT(pHandle);*/
|
/*ASSERT(pHandle);*/
|
||||||
if (pHandle == NULL) {
|
if (pHandle == NULL) {
|
||||||
tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId,
|
tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId,
|
||||||
|
@ -478,10 +490,10 @@ OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
|
int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||||
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
||||||
|
|
||||||
int32_t code = taosHashRemove(pTq->handles, pReq->subKey, strlen(pReq->subKey));
|
int32_t code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
||||||
ASSERT(code == 0);
|
ASSERT(code == 0);
|
||||||
|
|
||||||
tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
|
tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
|
||||||
|
@ -492,27 +504,43 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen) {
|
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||||
SCheckAlterInfo info = {0};
|
STqCheckInfo info = {0};
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
tDecoderInit(&decoder, msg, msgLen);
|
tDecoderInit(&decoder, msg, msgLen);
|
||||||
if (tDecodeSCheckAlterInfo(&decoder, &info) < 0) {
|
if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
if (taosHashPut(pTq->pAlterInfo, info.topic, strlen(info.topic), &info, sizeof(SCheckAlterInfo)) < 0) {
|
if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||||
|
if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (tqMetaDeleteCheckInfo(pTq, msg) < 0) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||||
SMqRebVgReq req = {0};
|
SMqRebVgReq req = {0};
|
||||||
tDecodeSMqRebVgReq(msg, &req);
|
tDecodeSMqRebVgReq(msg, &req);
|
||||||
// todo lock
|
// todo lock
|
||||||
STqHandle* pHandle = taosHashGet(pTq->handles, req.subKey, strlen(req.subKey));
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||||
if (pHandle == NULL) {
|
if (pHandle == NULL) {
|
||||||
if (req.oldConsumerId != -1) {
|
if (req.oldConsumerId != -1) {
|
||||||
tqError("vgId:%d, build new consumer handle %s for consumer %d, but old consumerId is %ld", req.vgId, req.subKey,
|
tqError("vgId:%d, build new consumer handle %s for consumer %d, but old consumerId is %ld", req.vgId, req.subKey,
|
||||||
|
@ -579,7 +607,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList);
|
tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList);
|
||||||
taosArrayDestroy(tbUidList);
|
taosArrayDestroy(tbUidList);
|
||||||
}
|
}
|
||||||
taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
||||||
tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId);
|
tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId);
|
||||||
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
||||||
// TODO
|
// TODO
|
||||||
|
@ -668,34 +696,9 @@ FAIL:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
|
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||||
//
|
//
|
||||||
return streamMetaAddSerializedTask(pTq->pStreamMeta, msg, msgLen);
|
return streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen);
|
||||||
#if 0
|
|
||||||
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
|
||||||
if (pTask == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
SDecoder decoder;
|
|
||||||
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
|
|
||||||
if (tDecodeSStreamTask(&decoder, pTask) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
goto FAIL;
|
|
||||||
}
|
|
||||||
tDecoderClear(&decoder);
|
|
||||||
|
|
||||||
if (tqExpandTask(pTq, pTask) < 0) {
|
|
||||||
goto FAIL;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
|
|
||||||
FAIL:
|
|
||||||
if (pTask) taosMemoryFree(pTask);
|
|
||||||
return -1;
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
|
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
|
||||||
|
@ -817,7 +820,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
|
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||||
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
||||||
|
|
||||||
return streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId);
|
return streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId);
|
||||||
|
|
|
@ -43,6 +43,185 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tqMetaOpen(STQ* pTq) {
|
||||||
|
if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tqMetaRestoreHandle(pTq) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tqMetaRestoreCheckInfo(pTq) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqMetaClose(STQ* pTq) {
|
||||||
|
if (pTq->pExecStore) {
|
||||||
|
tdbTbClose(pTq->pExecStore);
|
||||||
|
}
|
||||||
|
if (pTq->pCheckStore) {
|
||||||
|
tdbTbClose(pTq->pCheckStore);
|
||||||
|
}
|
||||||
|
tdbClose(pTq->pMetaDB);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen) {
|
||||||
|
TXN txn;
|
||||||
|
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbTbUpsert(pTq->pExecStore, key, strlen(key), value, vLen, &txn) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) {
|
||||||
|
TXN txn;
|
||||||
|
|
||||||
|
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbTbDelete(pTq->pCheckStore, key, (int)strlen(key), &txn) < 0) {
|
||||||
|
/*ASSERT(0);*/
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
|
||||||
|
TBC* pCur = NULL;
|
||||||
|
if (tdbTbcOpen(pTq->pCheckStore, &pCur, NULL) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* pKey = NULL;
|
||||||
|
int kLen = 0;
|
||||||
|
void* pVal = NULL;
|
||||||
|
int vLen = 0;
|
||||||
|
SDecoder decoder;
|
||||||
|
|
||||||
|
tdbTbcMoveToFirst(pCur);
|
||||||
|
|
||||||
|
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
||||||
|
STqCheckInfo info;
|
||||||
|
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
||||||
|
if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tdbTbcClose(pCur);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
|
||||||
|
int32_t code;
|
||||||
|
int32_t vlen;
|
||||||
|
tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
|
||||||
|
tqDebug("tq save %s(%d) consumer %" PRId64 " vgId:%d", pHandle->subKey, strlen(pHandle->subKey), pHandle->consumerId,
|
||||||
|
TD_VID(pTq->pVnode));
|
||||||
|
|
||||||
|
void* buf = taosMemoryCalloc(1, vlen);
|
||||||
|
if (buf == NULL) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
SEncoder encoder;
|
||||||
|
tEncoderInit(&encoder, buf, vlen);
|
||||||
|
|
||||||
|
if (tEncodeSTqHandle(&encoder, pHandle) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
TXN txn;
|
||||||
|
|
||||||
|
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, &txn) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
|
||||||
|
TXN txn;
|
||||||
|
|
||||||
|
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbTbDelete(pTq->pExecStore, key, (int)strlen(key), &txn) < 0) {
|
||||||
|
/*ASSERT(0);*/
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tqMetaRestoreHandle(STQ* pTq) {
|
int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
TBC* pCur = NULL;
|
TBC* pCur = NULL;
|
||||||
if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
|
if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
|
||||||
|
@ -93,101 +272,10 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
}
|
}
|
||||||
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode));
|
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode));
|
||||||
taosHashPut(pTq->handles, pKey, kLen, &handle, sizeof(STqHandle));
|
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
|
||||||
}
|
}
|
||||||
|
|
||||||
tdbTbcClose(pCur);
|
tdbTbcClose(pCur);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqMetaOpen(STQ* pTq) {
|
|
||||||
if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaStore) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaStore, &pTq->pExecStore) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tqMetaRestoreHandle(pTq) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqMetaClose(STQ* pTq) {
|
|
||||||
if (pTq->pExecStore) {
|
|
||||||
tdbTbClose(pTq->pExecStore);
|
|
||||||
}
|
|
||||||
tdbClose(pTq->pMetaStore);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
|
|
||||||
int32_t code;
|
|
||||||
int32_t vlen;
|
|
||||||
tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
|
|
||||||
ASSERT(code == 0);
|
|
||||||
|
|
||||||
tqDebug("tq save %s(%d) consumer %" PRId64 " vgId:%d", pHandle->subKey, strlen(pHandle->subKey), pHandle->consumerId,
|
|
||||||
TD_VID(pTq->pVnode));
|
|
||||||
|
|
||||||
void* buf = taosMemoryCalloc(1, vlen);
|
|
||||||
if (buf == NULL) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
SEncoder encoder;
|
|
||||||
tEncoderInit(&encoder, buf, vlen);
|
|
||||||
|
|
||||||
if (tEncodeSTqHandle(&encoder, pHandle) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
TXN txn;
|
|
||||||
|
|
||||||
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdbBegin(pTq->pMetaStore, &txn) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, &txn) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdbCommit(pTq->pMetaStore, &txn) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
tEncoderClear(&encoder);
|
|
||||||
taosMemoryFree(buf);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
|
|
||||||
TXN txn;
|
|
||||||
|
|
||||||
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdbBegin(pTq->pMetaStore, &txn) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdbTbDelete(pTq->pExecStore, key, (int)strlen(key), &txn) < 0) {
|
|
||||||
/*ASSERT(0);*/
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdbCommit(pTq->pMetaStore, &txn) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
|
@ -394,7 +394,7 @@ int tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
|
||||||
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = taosHashIterate(pTq->handles, pIter);
|
pIter = taosHashIterate(pTq->pHandle, pIter);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
STqHandle* pExec = (STqHandle*)pIter;
|
STqHandle* pExec = (STqHandle*)pIter;
|
||||||
if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
|
|
|
@ -165,9 +165,9 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
|
||||||
STQ* pTq = pWriter->pTq;
|
STQ* pTq = pWriter->pTq;
|
||||||
|
|
||||||
if (rollback) {
|
if (rollback) {
|
||||||
ASSERT(0);
|
tdbAbort(pWriter->pTq->pMetaDB, &pWriter->txn);
|
||||||
} else {
|
} else {
|
||||||
code = tdbCommit(pWriter->pTq->pMetaStore, &pWriter->txn);
|
code = tdbCommit(pWriter->pTq->pMetaDB, &pWriter->txn);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -196,36 +196,42 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
||||||
break;
|
break;
|
||||||
/* TQ */
|
/* TQ */
|
||||||
case TDMT_VND_MQ_VG_CHANGE:
|
case TDMT_VND_MQ_VG_CHANGE:
|
||||||
if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
if (tqProcessVgChangeReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
||||||
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_MQ_VG_DELETE:
|
case TDMT_VND_MQ_VG_DELETE:
|
||||||
if (tqProcessVgDeleteReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
|
if (tqProcessVgDeleteReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_MQ_COMMIT_OFFSET:
|
case TDMT_VND_MQ_COMMIT_OFFSET:
|
||||||
if (tqProcessOffsetCommitReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
if (tqProcessOffsetCommitReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
||||||
pMsg->contLen - sizeof(SMsgHead), version) < 0) {
|
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_CHECK_ALTER_INFO:
|
case TDMT_VND_ADD_CHECK_INFO:
|
||||||
if (tqProcessCheckAlterInfoReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
if (tqProcessAddCheckInfoReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
||||||
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case TDMT_VND_DELETE_CHECK_INFO:
|
||||||
|
if (tqProcessDelCheckInfoReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
||||||
|
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case TDMT_STREAM_TASK_DEPLOY: {
|
case TDMT_STREAM_TASK_DEPLOY: {
|
||||||
if (tqProcessTaskDeployReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
if (tqProcessTaskDeployReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
||||||
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
case TDMT_STREAM_TASK_DROP: {
|
case TDMT_STREAM_TASK_DROP: {
|
||||||
if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
|
if (tqProcessTaskDropReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
|
|
|
@ -81,7 +81,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
|
||||||
taosMemoryFree(pMeta);
|
taosMemoryFree(pMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
|
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen) {
|
||||||
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
|
|
Loading…
Reference in New Issue