From a10643a074faf771d8fee72a5c8320b7ed1030fe Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 2 Jun 2022 12:34:35 +0800 Subject: [PATCH] update table meta after alter table --- include/common/tmsg.h | 4 + include/libs/nodes/querynodes.h | 2 +- include/libs/qcom/query.h | 9 +- include/libs/scheduler/scheduler.h | 2 +- source/client/inc/clientInt.h | 2 +- source/client/src/clientEnv.c | 27 +--- source/client/src/clientImpl.c | 35 ++--- source/client/src/clientMsgHandler.c | 5 +- source/dnode/mnode/impl/src/mndStb.c | 142 ++++++++++--------- source/libs/parser/src/parInsert.c | 27 +++- source/libs/parser/src/parTranslater.c | 68 +++++---- source/libs/qcom/src/queryUtil.c | 27 ++++ source/libs/scheduler/inc/schedulerInt.h | 2 +- source/libs/scheduler/src/schJob.c | 19 ++- source/libs/scheduler/src/schRemote.c | 10 +- tests/script/jenkins/basic.txt | 3 + tests/script/tsim/catalog/alterInCurrent.sim | 70 +++++++++ 17 files changed, 291 insertions(+), 163 deletions(-) create mode 100644 tests/script/tsim/catalog/alterInCurrent.sim diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 9800c74dd1..ac65e3da8b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1135,6 +1135,10 @@ typedef struct { STableMetaRsp* pMeta; } SMAlterStbRsp; +int32_t tEncodeSMAlterStbRsp(SEncoder *pEncoder, const SMAlterStbRsp *pRsp); +int32_t tDecodeSMAlterStbRsp(SDecoder *pDecoder, SMAlterStbRsp *pRsp); +void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp); + int32_t tSerializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp); int32_t tDeserializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp); void tFreeSTableMetaRsp(STableMetaRsp* pRsp); diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index ab5e10dc2a..e4af78892b 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -331,8 +331,8 @@ typedef struct SQuery { int8_t precision; SCmdMsgInfo* pCmdMsg; int32_t msgType; - SArray* pDbList; SArray* pTableList; + SArray* pDbList; bool showRewrite; int32_t placeholderNum; SArray* pPlaceholderValues; diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 296b18e8de..45a7e9a29f 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -56,6 +56,11 @@ typedef struct STableComInfo { int32_t rowSize; // row size of the schema } STableComInfo; +typedef struct SQueryExecRes { + int32_t msgType; + void* res; +} SQueryExecRes; + typedef struct SIndexMeta { #ifdef WINDOWS size_t avoidCompilationErrors; @@ -192,6 +197,7 @@ int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STabl char* jobTaskStatusStr(int32_t status); SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name); +void destroyQueryExecRes(SQueryExecRes* pRes); extern int32_t (*queryBuildMsg[TDMT_MAX])(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallocFp)(int32_t)); extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t msgSize); @@ -204,7 +210,8 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #define NEED_CLIENT_RM_TBLMETA_ERROR(_code) \ ((_code) == TSDB_CODE_PAR_TABLE_NOT_EXIST || (_code) == TSDB_CODE_VND_TB_NOT_EXIST || \ (_code) == TSDB_CODE_PAR_INVALID_COLUMNS_NUM || (_code) == TSDB_CODE_PAR_INVALID_COLUMN || \ - (_code) == TSDB_CODE_PAR_TAGS_NOT_MATCHED || (_code == TSDB_CODE_PAR_VALUE_TOO_LONG)) + (_code) == TSDB_CODE_PAR_TAGS_NOT_MATCHED || (_code == TSDB_CODE_PAR_VALUE_TOO_LONG) || \ + (_code == TSDB_CODE_PAR_INVALID_DROP_COL)) #define NEED_CLIENT_REFRESH_VG_ERROR(_code) \ ((_code) == TSDB_CODE_VND_HASH_MISMATCH || (_code) == TSDB_CODE_VND_INVALID_VGROUP_ID) #define NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_TABLE_RECREATED) diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 0d32cce20b..331b787690 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -56,7 +56,7 @@ typedef struct SQueryProfileSummary { typedef struct SQueryResult { int32_t code; uint64_t numOfRows; - void *res; + SQueryExecRes res; } SQueryResult; typedef struct STaskInfo { diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 91f9874b23..ad610603bf 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -162,7 +162,7 @@ typedef struct SResultColumn { } SResultColumn; typedef struct SReqResultInfo { - void* pExecRes; + SQueryExecRes execRes; const char* pRspMsg; const char* pData; TAOS_FIELD* fields; // todo, column names are not needed. diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 777f27d29d..4f5f23b68b 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -212,31 +212,6 @@ void doFreeReqResultInfo(SReqResultInfo *pResInfo) { } } -static void destroyExecRes(SRequestObj* pRequest) { - if (NULL == pRequest || NULL == pRequest->body.resInfo.pExecRes) { - return; - } - - switch (pRequest->type) { - case TDMT_VND_ALTER_TABLE: - case TDMT_MND_ALTER_STB: { - tFreeSTableMetaRsp((STableMetaRsp *)pRequest->body.resInfo.pExecRes); - taosMemoryFree(pRequest->body.resInfo.pExecRes); - break; - } - case TDMT_VND_SUBMIT: { - tFreeSSubmitRsp((SSubmitRsp*)pRequest->body.resInfo.pExecRes); - break; - } - case TDMT_VND_QUERY: { - taosArrayDestroy((SArray*)pRequest->body.resInfo.pExecRes); - break; - } - default: - tscError("invalid exec result for request type %d", pRequest->type); - } -} - static void doDestroyRequest(void *p) { assert(p != NULL); SRequestObj *pRequest = (SRequestObj *)p; @@ -259,7 +234,7 @@ static void doDestroyRequest(void *p) { taosArrayDestroy(pRequest->tableList); taosArrayDestroy(pRequest->dbList); - destroyExecRes(pRequest); + destroyQueryExecRes(&pRequest->body.resInfo.execRes); deregisterRequest(pRequest); taosMemoryFreeClear(pRequest); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 123afa0f7c..729f53f443 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -340,7 +340,7 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) { pResInfo->precision = precision; } -int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** pRes) { +int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) { void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; tsem_init(&schdRspSem, 0, 0); @@ -348,14 +348,15 @@ int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNod SQueryResult res = {.code = 0, .numOfRows = 0}; int32_t code = schedulerAsyncExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, pRequest->metric.start, schdExecCallback, &res); + + pRequest->body.resInfo.execRes = res.res; + while (true) { if (code != TSDB_CODE_SUCCESS) { if (pRequest->body.queryJob != 0) { schedulerFreeJob(pRequest->body.queryJob); } - *pRes = res.res; - pRequest->code = code; terrno = code; return pRequest->code; @@ -378,8 +379,6 @@ int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNod } } - *pRes = res.res; - pRequest->code = res.code; terrno = res.code; return pRequest->code; @@ -393,14 +392,13 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, pRequest->metric.start, &res); - pRequest->body.resInfo.pExecRes = res.res; + pRequest->body.resInfo.execRes = res.res; if (code != TSDB_CODE_SUCCESS) { if (pRequest->body.queryJob != 0) { schedulerFreeJob(pRequest->body.queryJob); } - pRequest->code = code; terrno = code; return pRequest->code; @@ -489,6 +487,10 @@ int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) { } int32_t handleExecRes(SRequestObj* pRequest) { + if (NULL == pRequest->body.resInfo.execRes.res) { + return TSDB_CODE_SUCCESS; + } + int32_t code = 0; SCatalog* pCatalog = NULL; code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); @@ -497,19 +499,20 @@ int32_t handleExecRes(SRequestObj* pRequest) { } SEpSet epset = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); - - switch (pRequest->type) { + SQueryExecRes* pRes = &pRequest->body.resInfo.execRes; + + switch (pRes->msgType) { case TDMT_VND_ALTER_TABLE: case TDMT_MND_ALTER_STB: { - code = handleAlterTbExecRes(pRequest->body.resInfo.pExecRes, pCatalog); + code = handleAlterTbExecRes(pRes->res, pCatalog); break; } case TDMT_VND_SUBMIT: { - code = handleSubmitExecRes(pRequest, pRequest->body.resInfo.pExecRes, pCatalog, &epset); + code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset); break; } case TDMT_VND_QUERY: { - code = handleQueryExecRes(pRequest, pRequest->body.resInfo.pExecRes, pCatalog, &epset); + code = handleQueryExecRes(pRequest, pRes->res, pCatalog, &epset); break; } default: @@ -550,17 +553,15 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code qDestroyQuery(pQuery); } - if (NULL != pRequest->body.resInfo.pExecRes) { - handleExecRes(pRequest); - } + handleExecRes(pRequest); if (NULL != pRequest && TSDB_CODE_SUCCESS != code) { pRequest->code = terrno; } if (res) { - *res = pRequest->body.resInfo.pExecRes; - pRequest->body.resInfo.pExecRes = NULL; + *res = pRequest->body.resInfo.execRes.res; + pRequest->body.resInfo.execRes.res = NULL; } return pRequest; diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index e48770e2a1..9de3ee1d0f 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -236,8 +236,9 @@ int32_t processAlterStbRsp(void* param, const SDataBuf* pMsg, int32_t code) { tDecoderInit(&coder, pMsg->pData, pMsg->len); tDecodeSMAlterStbRsp(&coder, &alterRsp); tDecoderClear(&coder); - - pRequest->body.resInfo.pExecRes = alterRsp.pMeta; + + pRequest->body.resInfo.execRes.msgType = TDMT_MND_ALTER_STB; + pRequest->body.resInfo.execRes.res = alterRsp.pMeta; tsem_post(&pRequest->body.rspSem); return code; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index e691e0341a..aa8789829c 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1207,6 +1207,77 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj return 0; } + +static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, STableMetaRsp *pRsp) { + taosRLockLatch(&pStb->lock); + + int32_t totalCols = pStb->numOfColumns + pStb->numOfTags; + pRsp->pSchemas = taosMemoryCalloc(totalCols, sizeof(SSchema)); + if (pRsp->pSchemas == NULL) { + taosRUnLockLatch(&pStb->lock); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + strcpy(pRsp->dbFName, pStb->db); + strcpy(pRsp->tbName, tbName); + strcpy(pRsp->stbName, tbName); + pRsp->dbId = pDb->uid; + pRsp->numOfTags = pStb->numOfTags; + pRsp->numOfColumns = pStb->numOfColumns; + pRsp->precision = pDb->cfg.precision; + pRsp->tableType = TSDB_SUPER_TABLE; + pRsp->sversion = pStb->colVer; + pRsp->tversion = pStb->tagVer; + pRsp->suid = pStb->uid; + pRsp->tuid = pStb->uid; + + for (int32_t i = 0; i < pStb->numOfColumns; ++i) { + SSchema *pSchema = &pRsp->pSchemas[i]; + SSchema *pSrcSchema = &pStb->pColumns[i]; + memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); + pSchema->type = pSrcSchema->type; + pSchema->colId = pSrcSchema->colId; + pSchema->bytes = pSrcSchema->bytes; + } + + for (int32_t i = 0; i < pStb->numOfTags; ++i) { + SSchema *pSchema = &pRsp->pSchemas[i + pStb->numOfColumns]; + SSchema *pSrcSchema = &pStb->pTags[i]; + memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); + pSchema->type = pSrcSchema->type; + pSchema->colId = pSrcSchema->colId; + pSchema->bytes = pSrcSchema->bytes; + } + + taosRUnLockLatch(&pStb->lock); + return 0; +} + +static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) { + char tbFName[TSDB_TABLE_FNAME_LEN] = {0}; + snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName); + + SDbObj *pDb = mndAcquireDb(pMnode, dbFName); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + return -1; + } + + SStbObj *pStb = mndAcquireStb(pMnode, tbFName); + if (pStb == NULL) { + mndReleaseDb(pMnode, pDb); + terrno = TSDB_CODE_MND_INVALID_STB; + return -1; + } + + int32_t code = mndBuildStbSchemaImp(pDb, pStb, tbName, pRsp); + mndReleaseDb(pMnode, pDb); + mndReleaseStb(pMnode, pStb); + return code; +} + + static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, SStbObj *pObj, void **pCont, int32_t *pLen) { int ret; SEncoder ec = {0}; @@ -1221,7 +1292,7 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, S return -1; } - ret = mndBuildStbSchemaImp(pDb, pObj, name.tname, &alterRsp.meta); + ret = mndBuildStbSchemaImp(pDb, pObj, name.tname, alterRsp.pMeta); if (ret) { tFreeSMAlterStbRsp(&alterRsp); return ret; @@ -1533,75 +1604,6 @@ static int32_t mndProcessVDropStbRsp(SRpcMsg *pRsp) { return 0; } -static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, STableMetaRsp *pRsp) { - taosRLockLatch(&pStb->lock); - - int32_t totalCols = pStb->numOfColumns + pStb->numOfTags; - pRsp->pSchemas = taosMemoryCalloc(totalCols, sizeof(SSchema)); - if (pRsp->pSchemas == NULL) { - taosRUnLockLatch(&pStb->lock); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - strcpy(pRsp->dbFName, pStb->db); - strcpy(pRsp->tbName, tbName); - strcpy(pRsp->stbName, tbName); - pRsp->dbId = pDb->uid; - pRsp->numOfTags = pStb->numOfTags; - pRsp->numOfColumns = pStb->numOfColumns; - pRsp->precision = pDb->cfg.precision; - pRsp->tableType = TSDB_SUPER_TABLE; - pRsp->sversion = pStb->colVer; - pRsp->tversion = pStb->tagVer; - pRsp->suid = pStb->uid; - pRsp->tuid = pStb->uid; - - for (int32_t i = 0; i < pStb->numOfColumns; ++i) { - SSchema *pSchema = &pRsp->pSchemas[i]; - SSchema *pSrcSchema = &pStb->pColumns[i]; - memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); - pSchema->type = pSrcSchema->type; - pSchema->colId = pSrcSchema->colId; - pSchema->bytes = pSrcSchema->bytes; - } - - for (int32_t i = 0; i < pStb->numOfTags; ++i) { - SSchema *pSchema = &pRsp->pSchemas[i + pStb->numOfColumns]; - SSchema *pSrcSchema = &pStb->pTags[i]; - memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); - pSchema->type = pSrcSchema->type; - pSchema->colId = pSrcSchema->colId; - pSchema->bytes = pSrcSchema->bytes; - } - - taosRUnLockLatch(&pStb->lock); - return 0; -} - -static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) { - char tbFName[TSDB_TABLE_FNAME_LEN] = {0}; - snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName); - - SDbObj *pDb = mndAcquireDb(pMnode, dbFName); - if (pDb == NULL) { - terrno = TSDB_CODE_MND_DB_NOT_SELECTED; - return -1; - } - - SStbObj *pStb = mndAcquireStb(pMnode, tbFName); - if (pStb == NULL) { - mndReleaseDb(pMnode, pDb); - terrno = TSDB_CODE_MND_INVALID_STB; - return -1; - } - - int32_t code = mndBuildStbSchemaImp(pDb, pStb, tbName, pRsp); - mndReleaseDb(pMnode, pDb); - mndReleaseStb(pMnode, pStb); - return code; -} - static int32_t mndProcessTableMetaReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; int32_t code = -1; diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 8a843c2c1a..f5a079318c 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -61,6 +61,7 @@ typedef struct SInsertParseContext { SHashObj* pSubTableHashObj; // global SArray* pVgDataBlocks; // global SHashObj* pTableNameHashObj; // global + SHashObj* pDbFNameHashObj; // global int32_t totalNum; SVnodeModifOpStmt* pOutput; SStmtCallback* pStmtCb; @@ -972,6 +973,10 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, continue; } + if (TK_NK_RP == sToken.type) { + return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM); + } + if (isParseBindParam) { return buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and values"); } @@ -1091,6 +1096,7 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) { taosHashCleanup(pCxt->pVgroupsHashObj); taosHashCleanup(pCxt->pSubTableHashObj); taosHashCleanup(pCxt->pTableNameHashObj); + taosHashCleanup(pCxt->pDbFNameHashObj); destroyBlockHashmap(pCxt->pTableBlockHashObj); destroyBlockArrayList(pCxt->pVgDataBlocks); @@ -1151,6 +1157,9 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { tNameExtractFullName(&name, tbFName); CHECK_CODE(taosHashPut(pCxt->pTableNameHashObj, tbFName, strlen(tbFName), &name, sizeof(SName))); + char dbFName[TSDB_DB_FNAME_LEN]; + tNameGetFullDbName(&name, dbFName); + CHECK_CODE(taosHashPut(pCxt->pDbFNameHashObj, dbFName, strlen(dbFName), dbFName, sizeof(dbFName))); // USING clause if (TK_USING == sToken.type) { @@ -1158,8 +1167,6 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { NEXT_TOKEN(pCxt->pSql, sToken); autoCreateTbl = true; } else { - char dbFName[TSDB_DB_FNAME_LEN]; - tNameGetFullDbName(&name, dbFName); CHECK_CODE(getTableMeta(pCxt, &name, dbFName)); } @@ -1238,6 +1245,7 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { .pTableMeta = NULL, .pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK), .pTableNameHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK), + .pDbFNameHashObj = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK), .totalNum = 0, .pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT), .pStmtCb = pContext->pStmtCb}; @@ -1252,7 +1260,7 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { } if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pSubTableHashObj || - NULL == context.pTableNameHashObj || NULL == context.pOutput) { + NULL == context.pTableNameHashObj || NULL == context.pDbFNameHashObj || NULL == context.pOutput) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -1278,6 +1286,13 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { } } + if (NULL == (*pQuery)->pDbList) { + (*pQuery)->pDbList = taosArrayInit(taosHashGetSize(context.pDbFNameHashObj), TSDB_DB_FNAME_LEN); + if (NULL == (*pQuery)->pDbList) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + context.pOutput->payloadType = PAYLOAD_TYPE_KV; int32_t code = skipInsertInto(&context.pSql, &context.msg); @@ -1290,6 +1305,12 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { taosArrayPush((*pQuery)->pTableList, pTable); pTable = taosHashIterate(context.pTableNameHashObj, pTable); } + + char* pDb = taosHashIterate(context.pDbFNameHashObj, NULL); + while (NULL != pDb) { + taosArrayPush((*pQuery)->pDbList, pDb); + pDb = taosHashIterate(context.pDbFNameHashObj, pDb); + } } destroyInsertParseContext(&context); return code; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index acbedff0c2..b9b3da0dfe 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -4889,6 +4889,47 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) { return code; } +static int32_t toMsgType(ENodeType type) { + switch (type) { + case QUERY_NODE_CREATE_TABLE_STMT: + return TDMT_VND_CREATE_TABLE; + case QUERY_NODE_ALTER_TABLE_STMT: + return TDMT_VND_ALTER_TABLE; + case QUERY_NODE_DROP_TABLE_STMT: + return TDMT_VND_DROP_TABLE; + default: + break; + } + return TDMT_VND_CREATE_TABLE; +} + +static int32_t setRefreshMate(STranslateContext* pCxt, SQuery* pQuery) { + if (NULL != pCxt->pDbs) { + pQuery->pDbList = taosArrayInit(taosHashGetSize(pCxt->pDbs), TSDB_DB_FNAME_LEN); + if (NULL == pQuery->pDbList) { + return TSDB_CODE_OUT_OF_MEMORY; + } + SFullDatabaseName* pDb = taosHashIterate(pCxt->pDbs, NULL); + while (NULL != pDb) { + taosArrayPush(pQuery->pDbList, pDb->fullDbName); + pDb = taosHashIterate(pCxt->pDbs, pDb); + } + } + + if (NULL != pCxt->pTables) { + pQuery->pTableList = taosArrayInit(taosHashGetSize(pCxt->pTables), sizeof(SName)); + if (NULL == pQuery->pTableList) { + return TSDB_CODE_OUT_OF_MEMORY; + } + SName* pTable = taosHashIterate(pCxt->pTables, NULL); + while (NULL != pTable) { + taosArrayPush(pQuery->pTableList, pTable); + pTable = taosHashIterate(pCxt->pTables, pTable); + } + } + return TSDB_CODE_SUCCESS; +} + static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { switch (nodeType(pQuery->pRoot)) { case QUERY_NODE_SELECT_STMT: @@ -4900,7 +4941,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { break; case QUERY_NODE_VNODE_MODIF_STMT: pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; - pQuery->msgType = TDMT_VND_CREATE_TABLE; + pQuery->msgType = toMsgType(((SVnodeModifOpStmt*)pQuery->pRoot)->sqlNodeType); break; case QUERY_NODE_DESCRIBE_STMT: pQuery->execMode = QUERY_EXEC_MODE_LOCAL; @@ -4928,30 +4969,6 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { } } - if (NULL != pCxt->pDbs) { - pQuery->pDbList = taosArrayInit(taosHashGetSize(pCxt->pDbs), TSDB_DB_FNAME_LEN); - if (NULL == pQuery->pDbList) { - return TSDB_CODE_OUT_OF_MEMORY; - } - SFullDatabaseName* pDb = taosHashIterate(pCxt->pDbs, NULL); - while (NULL != pDb) { - taosArrayPush(pQuery->pDbList, pDb->fullDbName); - pDb = taosHashIterate(pCxt->pDbs, pDb); - } - } - - if (NULL != pCxt->pTables) { - pQuery->pTableList = taosArrayInit(taosHashGetSize(pCxt->pTables), sizeof(SName)); - if (NULL == pQuery->pTableList) { - return TSDB_CODE_OUT_OF_MEMORY; - } - SName* pTable = taosHashIterate(pCxt->pTables, NULL); - while (NULL != pTable) { - taosArrayPush(pQuery->pTableList, pTable); - pTable = taosHashIterate(pCxt->pTables, pTable); - } - } - return TSDB_CODE_SUCCESS; } @@ -4971,6 +4988,7 @@ int32_t translate(SParseContext* pParseCxt, SQuery* pQuery) { if (TSDB_CODE_SUCCESS == code) { code = setQuery(&cxt, pQuery); } + setRefreshMate(&cxt, pQuery); destroyTranslateContext(&cxt); return code; } diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index f4ba2fca81..a5a499aaf5 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -199,3 +199,30 @@ SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* nam tstrncpy(s.name, name, tListLen(s.name)); return s; } + +void destroyQueryExecRes(SQueryExecRes* pRes) { + if (NULL == pRes || NULL == pRes->res) { + return; + } + + switch (pRes->msgType) { + case TDMT_VND_ALTER_TABLE: + case TDMT_MND_ALTER_STB: { + tFreeSTableMetaRsp((STableMetaRsp *)pRes->res); + taosMemoryFreeClear(pRes->res); + break; + } + case TDMT_VND_SUBMIT: { + tFreeSSubmitRsp((SSubmitRsp*)pRes->res); + break; + } + case TDMT_VND_QUERY: { + taosArrayDestroy((SArray*)pRes->res); + break; + } + default: + qError("invalid exec result for request type %d", pRes->msgType); + } +} + + diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 09a4f322d4..44b3e6d396 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -204,7 +204,7 @@ typedef struct SSchJob { SSchTask *fetchTask; int32_t errCode; SRWLatch resLock; - void *queryRes; + SQueryExecRes execRes; void *resData; //TODO free it or not int32_t resNumOfRows; SSchResInfo userRes; diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 1c60dcccfd..dbad053c65 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -773,8 +773,8 @@ _return: int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes) { pRes->code = atomic_load_32(&pJob->errCode); pRes->numOfRows = pJob->resNumOfRows; - pRes->res = pJob->queryRes; - pJob->queryRes = NULL; + pRes->res = pJob->execRes; + pJob->execRes.res = NULL; return TSDB_CODE_SUCCESS; } @@ -1107,9 +1107,9 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) { if (rsp->tbFName[0]) { - if (NULL == pJob->queryRes) { - pJob->queryRes = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo)); - if (NULL == pJob->queryRes) { + if (NULL == pJob->execRes.res) { + pJob->execRes.res = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo)); + if (NULL == pJob->execRes.res) { SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } } @@ -1119,7 +1119,8 @@ int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) { tbInfo.sversion = rsp->sversion; tbInfo.tversion = rsp->tversion; - taosArrayPush((SArray *)pJob->queryRes, &tbInfo); + taosArrayPush((SArray *)pJob->execRes.res, &tbInfo); + pJob->execRes.msgType = TDMT_VND_QUERY; } return TSDB_CODE_SUCCESS; @@ -1349,11 +1350,7 @@ void schFreeJobImpl(void *job) { qExplainFreeCtx(pJob->explainCtx); - if (SCH_IS_QUERY_JOB(pJob)) { - taosArrayDestroy((SArray *)pJob->queryRes); - } else { - tFreeSSubmitRsp((SSubmitRsp*)pJob->queryRes); - } + destroyQueryExecRes(&pJob->execRes); taosMemoryFreeClear(pJob->userRes.queryRes); taosMemoryFreeClear(pJob->resData); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 39ba52145c..0ba91a1c85 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -163,7 +163,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SCH_ERR_JRET(code); SCH_ERR_JRET(rsp.code); - pJob->queryRes = rsp.pMeta; + pJob->execRes.res = rsp.pMeta; + pJob->execRes.msgType = TDMT_VND_ALTER_TABLE; } SCH_ERR_JRET(rspCode); @@ -206,8 +207,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SCH_TASK_DLOG("submit succeed, affectedRows:%d", rsp->affectedRows); SCH_LOCK(SCH_WRITE, &pJob->resLock); - if (pJob->queryRes) { - SSubmitRsp *sum = pJob->queryRes; + if (pJob->execRes.res) { + SSubmitRsp *sum = pJob->execRes.res; sum->affectedRows += rsp->affectedRows; sum->nBlocks += rsp->nBlocks; sum->pBlocks = taosMemoryRealloc(sum->pBlocks, sum->nBlocks * sizeof(*sum->pBlocks)); @@ -215,7 +216,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch taosMemoryFree(rsp->pBlocks); taosMemoryFree(rsp); } else { - pJob->queryRes = rsp; + pJob->execRes.res = rsp; + pJob->execRes.msgType = TDMT_VND_SUBMIT; } SCH_UNLOCK(SCH_WRITE, &pJob->resLock); } diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 12b678eeae..9e91a9f2f4 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -134,4 +134,7 @@ ./test.sh -f tsim/sync/oneReplica1VgElect.sim ./test.sh -f tsim/sync/oneReplica5VgElect.sim +# --- catalog +./test.sh -f tsim/catalog/alterInCurrent.sim + #======================b1-end=============== diff --git a/tests/script/tsim/catalog/alterInCurrent.sim b/tests/script/tsim/catalog/alterInCurrent.sim new file mode 100644 index 0000000000..3cb337bbe1 --- /dev/null +++ b/tests/script/tsim/catalog/alterInCurrent.sim @@ -0,0 +1,70 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 + +print ========= start dnode1 as LEADER +system sh/exec.sh -n dnode1 -s start +sql connect + +print ======== drop column in normal table +sql drop database if exists db1; +sql create database db1; +sql use db1; +sql create table t1 (ts timestamp, f1 int, f2 int); +sql insert into t1 values (1591060628000, 1, 2); +sql alter table t1 drop column f2; +sql insert into t1 values (1591060628001, 2); + +print ======== add column in normal table +sql drop database db1; +sql create database db1; +sql use db1; +sql create table t1 (ts timestamp, f1 int); +sql insert into t1 values (1591060628000, 1); +sql alter table t1 add column f2 int; +sql insert into t1 values (1591060628001, 2, 2); + + +print ======== drop column in super table +sql drop database db1; +sql create database db1; +sql use db1; +sql create stable st1 (ts timestamp, f1 int, f2 int) tags (t1 int); +sql create table t1 using st1 tags(1); +sql insert into t1 values (1591060628000, 1, 2); +sql alter table st1 drop column f2; +sql insert into t1 values (1591060628001, 2); + + +print ======== add column in super table +sql drop database db1; +sql create database db1; +sql use db1; +sql create stable st1 (ts timestamp, f1 int) tags (t1 int); +sql create table t1 using st1 tags(1); +sql insert into t1 values (1591060628000, 1); +sql alter table st1 add column f2 int; +sql insert into t1 values (1591060628001, 2, 2); + + +print ======== add tag in super table +sql drop database db1; +sql create database db1; +sql use db1; +sql create stable st1 (ts timestamp, f1 int) tags (t1 int); +sql create table t1 using st1 tags(1); +sql insert into t1 values (1591060628000, 1); +sql alter table st1 add tag t2 int; +sql create table t2 using st1 tags(2, 2); + + +print ======== drop tag in super table +sql drop database db1; +sql create database db1; +sql use db1; +sql create stable st1 (ts timestamp, f1 int) tags (t1 int, t2 int); +sql create table t1 using st1 tags(1, 1); +sql insert into t1 values (1591060628000, 1); +sql alter table st1 drop tag t2; +sql create table t2 using st1 tags(2); + +system sh/exec.sh -n dnode1 -s stop -x SIGINT