update table meta after alter table

This commit is contained in:
dapan1121 2022-06-02 12:34:35 +08:00
parent d8093c91c2
commit a10643a074
17 changed files with 291 additions and 163 deletions

View File

@ -1135,6 +1135,10 @@ typedef struct {
STableMetaRsp* pMeta;
} SMAlterStbRsp;
int32_t tEncodeSMAlterStbRsp(SEncoder *pEncoder, const SMAlterStbRsp *pRsp);
int32_t tDecodeSMAlterStbRsp(SDecoder *pDecoder, SMAlterStbRsp *pRsp);
void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp);
int32_t tSerializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
int32_t tDeserializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
void tFreeSTableMetaRsp(STableMetaRsp* pRsp);

View File

@ -331,8 +331,8 @@ typedef struct SQuery {
int8_t precision;
SCmdMsgInfo* pCmdMsg;
int32_t msgType;
SArray* pDbList;
SArray* pTableList;
SArray* pDbList;
bool showRewrite;
int32_t placeholderNum;
SArray* pPlaceholderValues;

View File

@ -56,6 +56,11 @@ typedef struct STableComInfo {
int32_t rowSize; // row size of the schema
} STableComInfo;
typedef struct SQueryExecRes {
int32_t msgType;
void* res;
} SQueryExecRes;
typedef struct SIndexMeta {
#ifdef WINDOWS
size_t avoidCompilationErrors;
@ -192,6 +197,7 @@ int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STabl
char* jobTaskStatusStr(int32_t status);
SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name);
void destroyQueryExecRes(SQueryExecRes* pRes);
extern int32_t (*queryBuildMsg[TDMT_MAX])(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallocFp)(int32_t));
extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t msgSize);
@ -204,7 +210,8 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define NEED_CLIENT_RM_TBLMETA_ERROR(_code) \
((_code) == TSDB_CODE_PAR_TABLE_NOT_EXIST || (_code) == TSDB_CODE_VND_TB_NOT_EXIST || \
(_code) == TSDB_CODE_PAR_INVALID_COLUMNS_NUM || (_code) == TSDB_CODE_PAR_INVALID_COLUMN || \
(_code) == TSDB_CODE_PAR_TAGS_NOT_MATCHED || (_code == TSDB_CODE_PAR_VALUE_TOO_LONG))
(_code) == TSDB_CODE_PAR_TAGS_NOT_MATCHED || (_code == TSDB_CODE_PAR_VALUE_TOO_LONG) || \
(_code == TSDB_CODE_PAR_INVALID_DROP_COL))
#define NEED_CLIENT_REFRESH_VG_ERROR(_code) \
((_code) == TSDB_CODE_VND_HASH_MISMATCH || (_code) == TSDB_CODE_VND_INVALID_VGROUP_ID)
#define NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_TABLE_RECREATED)

View File

