From 0857ab35319c5aa7e53338be2be58fdf4123afd6 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 13 Jul 2022 11:39:32 +0800 Subject: [PATCH] fix:core dump if consumer multi vgroup message with sequence:create/alter/delete/create or create/delete/create --- source/client/src/tmq.c | 15 ++++++++++----- source/dnode/mnode/impl/src/mndStb.c | 12 ++++++++++-- source/libs/parser/src/parTranslater.c | 3 +++ 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index c5cf6eecdf..9d4eeb3233 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -2397,7 +2397,7 @@ void tmq_free_json_meta(char* jsonMeta){ taosMemoryFreeClear(jsonMeta); } -static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){ +static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen, bool isCreate){ SVCreateStbReq req = {0}; SDecoder coder; SMCreateStbReq pReq = {0}; @@ -2436,8 +2436,13 @@ static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){ strcpy(field.name, pSchema->name); taosArrayPush(pReq.pTags, &field); } - pReq.colVer = req.schemaRow.version; - pReq.tagVer = req.schemaTag.version; + if(isCreate){ + pReq.colVer = 1; + pReq.tagVer = 1; + }else{ + pReq.colVer = req.schemaRow.version; + pReq.tagVer = req.schemaTag.version; + } pReq.numOfColumns = req.schemaRow.nCols; pReq.numOfTags = req.schemaTag.nCols; pReq.commentLen = -1; @@ -2871,9 +2876,9 @@ int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data *raw_meta){ } if(raw_meta->raw_meta_type == TDMT_VND_CREATE_STB) { - return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len); + return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len, true); }else if(raw_meta->raw_meta_type == TDMT_VND_ALTER_STB){ - return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len); + return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len, false); }else if(raw_meta->raw_meta_type == TDMT_VND_DROP_STB){ return taosDropStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len); }else if(raw_meta->raw_meta_type == TDMT_VND_CREATE_TABLE){ diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 16143619c1..e170a2916f 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -710,8 +710,8 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat pDst->updateTime = pDst->createdTime; pDst->uid = (pCreate->source == TD_REQ_FROM_TAOX) ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); pDst->dbUid = pDb->uid; - pDst->tagVer = (pCreate->source != TD_REQ_FROM_APP) ? pCreate->tagVer : 1; - pDst->colVer = (pCreate->source != TD_REQ_FROM_APP) ? pCreate->colVer : 1; + pDst->tagVer = (pCreate->source == TD_REQ_FROM_TAOX) ? pCreate->tagVer : 1; + pDst->colVer = (pCreate->source == TD_REQ_FROM_TAOX) ? pCreate->colVer : 1; pDst->smaVer = 1; pDst->nextColId = 1; pDst->maxdelay[0] = pCreate->delay1; @@ -981,6 +981,14 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { } } else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) { goto _OVER; + } else if (createReq.source == TD_REQ_FROM_TAOX && (createReq.tagVer != 1 || createReq.colVer != 1)){ + mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name); + code = 0; + goto _OVER; + } else if (createReq.source == TD_REQ_FROM_TAOX && (createReq.tagVer == 1 || createReq.colVer == 1)){ //metaSaveToSkmDb does not delete pMeta->pSkmDb, if receivet tmq message is: create stable1 then delete stable1 then create stable1 + mInfo("stb:%s, create table from taosx", createReq.name); + createReq.tagVer++; + createReq.colVer++; } pDb = mndAcquireDbByStb(pMnode, createReq.name); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index ffacafb504..d78e7d1ba2 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3685,6 +3685,9 @@ static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStm pReq->delay2 = pStmt->pOptions->maxDelay2; pReq->watermark1 = pStmt->pOptions->watermark1; pReq->watermark2 = pStmt->pOptions->watermark2; + pReq->colVer = 1; + pReq->tagVer = 1; + pReq->source = TD_REQ_FROM_APP; columnDefNodeToField(pStmt->pCols, &pReq->pColumns); columnDefNodeToField(pStmt->pTags, &pReq->pTags); pReq->numOfColumns = LIST_LENGTH(pStmt->pCols);