fix:open task case & modify mqRebVgReq encode/decode style
This commit is contained in:
parent
bb772e2062
commit
56782a5d41
|
@ -2807,39 +2807,49 @@ typedef struct {
|
||||||
int64_t suid;
|
int64_t suid;
|
||||||
} SMqRebVgReq;
|
} SMqRebVgReq;
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pReq) {
|
static FORCE_INLINE int tEncodeSMqRebVgReq(SEncoder *pCoder, const SMqRebVgReq* pReq) {
|
||||||
int32_t tlen = 0;
|
if (tStartEncode(pCoder) < 0) return -1;
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->leftForVer);
|
if (tEncodeI64(pCoder, pReq->leftForVer) < 0) return -1;
|
||||||
tlen += taosEncodeFixedI32(buf, pReq->vgId);
|
if (tEncodeI32(pCoder, pReq->vgId) < 0) return -1;
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId);
|
if (tEncodeI64(pCoder, pReq->oldConsumerId) < 0) return -1;
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
|
if (tEncodeI64(pCoder, pReq->newConsumerId) < 0) return -1;
|
||||||
tlen += taosEncodeString(buf, pReq->subKey);
|
if (tEncodeCStr(pCoder, pReq->subKey) < 0) return -1;
|
||||||
tlen += taosEncodeFixedI8(buf, pReq->subType);
|
if (tEncodeI8(pCoder, pReq->subType) < 0) return -1;
|
||||||
tlen += taosEncodeFixedI8(buf, pReq->withMeta);
|
if (tEncodeI8(pCoder, pReq->withMeta) < 0) return -1;
|
||||||
|
|
||||||
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
tlen += taosEncodeString(buf, pReq->qmsg);
|
if (tEncodeCStr(pCoder, pReq->qmsg) < 0) return -1;
|
||||||
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
|
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->suid);
|
if (tEncodeI64(pCoder, pReq->suid) < 0) return -1;
|
||||||
tlen += taosEncodeString(buf, pReq->qmsg);
|
if (tEncodeCStr(pCoder, pReq->qmsg) < 0) return -1;
|
||||||
}
|
}
|
||||||
return tlen;
|
tEndEncode(pCoder);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq) {
|
static FORCE_INLINE int tDecodeSMqRebVgReq(SDecoder *pCoder, SMqRebVgReq* pReq) {
|
||||||
buf = taosDecodeFixedI64(buf, &pReq->leftForVer);
|
if (tStartDecode(pCoder) < 0) return -1;
|
||||||
buf = taosDecodeFixedI32(buf, &pReq->vgId);
|
|
||||||
buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId);
|
if (tDecodeI64(pCoder, &pReq->leftForVer) < 0) return -1;
|
||||||
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
|
|
||||||
buf = taosDecodeStringTo(buf, pReq->subKey);
|
if (tDecodeI32(pCoder, &pReq->vgId) < 0) return -1;
|
||||||
buf = taosDecodeFixedI8(buf, &pReq->subType);
|
if (tDecodeI64(pCoder, &pReq->oldConsumerId) < 0) return -1;
|
||||||
buf = taosDecodeFixedI8(buf, &pReq->withMeta);
|
if (tDecodeI64(pCoder, &pReq->newConsumerId) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(pCoder, pReq->subKey) < 0) return -1;
|
||||||
|
if (tDecodeI8(pCoder, &pReq->subType) < 0) return -1;
|
||||||
|
if (tDecodeI8(pCoder, &pReq->withMeta) < 0) return -1;
|
||||||
|
|
||||||
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
buf = taosDecodeString(buf, &pReq->qmsg);
|
if (tDecodeCStr(pCoder, &pReq->qmsg) < 0) return -1;
|
||||||
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
|
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
buf = taosDecodeFixedI64(buf, &pReq->suid);
|
if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1;
|
||||||
buf = taosDecodeString(buf, &pReq->qmsg);
|
if (!tDecodeIsEnd(pCoder)){
|
||||||
|
if (tDecodeCStr(pCoder, &pReq->qmsg) < 0) return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return (void*)buf;
|
|
||||||
|
tEndDecode(pCoder);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -111,7 +111,14 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
|
||||||
req.suid = pSub->stbUid;
|
req.suid = pSub->stbUid;
|
||||||
tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
|
tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
|
||||||
|
|
||||||
int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req);
|
int32_t tlen = 0;
|
||||||
|
int32_t ret = 0;
|
||||||
|
tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, ret);
|
||||||
|
if (ret < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tlen += sizeof(SMsgHead);
|
||||||
void *buf = taosMemoryMalloc(tlen);
|
void *buf = taosMemoryMalloc(tlen);
|
||||||
if (buf == NULL) {
|
if (buf == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -123,8 +130,14 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
|
||||||
pMsgHead->contLen = htonl(tlen);
|
pMsgHead->contLen = htonl(tlen);
|
||||||
pMsgHead->vgId = htonl(pRebVg->pVgEp->vgId);
|
pMsgHead->vgId = htonl(pRebVg->pVgEp->vgId);
|
||||||
|
|
||||||
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
SEncoder encoder = {0};
|
||||||
tEncodeSMqRebVgReq(&abuf, &req);
|
tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen);
|
||||||
|
if (tEncodeSMqRebVgReq(&encoder, &req) < 0) {
|
||||||
|
taosMemoryFreeClear(buf);
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
tEncoderClear(&encoder);
|
||||||
*pBuf = buf;
|
*pBuf = buf;
|
||||||
*pLen = tlen;
|
*pLen = tlen;
|
||||||
|
|
||||||
|
|
|
@ -633,7 +633,16 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t
|
||||||
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
SMqRebVgReq req = {0};
|
SMqRebVgReq req = {0};
|
||||||
tDecodeSMqRebVgReq(msg, &req);
|
SDecoder dc = {0};
|
||||||
|
|
||||||
|
tDecoderInit(&dc, msg, msgLen);
|
||||||
|
|
||||||
|
// decode req
|
||||||
|
if (tDecodeSMqRebVgReq(&dc, &req) < 0) {
|
||||||
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
tDecoderClear(&dc);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
SVnode* pVnode = pTq->pVnode;
|
SVnode* pVnode = pTq->pVnode;
|
||||||
int32_t vgId = TD_VID(pVnode);
|
int32_t vgId = TD_VID(pVnode);
|
||||||
|
@ -680,8 +689,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
pHandle->snapshotVer = ver;
|
pHandle->snapshotVer = ver;
|
||||||
|
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
pHandle->execHandle.execCol.qmsg = req.qmsg;
|
pHandle->execHandle.execCol.qmsg = taosStrdup(req.qmsg);;
|
||||||
req.qmsg = NULL;
|
|
||||||
|
|
||||||
pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId,
|
pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId,
|
||||||
&pHandle->execHandle.numOfCols, req.newConsumerId);
|
&pHandle->execHandle.numOfCols, req.newConsumerId);
|
||||||
|
@ -701,8 +709,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
|
pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
|
||||||
pHandle->execHandle.execTb.suid = req.suid;
|
pHandle->execHandle.execTb.suid = req.suid;
|
||||||
pHandle->execHandle.execTb.qmsg = req.qmsg;
|
pHandle->execHandle.execTb.qmsg = taosStrdup(req.qmsg);
|
||||||
req.qmsg = NULL;
|
|
||||||
|
|
||||||
if(strcmp(pHandle->execHandle.execTb.qmsg, "") != 0) {
|
if(strcmp(pHandle->execHandle.execTb.qmsg, "") != 0) {
|
||||||
if (nodesStringToNode(pHandle->execHandle.execTb.qmsg, &pHandle->execHandle.execTb.node) != 0) {
|
if (nodesStringToNode(pHandle->execHandle.execTb.qmsg, &pHandle->execHandle.execTb.node) != 0) {
|
||||||
|
@ -766,7 +773,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
|
|
||||||
end:
|
end:
|
||||||
taosWUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
taosMemoryFree(req.qmsg);
|
tDecoderClear(&dc);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -564,7 +564,7 @@
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/user_privilege.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/user_privilege.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/fsync.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/fsync.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/multilevel.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/multilevel.py
|
||||||
#,,n,system-test,python3 ./test.py -f 0-others/compatibility.py
|
,,n,system-test,python3 ./test.py -f 0-others/compatibility.py
|
||||||
,,n,system-test,python3 ./test.py -f 0-others/tag_index_basic.py
|
,,n,system-test,python3 ./test.py -f 0-others/tag_index_basic.py
|
||||||
,,n,system-test,python3 ./test.py -f 0-others/udfpy_main.py
|
,,n,system-test,python3 ./test.py -f 0-others/udfpy_main.py
|
||||||
,,n,system-test,python3 ./test.py -N 3 -f 0-others/walRetention.py
|
,,n,system-test,python3 ./test.py -N 3 -f 0-others/walRetention.py
|
||||||
|
|
Loading…
Reference in New Issue