@ -56,7 +56,7 @@ typedef struct SQueryProfileSummary {
typedef struct SQueryResult {
int32_t code;
uint64_t numOfRows;
void *res;
SQueryExecRes res;
} SQueryResult;
typedef struct STaskInfo {

View File

@ -162,7 +162,7 @@ typedef struct SResultColumn {
} SResultColumn;
typedef struct SReqResultInfo {
void* pExecRes;
SQueryExecRes execRes;
const char* pRspMsg;
const char* pData;
TAOS_FIELD* fields; // todo, column names are not needed.

View File

@ -212,31 +212,6 @@ void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
}
}
static void destroyExecRes(SRequestObj* pRequest) {
if (NULL == pRequest || NULL == pRequest->body.resInfo.pExecRes) {
return;
}
switch (pRequest->type) {
case TDMT_VND_ALTER_TABLE:
case TDMT_MND_ALTER_STB: {
tFreeSTableMetaRsp((STableMetaRsp *)pRequest->body.resInfo.pExecRes);
taosMemoryFree(pRequest->body.resInfo.pExecRes);
break;
}
case TDMT_VND_SUBMIT: {
tFreeSSubmitRsp((SSubmitRsp*)pRequest->body.resInfo.pExecRes);
break;
}
case TDMT_VND_QUERY: {
taosArrayDestroy((SArray*)pRequest->body.resInfo.pExecRes);
break;
}
default:
tscError("invalid exec result for request type %d", pRequest->type);
}
}
static void doDestroyRequest(void *p) {
assert(p != NULL);
SRequestObj *pRequest = (SRequestObj *)p;
@ -259,7 +234,7 @@ static void doDestroyRequest(void *p) {
taosArrayDestroy(pRequest->tableList);
taosArrayDestroy(pRequest->dbList);
destroyExecRes(pRequest);
destroyQueryExecRes(&pRequest->body.resInfo.execRes);
deregisterRequest(pRequest);
taosMemoryFreeClear(pRequest);

View File

@ -340,7 +340,7 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
pResInfo->precision = precision;
}
int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** pRes) {
int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
tsem_init(&schdRspSem, 0, 0);
@ -348,14 +348,15 @@ int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNod
SQueryResult res = {.code = 0, .numOfRows = 0};
int32_t code = schedulerAsyncExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
pRequest->metric.start, schdExecCallback, &res);
pRequest->body.resInfo.execRes = res.res;
while (true) {
if (code != TSDB_CODE_SUCCESS) {
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob);
}
*pRes = res.res;
pRequest->code = code;
terrno = code;
return pRequest->code;
@ -378,8 +379,6 @@ int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNod
}
}
*pRes = res.res;
pRequest->code = res.code;
terrno = res.code;
return pRequest->code;
@ -393,14 +392,13 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
pRequest->metric.start, &res);
pRequest->body.resInfo.pExecRes = res.res;
pRequest->body.resInfo.execRes = res.res;
if (code != TSDB_CODE_SUCCESS) {
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob);
}
pRequest->code = code;
terrno = code;
return pRequest->code;
@ -489,6 +487,10 @@ int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
}
int32_t handleExecRes(SRequestObj* pRequest) {
if (NULL == pRequest->body.resInfo.execRes.res) {
return TSDB_CODE_SUCCESS;
}
int32_t code = 0;
SCatalog* pCatalog = NULL;
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
@ -497,19 +499,20 @@ int32_t handleExecRes(SRequestObj* pRequest) {
}
SEpSet epset = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
switch (pRequest->type) {
SQueryExecRes* pRes = &pRequest->body.resInfo.execRes;
switch (pRes->msgType) {
case TDMT_VND_ALTER_TABLE:
case TDMT_MND_ALTER_STB: {
code = handleAlterTbExecRes(pRequest->body.resInfo.pExecRes, pCatalog);
code = handleAlterTbExecRes(pRes->res, pCatalog);
break;
}
case TDMT_VND_SUBMIT: {
code = handleSubmitExecRes(pRequest, pRequest->body.resInfo.pExecRes, pCatalog, &epset);
code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
break;
}
case TDMT_VND_QUERY: {
code = handleQueryExecRes(pRequest, pRequest->body.resInfo.pExecRes, pCatalog, &epset);
code = handleQueryExecRes(pRequest, pRes->res, pCatalog, &epset);
break;
}
default:
@ -550,17 +553,15 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
qDestroyQuery(pQuery);
}
if (NULL != pRequest->body.resInfo.pExecRes) {
handleExecRes(pRequest);
}
handleExecRes(pRequest);
if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
pRequest->code = terrno;
}
if (res) {
*res = pRequest->body.resInfo.pExecRes;
pRequest->body.resInfo.pExecRes = NULL;
*res = pRequest->body.resInfo.execRes.res;
pRequest->body.resInfo.execRes.res = NULL;
}
return pRequest;

View File

@ -236,8 +236,9 @@ int32_t processAlterStbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
tDecoderInit(&coder, pMsg->pData, pMsg->len);
tDecodeSMAlterStbRsp(&coder, &alterRsp);
tDecoderClear(&coder);
pRequest->body.resInfo.pExecRes = alterRsp.pMeta;
pRequest->body.resInfo.execRes.msgType = TDMT_MND_ALTER_STB;
pRequest->body.resInfo.execRes.res = alterRsp.pMeta;
tsem_post(&pRequest->body.rspSem);
return code;

View File

@ -1207,6 +1207,77 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
return 0;
}
static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, STableMetaRsp *pRsp) {
taosRLockLatch(&pStb->lock);
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
pRsp->pSchemas = taosMemoryCalloc(totalCols, sizeof(SSchema));
if (pRsp->pSchemas == NULL) {
taosRUnLockLatch(&pStb->lock);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
strcpy(pRsp->dbFName, pStb->db);
strcpy(pRsp->tbName, tbName);
strcpy(pRsp->stbName, tbName);
pRsp->dbId = pDb->uid;
pRsp->numOfTags = pStb->numOfTags;
pRsp->numOfColumns = pStb->numOfColumns;
pRsp->precision = pDb->cfg.precision;
pRsp->tableType = TSDB_SUPER_TABLE;
pRsp->sversion = pStb->colVer;
pRsp->tversion = pStb->tagVer;
pRsp->suid = pStb->uid;
pRsp->tuid = pStb->uid;
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pRsp->pSchemas[i];
SSchema *pSrcSchema = &pStb->pColumns[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = pSrcSchema->colId;
pSchema->bytes = pSrcSchema->bytes;
}
for (int32_t i = 0; i < pStb->numOfTags; ++i) {
SSchema *pSchema = &pRsp->pSchemas[i + pStb->numOfColumns];
SSchema *pSrcSchema = &pStb->pTags[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = pSrcSchema->colId;
pSchema->bytes = pSrcSchema->bytes;
}
taosRUnLockLatch(&pStb->lock);
return 0;
}
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) {
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
return -1;
}
SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
if (pStb == NULL) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_MND_INVALID_STB;
return -1;
}
int32_t code = mndBuildStbSchemaImp(pDb, pStb, tbName, pRsp);
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
return code;
}
static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, SStbObj *pObj, void **pCont, int32_t *pLen) {
int ret;
SEncoder ec = {0};
@ -1221,7 +1292,7 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, S
return -1;
}
ret = mndBuildStbSchemaImp(pDb, pObj, name.tname, &alterRsp.meta);
ret = mndBuildStbSchemaImp(pDb, pObj, name.tname, alterRsp.pMeta);
if (ret) {
tFreeSMAlterStbRsp(&alterRsp);
return ret;
@ -1533,75 +1604,6 @@ static int32_t mndProcessVDropStbRsp(SRpcMsg *pRsp) {
return 0;
}
static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, STableMetaRsp *pRsp) {
taosRLockLatch(&pStb->lock);
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
pRsp->pSchemas = taosMemoryCalloc(totalCols, sizeof(SSchema));
if (pRsp->pSchemas == NULL) {
taosRUnLockLatch(&pStb->lock);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
strcpy(pRsp->dbFName, pStb->db);
strcpy(pRsp->tbName, tbName);
strcpy(pRsp->stbName, tbName);
pRsp->dbId = pDb->uid;
pRsp->numOfTags = pStb->numOfTags;
pRsp->numOfColumns = pStb->numOfColumns;
pRsp->precision = pDb->cfg.precision;
pRsp->tableType = TSDB_SUPER_TABLE;
pRsp->sversion = pStb->colVer;
pRsp->tversion = pStb->tagVer;
pRsp->suid = pStb->uid;
pRsp->tuid = pStb->uid;
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pRsp->pSchemas[i];
SSchema *pSrcSchema = &pStb->pColumns[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = pSrcSchema->colId;
pSchema->bytes = pSrcSchema->bytes;
}
for (int32_t i = 0; i < pStb->numOfTags; ++i) {
SSchema *pSchema = &pRsp->pSchemas[i + pStb->numOfColumns];
SSchema *pSrcSchema = &pStb->pTags[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = pSrcSchema->colId;
pSchema->bytes = pSrcSchema->bytes;
}
taosRUnLockLatch(&pStb->lock);
return 0;
}
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) {
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
return -1;
}
SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
if (pStb == NULL) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_MND_INVALID_STB;
return -1;
}
int32_t code = mndBuildStbSchemaImp(pDb, pStb, tbName, pRsp);
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
return code;
}
static int32_t mndProcessTableMetaReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
int32_t code = -1;

