fix:core dump if consumer multi vgroup message with sequence:create/alter/delete/create or create/delete/create
This commit is contained in:
parent
2b442621e8
commit
0857ab3531
|
@ -2397,7 +2397,7 @@ void tmq_free_json_meta(char* jsonMeta){
|
||||||
taosMemoryFreeClear(jsonMeta);
|
taosMemoryFreeClear(jsonMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){
|
static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen, bool isCreate){
|
||||||
SVCreateStbReq req = {0};
|
SVCreateStbReq req = {0};
|
||||||
SDecoder coder;
|
SDecoder coder;
|
||||||
SMCreateStbReq pReq = {0};
|
SMCreateStbReq pReq = {0};
|
||||||
|
@ -2436,8 +2436,13 @@ static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){
|
||||||
strcpy(field.name, pSchema->name);
|
strcpy(field.name, pSchema->name);
|
||||||
taosArrayPush(pReq.pTags, &field);
|
taosArrayPush(pReq.pTags, &field);
|
||||||
}
|
}
|
||||||
|
if(isCreate){
|
||||||
|
pReq.colVer = 1;
|
||||||
|
pReq.tagVer = 1;
|
||||||
|
}else{
|
||||||
pReq.colVer = req.schemaRow.version;
|
pReq.colVer = req.schemaRow.version;
|
||||||
pReq.tagVer = req.schemaTag.version;
|
pReq.tagVer = req.schemaTag.version;
|
||||||
|
}
|
||||||
pReq.numOfColumns = req.schemaRow.nCols;
|
pReq.numOfColumns = req.schemaRow.nCols;
|
||||||
pReq.numOfTags = req.schemaTag.nCols;
|
pReq.numOfTags = req.schemaTag.nCols;
|
||||||
pReq.commentLen = -1;
|
pReq.commentLen = -1;
|
||||||
|
@ -2871,9 +2876,9 @@ int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data *raw_meta){
|
||||||
}
|
}
|
||||||
|
|
||||||
if(raw_meta->raw_meta_type == TDMT_VND_CREATE_STB) {
|
if(raw_meta->raw_meta_type == TDMT_VND_CREATE_STB) {
|
||||||
return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
|
return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len, true);
|
||||||
}else if(raw_meta->raw_meta_type == TDMT_VND_ALTER_STB){
|
}else if(raw_meta->raw_meta_type == TDMT_VND_ALTER_STB){
|
||||||
return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
|
return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len, false);
|
||||||
}else if(raw_meta->raw_meta_type == TDMT_VND_DROP_STB){
|
}else if(raw_meta->raw_meta_type == TDMT_VND_DROP_STB){
|
||||||
return taosDropStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
|
return taosDropStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
|
||||||
}else if(raw_meta->raw_meta_type == TDMT_VND_CREATE_TABLE){
|
}else if(raw_meta->raw_meta_type == TDMT_VND_CREATE_TABLE){
|
||||||
|
|
|
@ -710,8 +710,8 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
|
||||||
pDst->updateTime = pDst->createdTime;
|
pDst->updateTime = pDst->createdTime;
|
||||||
pDst->uid = (pCreate->source == TD_REQ_FROM_TAOX) ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
pDst->uid = (pCreate->source == TD_REQ_FROM_TAOX) ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||||
pDst->dbUid = pDb->uid;
|
pDst->dbUid = pDb->uid;
|
||||||
pDst->tagVer = (pCreate->source != TD_REQ_FROM_APP) ? pCreate->tagVer : 1;
|
pDst->tagVer = (pCreate->source == TD_REQ_FROM_TAOX) ? pCreate->tagVer : 1;
|
||||||
pDst->colVer = (pCreate->source != TD_REQ_FROM_APP) ? pCreate->colVer : 1;
|
pDst->colVer = (pCreate->source == TD_REQ_FROM_TAOX) ? pCreate->colVer : 1;
|
||||||
pDst->smaVer = 1;
|
pDst->smaVer = 1;
|
||||||
pDst->nextColId = 1;
|
pDst->nextColId = 1;
|
||||||
pDst->maxdelay[0] = pCreate->delay1;
|
pDst->maxdelay[0] = pCreate->delay1;
|
||||||
|
@ -981,6 +981,14 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
} else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) {
|
} else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
} else if (createReq.source == TD_REQ_FROM_TAOX && (createReq.tagVer != 1 || createReq.colVer != 1)){
|
||||||
|
mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name);
|
||||||
|
code = 0;
|
||||||
|
goto _OVER;
|
||||||
|
} else if (createReq.source == TD_REQ_FROM_TAOX && (createReq.tagVer == 1 || createReq.colVer == 1)){ //metaSaveToSkmDb does not delete pMeta->pSkmDb, if receivet tmq message is: create stable1 then delete stable1 then create stable1
|
||||||
|
mInfo("stb:%s, create table from taosx", createReq.name);
|
||||||
|
createReq.tagVer++;
|
||||||
|
createReq.colVer++;
|
||||||
}
|
}
|
||||||
|
|
||||||
pDb = mndAcquireDbByStb(pMnode, createReq.name);
|
pDb = mndAcquireDbByStb(pMnode, createReq.name);
|
||||||
|
|
|
@ -3685,6 +3685,9 @@ static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStm
|
||||||
pReq->delay2 = pStmt->pOptions->maxDelay2;
|
pReq->delay2 = pStmt->pOptions->maxDelay2;
|
||||||
pReq->watermark1 = pStmt->pOptions->watermark1;
|
pReq->watermark1 = pStmt->pOptions->watermark1;
|
||||||
pReq->watermark2 = pStmt->pOptions->watermark2;
|
pReq->watermark2 = pStmt->pOptions->watermark2;
|
||||||
|
pReq->colVer = 1;
|
||||||
|
pReq->tagVer = 1;
|
||||||
|
pReq->source = TD_REQ_FROM_APP;
|
||||||
columnDefNodeToField(pStmt->pCols, &pReq->pColumns);
|
columnDefNodeToField(pStmt->pCols, &pReq->pColumns);
|
||||||
columnDefNodeToField(pStmt->pTags, &pReq->pTags);
|
columnDefNodeToField(pStmt->pTags, &pReq->pTags);
|
||||||
pReq->numOfColumns = LIST_LENGTH(pStmt->pCols);
|
pReq->numOfColumns = LIST_LENGTH(pStmt->pCols);
|
||||||
|
|
Loading…
Reference in New Issue