fix(stream): fix error in check vgId.

This commit is contained in:
Haojun Liao 2024-01-22 11:05:42 +08:00
parent 4a5ab10b3d
commit 2cc584ff44
3 changed files with 29 additions and 12 deletions

View File

@ -84,8 +84,8 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;

View File

@ -17,6 +17,8 @@
#include "mndDef.h"
#include "mndConsumer.h"
static void *freeStreamTasks(SArray *pTaskLevel);
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
@ -121,11 +123,18 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
if (tDecodeCStrAlloc(pDecoder, &pObj->ast) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
pObj->tasks = NULL;
if (pObj->tasks != NULL) {
pObj->tasks = freeStreamTasks(pObj->tasks);
}
int32_t sz;
if (tDecodeI32(pDecoder, &sz) < 0) return -1;
if (tDecodeI32(pDecoder, &sz) < 0) {
return -1;
}
if (sz != 0) {
pObj->tasks = taosArrayInit(sz, sizeof(void *));
for (int32_t i = 0; i < sz; i++) {
int32_t innerSz;
if (tDecodeI32(pDecoder, &innerSz) < 0) return -1;
@ -165,8 +174,9 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
return 0;
}
static void *freeStreamTasks(SArray *pTaskLevel) {
void *freeStreamTasks(SArray *pTaskLevel) {
int32_t numOfLevel = taosArrayGetSize(pTaskLevel);
for (int32_t i = 0; i < numOfLevel; i++) {
SArray *pLevel = taosArrayGetP(pTaskLevel, i);
int32_t taskSz = taosArrayGetSize(pLevel);

View File

@ -223,11 +223,12 @@ STREAM_ENCODE_OVER:
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow *pRow = NULL;
SStreamObj *pStream = NULL;
void *buf = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
goto STREAM_DECODE_OVER;
}
@ -242,13 +243,19 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
if (pRow == NULL) goto STREAM_DECODE_OVER;
pStream = sdbGetRowObj(pRow);
if (pStream == NULL) goto STREAM_DECODE_OVER;
if (pStream == NULL) {
goto STREAM_DECODE_OVER;
}
int32_t tlen;
int32_t dataPos = 0;
SDB_GET_INT32(pRaw, dataPos, &tlen, STREAM_DECODE_OVER);
buf = taosMemoryMalloc(tlen + 1);
if (buf == NULL) goto STREAM_DECODE_OVER;
if (buf == NULL) {
goto STREAM_DECODE_OVER;
}
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, STREAM_DECODE_OVER);
SDecoder decoder;
@ -264,13 +271,13 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
STREAM_DECODE_OVER:
taosMemoryFreeClear(buf);
if (terrno != TSDB_CODE_SUCCESS) {
mError("stream:%s, failed to decode from raw:%p since %s", pStream == NULL ? "null" : pStream->name, pRaw,
terrstr());
char* p = (pStream == NULL) ? "null" : pStream->name;
mError("stream:%s, failed to decode from raw:%p since %s", p, pRaw, terrstr());
taosMemoryFreeClear(pRow);
return NULL;
}
mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64 "", pStream->name, pRaw, pStream,
mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream,
pStream->checkpointId);
return pRow;
}
@ -1120,7 +1127,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
}
if ((code = mndTransPrepare(pMnode, pTrans)) != TSDB_CODE_SUCCESS) {
mError("failed to prepare trans rebalance since %s", terrstr());
mError("failed to prepare checkpoint trans since %s", terrstr());
goto _ERR;
}