View File

@ -61,6 +61,7 @@ typedef struct SInsertParseContext {
SHashObj* pSubTableHashObj; // global
SArray* pVgDataBlocks; // global
SHashObj* pTableNameHashObj; // global
SHashObj* pDbFNameHashObj; // global
int32_t totalNum;
SVnodeModifOpStmt* pOutput;
SStmtCallback* pStmtCb;
@ -972,6 +973,10 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks,
continue;
}
if (TK_NK_RP == sToken.type) {
return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
}
if (isParseBindParam) {
return buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and values");
}
@ -1091,6 +1096,7 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) {
taosHashCleanup(pCxt->pVgroupsHashObj);
taosHashCleanup(pCxt->pSubTableHashObj);
taosHashCleanup(pCxt->pTableNameHashObj);
taosHashCleanup(pCxt->pDbFNameHashObj);
destroyBlockHashmap(pCxt->pTableBlockHashObj);
destroyBlockArrayList(pCxt->pVgDataBlocks);
@ -1151,6 +1157,9 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
tNameExtractFullName(&name, tbFName);
CHECK_CODE(taosHashPut(pCxt->pTableNameHashObj, tbFName, strlen(tbFName), &name, sizeof(SName)));
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(&name, dbFName);
CHECK_CODE(taosHashPut(pCxt->pDbFNameHashObj, dbFName, strlen(dbFName), dbFName, sizeof(dbFName)));
// USING clause
if (TK_USING == sToken.type) {
@ -1158,8 +1167,6 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
NEXT_TOKEN(pCxt->pSql, sToken);
autoCreateTbl = true;
} else {
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(&name, dbFName);
CHECK_CODE(getTableMeta(pCxt, &name, dbFName));
}
@ -1238,6 +1245,7 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
.pTableMeta = NULL,
.pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
.pTableNameHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
.pDbFNameHashObj = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
.totalNum = 0,
.pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT),
.pStmtCb = pContext->pStmtCb};
@ -1252,7 +1260,7 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
}
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pSubTableHashObj ||
NULL == context.pTableNameHashObj || NULL == context.pOutput) {
NULL == context.pTableNameHashObj || NULL == context.pDbFNameHashObj || NULL == context.pOutput) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
@ -1278,6 +1286,13 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
}
}
if (NULL == (*pQuery)->pDbList) {
(*pQuery)->pDbList = taosArrayInit(taosHashGetSize(context.pDbFNameHashObj), TSDB_DB_FNAME_LEN);
if (NULL == (*pQuery)->pDbList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
context.pOutput->payloadType = PAYLOAD_TYPE_KV;
int32_t code = skipInsertInto(&context.pSql, &context.msg);
@ -1290,6 +1305,12 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
taosArrayPush((*pQuery)->pTableList, pTable);
pTable = taosHashIterate(context.pTableNameHashObj, pTable);
}
char* pDb = taosHashIterate(context.pDbFNameHashObj, NULL);
while (NULL != pDb) {
taosArrayPush((*pQuery)->pDbList, pDb);
pDb = taosHashIterate(context.pDbFNameHashObj, pDb);
}
}
destroyInsertParseContext(&context);
return code;

View File

@ -4889,6 +4889,47 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
return code;
}
static int32_t toMsgType(ENodeType type) {
switch (type) {
case QUERY_NODE_CREATE_TABLE_STMT:
return TDMT_VND_CREATE_TABLE;
case QUERY_NODE_ALTER_TABLE_STMT:
return TDMT_VND_ALTER_TABLE;
case QUERY_NODE_DROP_TABLE_STMT:
return TDMT_VND_DROP_TABLE;
default:
break;
}
return TDMT_VND_CREATE_TABLE;
}
static int32_t setRefreshMate(STranslateContext* pCxt, SQuery* pQuery) {
if (NULL != pCxt->pDbs) {
pQuery->pDbList = taosArrayInit(taosHashGetSize(pCxt->pDbs), TSDB_DB_FNAME_LEN);
if (NULL == pQuery->pDbList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SFullDatabaseName* pDb = taosHashIterate(pCxt->pDbs, NULL);
while (NULL != pDb) {
taosArrayPush(pQuery->pDbList, pDb->fullDbName);
pDb = taosHashIterate(pCxt->pDbs, pDb);
}
}
if (NULL != pCxt->pTables) {
pQuery->pTableList = taosArrayInit(taosHashGetSize(pCxt->pTables), sizeof(SName));
if (NULL == pQuery->pTableList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SName* pTable = taosHashIterate(pCxt->pTables, NULL);
while (NULL != pTable) {
taosArrayPush(pQuery->pTableList, pTable);
pTable = taosHashIterate(pCxt->pTables, pTable);
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
switch (nodeType(pQuery->pRoot)) {
case QUERY_NODE_SELECT_STMT:
@ -4900,7 +4941,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
break;
case QUERY_NODE_VNODE_MODIF_STMT:
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = TDMT_VND_CREATE_TABLE;
pQuery->msgType = toMsgType(((SVnodeModifOpStmt*)pQuery->pRoot)->sqlNodeType);
break;
case QUERY_NODE_DESCRIBE_STMT:
pQuery->execMode = QUERY_EXEC_MODE_LOCAL;
@ -4928,30 +4969,6 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
}
}
if (NULL != pCxt->pDbs) {
pQuery->pDbList = taosArrayInit(taosHashGetSize(pCxt->pDbs), TSDB_DB_FNAME_LEN);
if (NULL == pQuery->pDbList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SFullDatabaseName* pDb = taosHashIterate(pCxt->pDbs, NULL);
while (NULL != pDb) {
taosArrayPush(pQuery->pDbList, pDb->fullDbName);
pDb = taosHashIterate(pCxt->pDbs, pDb);
}
}
if (NULL != pCxt->pTables) {
pQuery->pTableList = taosArrayInit(taosHashGetSize(pCxt->pTables), sizeof(SName));
if (NULL == pQuery->pTableList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SName* pTable = taosHashIterate(pCxt->pTables, NULL);
while (NULL != pTable) {
taosArrayPush(pQuery->pTableList, pTable);
pTable = taosHashIterate(pCxt->pTables, pTable);
}
}
return TSDB_CODE_SUCCESS;
}
@ -4971,6 +4988,7 @@ int32_t translate(SParseContext* pParseCxt, SQuery* pQuery) {
if (TSDB_CODE_SUCCESS == code) {
code = setQuery(&cxt, pQuery);
}
setRefreshMate(&cxt, pQuery);
destroyTranslateContext(&cxt);
return code;
}

View File

@ -199,3 +199,30 @@ SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* nam
tstrncpy(s.name, name, tListLen(s.name));
return s;
}
void destroyQueryExecRes(SQueryExecRes* pRes) {
if (NULL == pRes || NULL == pRes->res) {
return;
}
switch (pRes->msgType) {
case TDMT_VND_ALTER_TABLE:
case TDMT_MND_ALTER_STB: {
tFreeSTableMetaRsp((STableMetaRsp *)pRes->res);
taosMemoryFreeClear(pRes->res);
break;
}
case TDMT_VND_SUBMIT: {
tFreeSSubmitRsp((SSubmitRsp*)pRes->res);
break;
}
case TDMT_VND_QUERY: {
taosArrayDestroy((SArray*)pRes->res);
break;
}
default:
qError("invalid exec result for request type %d", pRes->msgType);
}
}

View File

@ -204,7 +204,7 @@ typedef struct SSchJob {
SSchTask *fetchTask;
int32_t errCode;
SRWLatch resLock;
void *queryRes;
SQueryExecRes execRes;
void *resData; //TODO free it or not
int32_t resNumOfRows;
SSchResInfo userRes;

View File

@ -773,8 +773,8 @@ _return:
int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes) {
pRes->code = atomic_load_32(&pJob->errCode);
pRes->numOfRows = pJob->resNumOfRows;
pRes->res = pJob->queryRes;
pJob->queryRes = NULL;
pRes->res = pJob->execRes;
pJob->execRes.res = NULL;
return TSDB_CODE_SUCCESS;
}
@ -1107,9 +1107,9 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs
int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) {
if (rsp->tbFName[0]) {
if (NULL == pJob->queryRes) {
pJob->queryRes = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo));
if (NULL == pJob->queryRes) {
if (NULL == pJob->execRes.res) {
pJob->execRes.res = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo));
if (NULL == pJob->execRes.res) {
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
}
@ -1119,7 +1119,8 @@ int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) {
tbInfo.sversion = rsp->sversion;
tbInfo.tversion = rsp->tversion;
taosArrayPush((SArray *)pJob->queryRes, &tbInfo);
taosArrayPush((SArray *)pJob->execRes.res, &tbInfo);
pJob->execRes.msgType = TDMT_VND_QUERY;
}
return TSDB_CODE_SUCCESS;
@ -1349,11 +1350,7 @@ void schFreeJobImpl(void *job) {
qExplainFreeCtx(pJob->explainCtx);
if (SCH_IS_QUERY_JOB(pJob)) {
taosArrayDestroy((SArray *)pJob->queryRes);
} else {
tFreeSSubmitRsp((SSubmitRsp*)pJob->queryRes);
}
destroyQueryExecRes(&pJob->execRes);
taosMemoryFreeClear(pJob->userRes.queryRes);
taosMemoryFreeClear(pJob->resData);

View File

@ -163,7 +163,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_ERR_JRET(code);
SCH_ERR_JRET(rsp.code);
pJob->queryRes = rsp.pMeta;
pJob->execRes.res = rsp.pMeta;
pJob->execRes.msgType = TDMT_VND_ALTER_TABLE;
}
SCH_ERR_JRET(rspCode);
@ -206,8 +207,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_TASK_DLOG("submit succeed, affectedRows:%d", rsp->affectedRows);
SCH_LOCK(SCH_WRITE, &pJob->resLock);
if (pJob->queryRes) {
SSubmitRsp *sum = pJob->queryRes;
if (pJob->execRes.res) {
SSubmitRsp *sum = pJob->execRes.res;
sum->affectedRows += rsp->affectedRows;
sum->nBlocks += rsp->nBlocks;
sum->pBlocks = taosMemoryRealloc(sum->pBlocks, sum->nBlocks * sizeof(*sum->pBlocks));
@ -215,7 +216,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
taosMemoryFree(rsp->pBlocks);
taosMemoryFree(rsp);
} else {
pJob->queryRes = rsp;
pJob->execRes.res = rsp;
pJob->execRes.msgType = TDMT_VND_SUBMIT;
}
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
}

View File

@ -134,4 +134,7 @@
./test.sh -f tsim/sync/oneReplica1VgElect.sim
./test.sh -f tsim/sync/oneReplica5VgElect.sim
# --- catalog
./test.sh -f tsim/catalog/alterInCurrent.sim
#======================b1-end===============

View File

@ -0,0 +1,70 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
print ========= start dnode1 as LEADER
system sh/exec.sh -n dnode1 -s start
sql connect
print ======== drop column in normal table
sql drop database if exists db1;
sql create database db1;
sql use db1;
sql create table t1 (ts timestamp, f1 int, f2 int);
sql insert into t1 values (1591060628000, 1, 2);
sql alter table t1 drop column f2;
sql insert into t1 values (1591060628001, 2);
print ======== add column in normal table
sql drop database db1;
sql create database db1;
sql use db1;
sql create table t1 (ts timestamp, f1 int);
sql insert into t1 values (1591060628000, 1);
sql alter table t1 add column f2 int;
sql insert into t1 values (1591060628001, 2, 2);
print ======== drop column in super table
sql drop database db1;
sql create database db1;
sql use db1;
sql create stable st1 (ts timestamp, f1 int, f2 int) tags (t1 int);
sql create table t1 using st1 tags(1);
sql insert into t1 values (1591060628000, 1, 2);
sql alter table st1 drop column f2;
sql insert into t1 values (1591060628001, 2);
print ======== add column in super table
sql drop database db1;
sql create database db1;
sql use db1;
sql create stable st1 (ts timestamp, f1 int) tags (t1 int);
sql create table t1 using st1 tags(1);
sql insert into t1 values (1591060628000, 1);
sql alter table st1 add column f2 int;
sql insert into t1 values (1591060628001, 2, 2);
print ======== add tag in super table
sql drop database db1;
sql create database db1;
sql use db1;
sql create stable st1 (ts timestamp, f1 int) tags (t1 int);
sql create table t1 using st1 tags(1);
sql insert into t1 values (1591060628000, 1);
sql alter table st1 add tag t2 int;
sql create table t2 using st1 tags(2, 2);
print ======== drop tag in super table
sql drop database db1;
sql create database db1;
sql use db1;
sql create stable st1 (ts timestamp, f1 int) tags (t1 int, t2 int);
sql create table t1 using st1 tags(1, 1);
sql insert into t1 values (1591060628000, 1);
sql alter table st1 drop tag t2;
sql create table t2 using st1 tags(2);
system sh/exec.sh -n dnode1 -s stop -x SIGINT