diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index a17be44846..a717a6b333 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -224,7 +224,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #define NEED_SCHEDULER_RETRY_ERROR(_code) \ ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR) -#define REQUEST_MAX_TRY_TIMES 1 +#define REQUEST_TOTAL_EXEC_TIMES 2 #define qFatal(...) \ do { \ diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index eaf18884c6..0630704d93 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -212,6 +212,9 @@ typedef struct SRequestObj { SArray* tableList; SQueryExecMetric metric; SRequestSendRecvBody body; + + uint32_t prevCode; //previous error code: todo refactor, add update flag for catalog + uint32_t retry; } SRequestObj; typedef struct SSyncQueryParam { @@ -263,8 +266,8 @@ extern SAppInfo appInfo; extern int32_t clientReqRefPool; extern int32_t clientConnRefPool; -extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code); -int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code); +__async_send_cb_fn_t getMsgRspHandle(int32_t msgType); + SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj); void* createTscObj(const char* user, const char* auth, const char* db, int32_t connType, SAppInstInfo* pAppInfo); @@ -274,7 +277,7 @@ int32_t releaseTscObj(int64_t rid); uint64_t generateRequestId(); -void* createRequest(STscObj* pObj, void* param, int32_t type); +void* createRequest(STscObj* pObj, int32_t type); void destroyRequest(SRequestObj* pRequest); SRequestObj* acquireRequest(int64_t rid); int32_t releaseRequest(int64_t rid); @@ -290,8 +293,6 @@ void* openTransporter(const char* user, const char* auth, int32_t numOfThreads); bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType); void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet); -void initMsgHandleFp(); - TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, uint16_t port, int connType); @@ -325,6 +326,9 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNod void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery); int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest); int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList); +void doAsyncQuery(SRequestObj* pRequest, bool forceUpdateMeta); +int32_t removeMeta(STscObj* pTscObj, SArray* tbList);// todo move to clientImpl.c and become a static function +int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog);// todo move to xxx #ifdef __cplusplus } diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index be0a41ba40..75e1884360 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -177,7 +177,7 @@ STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientCon int32_t releaseTscObj(int64_t rid) { return taosReleaseRef(clientConnRefPool, rid); } -void *createRequest(STscObj *pObj, void *param, int32_t type) { +void *createRequest(STscObj *pObj, int32_t type) { assert(pObj != NULL); SRequestObj *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj)); @@ -191,8 +191,6 @@ void *createRequest(STscObj *pObj, void *param, int32_t type) { pRequest->requestId = generateRequestId(); pRequest->metric.start = taosGetTimestampUs(); - pRequest->body.param = param; - pRequest->type = type; pRequest->pTscObj = pObj; pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE); @@ -280,7 +278,6 @@ void taos_init_imp(void) { return; } - initMsgHandleFp(); initQueryModuleMsgHandle(); rpcInit(); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 916017543d..c71c7d3487 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -133,7 +133,7 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, } int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest) { - *pRequest = createRequest(pTscObj, NULL, TSDB_SQL_SELECT); + *pRequest = createRequest(pTscObj, TSDB_SQL_SELECT); if (*pRequest == NULL) { tscError("failed to malloc sqlObj"); return TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -207,6 +207,7 @@ int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) { if (TSDB_CODE_SUCCESS == code && NULL != pRsp) { code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, false); } + return code; } @@ -235,6 +236,31 @@ static SAppInstInfo* getAppInfo(SRequestObj* pRequest) { return pRequest->pTscObj->pAppInfo; } +void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) { + SRetrieveTableRsp* pRsp = NULL; + + int32_t code = qExecCommand(pQuery->pRoot, &pRsp); + if (TSDB_CODE_SUCCESS == code && NULL != pRsp) { + code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, false); + } + + SReqResultInfo* pResultInfo = &pRequest->body.resInfo; + + if (pRequest->code != TSDB_CODE_SUCCESS) { + pResultInfo->numOfRows = 0; + pRequest->code = code; + tscError("0x%" PRIx64 " fetch results failed, code:%s, reqId:0x%" PRIx64, pRequest->self, tstrerror(code), + pRequest->requestId); + } else { + tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64, + pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, + pRequest->requestId); + } + + pRequest->body.queryFp(pRequest->body.param, pRequest, 0); +// pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows); +} + int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { // drop table if exists not_exists_table if (NULL == pQuery->pCmdMsg) { @@ -509,19 +535,20 @@ int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) { return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res); } -int32_t handleExecRes(SRequestObj* pRequest) { +int32_t handleQueryExecRsp(SRequestObj* pRequest) { if (NULL == pRequest->body.resInfo.execRes.res) { return TSDB_CODE_SUCCESS; } - int32_t code = 0; SCatalog* pCatalog = NULL; - code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + SAppInstInfo* pAppInfo = getAppInfo(pRequest); + + int32_t code = catalogGetHandle(pAppInfo->clusterId, &pCatalog); if (code) { return code; } - SEpSet epset = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); + SEpSet epset = getEpSet_s(&pAppInfo->mgmtEp); SQueryExecRes* pRes = &pRequest->body.resInfo.execRes; switch (pRes->msgType) { @@ -539,8 +566,9 @@ int32_t handleExecRes(SRequestObj* pRequest) { break; } default: - tscError("invalid exec result for request type %d", pRequest->type); - return TSDB_CODE_APP_ERROR; + tscError("0x%"PRIx64", invalid exec result for request type %d, reqId:0x%"PRIx64, pRequest->self, + pRequest->type, pRequest->requestId); + code = TSDB_CODE_APP_ERROR; } return code; @@ -548,6 +576,25 @@ int32_t handleExecRes(SRequestObj* pRequest) { void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) { SRequestObj* pRequest = (SRequestObj*) param; + pRequest->code = code; + + STscObj* pTscObj = pRequest->pTscObj; + if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code)) { + tscDebug("0x%"PRIx64" client retry to handle the error, code:%s, reqId:0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId); + pRequest->prevCode = code; + doAsyncQuery(pRequest, true); + return; + } + + if (code == TSDB_CODE_SUCCESS) { + code = handleQueryExecRsp(pRequest); + ASSERT(pRequest->code == TSDB_CODE_SUCCESS); + pRequest->code = code; + } + + if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) { + removeMeta(pTscObj, pRequest->tableList); + } // return to client pRequest->body.queryFp(pRequest->body.param, pRequest, code); @@ -583,7 +630,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue qDestroyQuery(pQuery); } - handleExecRes(pRequest); + handleQueryExecRsp(pRequest); if (NULL != pRequest && TSDB_CODE_SUCCESS != code) { pRequest->code = terrno; @@ -617,13 +664,12 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) { } void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) { - void* pRes = NULL; - int32_t code = 0; + switch (pQuery->execMode) { case QUERY_EXEC_MODE_LOCAL: - code = execLocalCmd(pRequest, pQuery); - break; + asyncExecLocalCmd(pRequest, pQuery); + return; case QUERY_EXEC_MODE_RPC: code = asyncExecDdlQuery(pRequest, pQuery); break; @@ -649,11 +695,9 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) { if (TSDB_CODE_SUCCESS == code) { schedulerAsyncExecJob(pAppInfo->pTransporter, pNodeList, pRequest->body.pDag, &pRequest->body.queryJob, pRequest->sqlstr, pRequest->metric.start, schedulerExecCb, pRequest); - // if (NULL != pRes) { - // code = validateSversion(pRequest, pRes); - // } } + //todo not to be released here taosArrayDestroy(pNodeList); break; } @@ -671,12 +715,6 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) { if (NULL != pRequest && TSDB_CODE_SUCCESS != code) { pRequest->code = terrno; } - - // if (res) { - // *res = pRes; - // } else { -// freeRequestRes(pRequest, pRes); -// pRes = NULL; } int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) { @@ -750,7 +788,7 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { pRequest->code = code; break; } - } while (retryNum++ < REQUEST_MAX_TRY_TIMES); + } while (retryNum++ < REQUEST_TOTAL_EXEC_TIMES); if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) { removeMeta(pTscObj, pRequest->tableList); @@ -808,7 +846,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t return pTscObj; } - SRequestObj* pRequest = createRequest(pTscObj, param, TDMT_MND_CONNECT); + SRequestObj* pRequest = createRequest(pTscObj, TDMT_MND_CONNECT); if (pRequest == NULL) { destroyTscObj(pTscObj); terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -850,7 +888,7 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) { pMsgSendInfo->requestObjRefId = pRequest->self; pMsgSendInfo->requestId = pRequest->requestId; - pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)]; + pMsgSendInfo->fp = getMsgRspHandle(pMsgSendInfo->msgType); pMsgSendInfo->param = pRequest; SConnectReq connectReq = {0}; @@ -1429,7 +1467,7 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de } code = statusRsp.statusCode; - if (details != NULL && statusRsp.details != NULL) { + if (details != NULL) { tstrncpy(details, statusRsp.details, maxlen); } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index bddc32ea20..d192b65bf0 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -30,6 +30,7 @@ #define TSC_VAR_RELEASED 0 static int32_t sentinel = TSC_VAR_NOT_RELEASE; +static int32_t createParseContext(const SRequestObj *pRequest, SParseContext** pCxt); int taos_options(TSDB_OPTION option, const void *arg, ...) { static int32_t lock = 0; @@ -192,7 +193,7 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) { STscObj* pTscObj = (STscObj*)taos; #if SYNC_ON_TOP_OF_ASYNC - SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(struct SSyncQueryParam)); + SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam)); tsem_init(¶m->sem, 0, 0); taos_query_a(pTscObj, sql, syncQueryFn, param); @@ -608,36 +609,38 @@ void retrieveMetaCallback(SMetaData* pResultMeta, void* param, int32_t code) { SQuery* pQuery = pWrapper->pQuery; SRequestObj* pRequest = pWrapper->pRequest; - if (code != TSDB_CODE_SUCCESS) { - goto _error; + if (code == TSDB_CODE_SUCCESS) { + code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); } - code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); - if (code != TSDB_CODE_SUCCESS) { - goto _error; + if (code == TSDB_CODE_SUCCESS) { + if (pQuery->haveResultSet) { + setResSchemaInfo(&pRequest->body.resInfo, pQuery->pResSchema, pQuery->numOfResCols); + setResPrecision(&pRequest->body.resInfo, pQuery->precision); + } + + TSWAP(pRequest->dbList, (pQuery)->pDbList); + TSWAP(pRequest->tableList, (pQuery)->pTableList); + + taosMemoryFree(pWrapper); + launchAsyncQuery(pRequest, pQuery); + } else { + if (NEED_CLIENT_HANDLE_ERROR(code)) { + tscDebug("0x%"PRIx64" client retry to handle the error, code:%s, reqId:0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId); + pRequest->prevCode = code; + doAsyncQuery(pRequest, true); + return; + } + + // return to app directly + taosMemoryFree(pWrapper); + tscError("0x%" PRIx64 " error occurs, code:%s, return to user app, reqId:0x%" PRIx64, pRequest->self, tstrerror(code), + pRequest->requestId); + pRequest->code = code; + pRequest->body.queryFp(pRequest->body.param, pRequest, code); } - - if (pQuery->haveResultSet) { - setResSchemaInfo(&pRequest->body.resInfo, pQuery->pResSchema, (pQuery)->numOfResCols); - setResPrecision(&pRequest->body.resInfo, (pQuery)->precision); - } - - TSWAP(pRequest->dbList, (pQuery)->pDbList); - TSWAP(pRequest->tableList, (pQuery)->pTableList); - - taosMemoryFree(pWrapper); - launchAsyncQuery(pRequest, pQuery); - return; - - _error: - taosMemoryFree(pWrapper); - tscError("0x%" PRIx64 " error occurs, code:%s, return to user app, reqId:%" PRIx64, pRequest->self, tstrerror(code), - pRequest->requestId); - pRequest->code = code; - pRequest->body.queryFp(pRequest->body.param, pRequest, code); } -// todo add retry before return user's callback void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) { ASSERT(fp != NULL); @@ -657,24 +660,27 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param } SRequestObj *pRequest = NULL; - int32_t retryNum = 0; - int32_t code = 0; - - // while (retryNum++ < REQUEST_MAX_TRY_TIMES) { - code = buildRequest(taos, sql, sqlLen, &pRequest); + int32_t code = buildRequest(taos, sql, sqlLen, &pRequest); if (code != TSDB_CODE_SUCCESS) { terrno = code; - fp(param, NULL, code); + fp(param, NULL, terrno); return; } pRequest->body.queryFp = fp; pRequest->body.param = param; + doAsyncQuery(pRequest, false); +} - STscObj *pTscObj = pRequest->pTscObj; +int32_t createParseContext(const SRequestObj *pRequest, SParseContext** pCxt) { + const STscObj *pTscObj = pRequest->pTscObj; - SParseContext* pCxt = taosMemoryCalloc(1, sizeof(SParseContext)); - *pCxt = (SParseContext){.requestId = pRequest->requestId, + *pCxt = taosMemoryCalloc(1, sizeof(SParseContext)); + if (*pCxt == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + **pCxt = (SParseContext){.requestId = pRequest->requestId, .acctId = pTscObj->acctId, .db = pRequest->pDb, .topicQuery = false, @@ -687,6 +693,22 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param .pUser = pTscObj->user, .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)), .async = true,}; + return TSDB_CODE_SUCCESS; +} + +void doAsyncQuery(SRequestObj* pRequest, bool updateMetaForce) { + SParseContext* pCxt = NULL; + STscObj *pTscObj = pRequest->pTscObj; + + if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) { + pRequest->code = pRequest->prevCode; + goto _error; + } + + int32_t code = createParseContext(pRequest, &pCxt); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } pCxt->mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCxt->pCatalog); @@ -694,39 +716,36 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param goto _error; } - SQuery * pQuery = NULL; - SCatalogReq catalogReq = {0}; + SQuery *pQuery = NULL; + + SCatalogReq catalogReq = {.forceUpdate = updateMetaForce}; code = qParseSqlSyntax(pCxt, &pQuery, &catalogReq); if (code != TSDB_CODE_SUCCESS) { goto _error; } SqlParseWrapper *pWrapper = taosMemoryCalloc(1, sizeof(SqlParseWrapper)); + if (pWrapper == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + pWrapper->pCtx = pCxt; pWrapper->pQuery = pQuery; pWrapper->pRequest = pRequest; pWrapper->catalogReq = catalogReq; code = catalogAsyncGetAllMeta(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pRequest->requestId, - &catalogReq, retrieveMetaCallback, pWrapper, &pRequest->body.queryJob); - - if (code != TSDB_CODE_SUCCESS) { - goto _error; + &catalogReq, retrieveMetaCallback, pWrapper, &pRequest->body.queryJob); + if (code == TSDB_CODE_SUCCESS) { + return; } - return; - - // todo handle the retry process - - // if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) { - // TSWAP(pRequest->dbList, (pQuery)->pDbList); - // TSWAP(pRequest->tableList, (pQuery)->pTableList); - // } - _error: + tscError("0x%"PRIx64" error happens, code:%s, reqId:0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId); terrno = code; pRequest->code = code; - fp(param, pRequest, code); + pRequest->body.queryFp(pRequest->body.param, pRequest, code); } static void fetchCallback(void* pResult, void* param, int32_t code) { @@ -751,14 +770,15 @@ static void fetchCallback(void* pResult, void* param, int32_t code) { pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResultInfo->pData, true, false); if (pRequest->code != TSDB_CODE_SUCCESS) { pResultInfo->numOfRows = 0; - pRequest->code = code; - pRequest->body.fetchFp(pRequest->body.param, pRequest, 0); + tscError("0x%" PRIx64 " fetch results failed, code:%s, reqId:0x%" PRIx64, pRequest->self, tstrerror(code), + pRequest->requestId); + } else { + tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64, + pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, + pRequest->requestId); } - tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64, - pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId); - pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows); } diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 2d5199f181..14e2798d3b 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -21,8 +21,6 @@ #include "tdef.h" #include "tname.h" -int32_t (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code); - static void setErrno(SRequestObj* pRequest, int32_t code) { pRequest->code = code; terrno = code; @@ -107,10 +105,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) { assert(pRequest != NULL); pMsgSendInfo->msgInfo = pRequest->body.requestMsg; - - pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL) - ? genericRspCallback - : handleRequestRspFp[TMSG_INDEX(pRequest->type)]; + pMsgSendInfo->fp = getMsgRspHandle(pRequest->type); return pMsgSendInfo; } @@ -209,7 +204,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { return 0; } -int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t processCreateSTableRsp(void* param, const SDataBuf* pMsg, int32_t code) { assert(pMsg != NULL && param != NULL); SRequestObj* pRequest = param; @@ -219,6 +214,7 @@ int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) { } if (pRequest->body.queryFp != NULL) { + removeMeta(pRequest->pTscObj, pRequest->tableList); pRequest->body.queryFp(pRequest->body.param, pRequest, code); } else { tsem_post(&pRequest->body.rspSem); @@ -251,30 +247,54 @@ int32_t processAlterStbRsp(void* param, const SDataBuf* pMsg, int32_t code) { SRequestObj* pRequest = param; if (code != TSDB_CODE_SUCCESS) { setErrno(pRequest, code); - tsem_post(&pRequest->body.rspSem); - return code; + } else { + SMAlterStbRsp alterRsp = {0}; + SDecoder coder = {0}; + tDecoderInit(&coder, pMsg->pData, pMsg->len); + tDecodeSMAlterStbRsp(&coder, &alterRsp); + tDecoderClear(&coder); + + pRequest->body.resInfo.execRes.msgType = TDMT_MND_ALTER_STB; + pRequest->body.resInfo.execRes.res = alterRsp.pMeta; } - SMAlterStbRsp alterRsp = {0}; - SDecoder coder = {0}; - tDecoderInit(&coder, pMsg->pData, pMsg->len); - tDecodeSMAlterStbRsp(&coder, &alterRsp); - tDecoderClear(&coder); + if (pRequest->body.queryFp != NULL) { + SQueryExecRes* pRes = &pRequest->body.resInfo.execRes; - pRequest->body.resInfo.execRes.msgType = TDMT_MND_ALTER_STB; - pRequest->body.resInfo.execRes.res = alterRsp.pMeta; + if (code == TSDB_CODE_SUCCESS) { + SCatalog* pCatalog = NULL; + int32_t ret = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + if (pRes->res != NULL) { + ret = handleAlterTbExecRes(pRes->res, pCatalog); + } - tsem_post(&pRequest->body.rspSem); + if (ret != TSDB_CODE_SUCCESS) { + code = ret; + } + } + + pRequest->body.queryFp(pRequest->body.param, pRequest, code); + } else { + tsem_post(&pRequest->body.rspSem); + } return code; } - -// todo refactor: this arraylist is too large -void initMsgHandleFp() { - handleRequestRspFp[TMSG_INDEX(TDMT_MND_CONNECT)] = processConnectRsp; - handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = processCreateDbRsp; - handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp; - handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp; - handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp; - handleRequestRspFp[TMSG_INDEX(TDMT_MND_ALTER_STB)] = processAlterStbRsp; +__async_send_cb_fn_t getMsgRspHandle(int32_t msgType) { + switch (msgType) { + case TDMT_MND_CONNECT: + return processConnectRsp; + case TDMT_MND_CREATE_DB: + return processCreateDbRsp; + case TDMT_MND_USE_DB: + return processUseDbRsp; + case TDMT_MND_CREATE_STB: + return processCreateSTableRsp; + case TDMT_MND_DROP_DB: + return processDropDbRsp; + case TDMT_MND_ALTER_STB: + return processAlterStbRsp; + default: + return genericRspCallback; + } } diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 5643af746d..4cef67b34d 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -2364,7 +2364,7 @@ static int32_t isSchemalessDb(SSmlHandle* info){ */ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) { - SRequestObj* request = (SRequestObj*)createRequest((STscObj *)taos, NULL, TSDB_SQL_INSERT); + SRequestObj* request = (SRequestObj*)createRequest((STscObj *)taos, TSDB_SQL_INSERT); if(!request){ uError("SML:taos_schemaless_insert error request is null"); return NULL; diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index a0f33627c4..f16baba0e7 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -778,7 +778,35 @@ TEST(testCase, async_api_test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); - taos_query_a(pConn, "drop table test.tm0", queryCallback, pConn); + taos_query(pConn, "use test"); + + TAOS_RES* pRes = taos_query(pConn, "select * from t1"); + + taos_query(pConn, "alter table t1 add column b int"); + pRes = taos_query(pConn, "insert into t1 values(now, 1, 2)"); + if (taos_errno(pRes) != 0) { + printf("failed, reason:%s\n", taos_errstr(pRes)); + } + +// int32_t n = 0; +// TAOS_ROW pRow = NULL; +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// int32_t numOfFields = taos_num_fields(pRes); +// +// char str[512] = {0}; +// while ((pRow = taos_fetch_row(pRes)) != NULL) { +// int32_t* length = taos_fetch_lengths(pRes); +// for(int32_t i = 0; i < numOfFields; ++i) { +// printf("(%d):%d " , i, length[i]); +// } +// printf("\n"); +// +// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); +// printf("%s\n", str); +// memset(str, 0, sizeof(str)); +// } + + taos_query_a(pConn, "alter table test.m1 comment 'abcde' ", queryCallback, pConn); getchar(); taos_close(pConn); } diff --git a/source/client/test/smlTest.cpp b/source/client/test/smlTest.cpp index 1358d45f30..3bfd26bb43 100644 --- a/source/client/test/smlTest.cpp +++ b/source/client/test/smlTest.cpp @@ -486,7 +486,7 @@ TEST(testCase, smlProcess_influx_Test) { pRes = taos_query(taos, "use inflx_db"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); @@ -607,7 +607,7 @@ TEST(testCase, smlParseLine_error_Test) { pRes = taos_query(taos, "use sml_db"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); @@ -656,7 +656,7 @@ TEST(testCase, smlProcess_telnet_Test) { pRes = taos_query(taos, "use telnet_db"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); @@ -710,7 +710,7 @@ TEST(testCase, smlProcess_json1_Test) { pRes = taos_query(taos, "use json_db"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); @@ -779,7 +779,7 @@ TEST(testCase, smlProcess_json2_Test) { pRes = taos_query(taos, "use sml_db"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); @@ -823,7 +823,7 @@ TEST(testCase, smlProcess_json3_Test) { pRes = taos_query(taos, "use sml_db"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); @@ -895,7 +895,7 @@ TEST(testCase, smlProcess_json4_Test) { pRes = taos_query(taos, "use sml_db"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); @@ -957,7 +957,7 @@ TEST(testCase, smlParseTelnetLine_error_Test) { pRes = taos_query(taos, "use sml_db"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); @@ -1006,7 +1006,7 @@ TEST(testCase, smlParseTelnetLine_diff_type_Test) { pRes = taos_query(taos, "use sml_db"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); @@ -1033,7 +1033,7 @@ TEST(testCase, smlParseTelnetLine_json_error_Test) { pRes = taos_query(taos, "use sml_db"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); @@ -1101,7 +1101,7 @@ TEST(testCase, smlParseTelnetLine_diff_json_type1_Test) { pRes = taos_query(taos, "use sml_db"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); @@ -1146,7 +1146,7 @@ TEST(testCase, smlParseTelnetLine_diff_json_type2_Test) { pRes = taos_query(taos, "use sml_db"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); @@ -1191,7 +1191,7 @@ TEST(testCase, sml_TD15662_Test) { pRes = taos_query(taos, "use db_15662"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS); @@ -1218,7 +1218,7 @@ TEST(testCase, sml_TD15735_Test) { pRes = taos_query(taos, "use sml_db"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); @@ -1244,7 +1244,7 @@ TEST(testCase, sml_TD15742_Test) { pRes = taos_query(taos, "use TD15742"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index d2b468d588..61d6e7be6c 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1392,20 +1392,26 @@ char *buildRetension(SArray *pRetension) { int64_t v1 = getValOfDiffPrecision(p->freqUnit, p->freq); int64_t v2 = getValOfDiffPrecision(p->keepUnit, p->keep); - len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c,", v1, p->freqUnit, v2, p->keepUnit); - - p = taosArrayGet(pRetension, 1); - - v1 = getValOfDiffPrecision(p->freqUnit, p->freq); - v2 = getValOfDiffPrecision(p->keepUnit, p->keep); - len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c,", v1, p->freqUnit, v2, p->keepUnit); - - p = taosArrayGet(pRetension, 2); - - v1 = getValOfDiffPrecision(p->freqUnit, p->freq); - v2 = getValOfDiffPrecision(p->keepUnit, p->keep); len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c", v1, p->freqUnit, v2, p->keepUnit); + if (size > 1) { + len += sprintf(p1 + len, ","); + p = taosArrayGet(pRetension, 1); + + v1 = getValOfDiffPrecision(p->freqUnit, p->freq); + v2 = getValOfDiffPrecision(p->keepUnit, p->keep); + len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c", v1, p->freqUnit, v2, p->keepUnit); + } + + if (size > 2) { + len += sprintf(p1 + len, ","); + p = taosArrayGet(pRetension, 2); + + v1 = getValOfDiffPrecision(p->freqUnit, p->freq); + v2 = getValOfDiffPrecision(p->keepUnit, p->keep); + len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c", v1, p->freqUnit, v2, p->keepUnit); + } + varDataSetLen(p1, len); return p1; } diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 9219a382e4..1eec92f47e 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -518,6 +518,7 @@ int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2); int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2); void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput); int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target); +char *ctgTaskTypeStr(CTG_TASK_TYPE type); extern SCatalogMgmt gCtgMgmt; diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 6998d8c778..f43e559244 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -44,7 +44,7 @@ int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, SName *name) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " task %d type %d initialized, tableName:%s", pJob->queryId, taskIdx, task.type, name->tname); + qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, tableName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); return TSDB_CODE_SUCCESS; } @@ -67,7 +67,7 @@ int32_t ctgInitGetDbVgTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:%" PRIx64 " task %d type %d initialized, dbFName:%s", pJob->queryId, taskIdx, task.type, dbFName); + qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName); return TSDB_CODE_SUCCESS; } @@ -90,7 +90,7 @@ int32_t ctgInitGetDbCfgTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:%" PRIx64 " task %d type %d initialized, dbFName:%s", pJob->queryId, taskIdx, task.type, dbFName); + qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName); return TSDB_CODE_SUCCESS; } @@ -113,7 +113,7 @@ int32_t ctgInitGetDbInfoTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:%" PRIx64 " task %d type %d initialized, dbFName:%s", pJob->queryId, taskIdx, task.type, dbFName); + qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName); return TSDB_CODE_SUCCESS; } @@ -143,7 +143,7 @@ int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, SName *name) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " task %d type %d initialized, tableName:%s", pJob->queryId, taskIdx, task.type, name->tname); + qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, tableName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); return TSDB_CODE_SUCCESS; } @@ -158,7 +158,7 @@ int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:%" PRIx64 " task %d type %d initialized", pJob->queryId, taskIdx, task.type); + qDebug("QID:%" PRIx64 " the %d task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type)); return TSDB_CODE_SUCCESS; } @@ -181,7 +181,7 @@ int32_t ctgInitGetIndexTask(SCtgJob *pJob, int32_t taskIdx, char *name) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:%" PRIx64 " task %d type %d initialized, indexFName:%s", pJob->queryId, taskIdx, task.type, name); + qDebug("QID:%" PRIx64 " the %d task type %s initialized, indexFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name); return TSDB_CODE_SUCCESS; } @@ -204,7 +204,7 @@ int32_t ctgInitGetUdfTask(SCtgJob *pJob, int32_t taskIdx, char *name) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:%" PRIx64 " task %d type %d initialized, udfName:%s", pJob->queryId, taskIdx, task.type, name); + qDebug("QID:%" PRIx64 " the %d task type %s initialized, udfName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name); return TSDB_CODE_SUCCESS; } @@ -227,11 +227,96 @@ int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, SUserAuthInfo *user) taosArrayPush(pJob->pTasks, &task); - qDebug("QID:%" PRIx64 " task %d type %d initialized, user:%s", pJob->queryId, taskIdx, task.type, user->user); + qDebug("QID:%" PRIx64 " the %d task type %s initialized, user:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), user->user); return TSDB_CODE_SUCCESS; } +int32_t ctgHandleForceUpdate(SCatalog* pCtg, SCtgJob *pJob, const SCatalogReq* pReq) { + int32_t dbNum = pJob->dbCfgNum + pJob->dbVgNum + pJob->dbInfoNum; + if (dbNum > 0) { + if (dbNum > pJob->dbCfgNum && dbNum > pJob->dbVgNum && dbNum > pJob->dbInfoNum) { + SHashObj* pDb = taosHashInit(dbNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + if (NULL == pDb) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + for (int32_t i = 0; i < pJob->dbVgNum; ++i) { + char* dbFName = taosArrayGet(pReq->pDbVgroup, i); + taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN); + } + + for (int32_t i = 0; i < pJob->dbCfgNum; ++i) { + char* dbFName = taosArrayGet(pReq->pDbCfg, i); + taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN); + } + + for (int32_t i = 0; i < pJob->dbInfoNum; ++i) { + char* dbFName = taosArrayGet(pReq->pDbInfo, i); + taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN); + } + + char* dbFName = taosHashIterate(pDb, NULL); + while (dbFName) { + ctgDropDbVgroupEnqueue(pCtg, dbFName, true); + dbFName = taosHashIterate(pDb, dbFName); + } + + taosHashCleanup(pDb); + } else { + for (int32_t i = 0; i < pJob->dbVgNum; ++i) { + char* dbFName = taosArrayGet(pReq->pDbVgroup, i); + CTG_ERR_RET(ctgDropDbVgroupEnqueue(pCtg, dbFName, true)); + } + + for (int32_t i = 0; i < pJob->dbCfgNum; ++i) { + char* dbFName = taosArrayGet(pReq->pDbCfg, i); + CTG_ERR_RET(ctgDropDbVgroupEnqueue(pCtg, dbFName, true)); + } + + for (int32_t i = 0; i < pJob->dbInfoNum; ++i) { + char* dbFName = taosArrayGet(pReq->pDbInfo, i); + CTG_ERR_RET(ctgDropDbVgroupEnqueue(pCtg, dbFName, true)); + } + } + } + + int32_t tbNum = pJob->tbMetaNum + pJob->tbHashNum; + if (tbNum > 0) { + if (tbNum > pJob->tbMetaNum && tbNum > pJob->tbHashNum) { + SHashObj* pTb = taosHashInit(tbNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + for (int32_t i = 0; i < pJob->tbMetaNum; ++i) { + SName* name = taosArrayGet(pReq->pTableMeta, i); + taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName)); + } + + for (int32_t i = 0; i < pJob->tbHashNum; ++i) { + SName* name = taosArrayGet(pReq->pTableHash, i); + taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName)); + } + + SName* name = taosHashIterate(pTb, NULL); + while (name) { + catalogRemoveTableMeta(pCtg, name); + name = taosHashIterate(pTb, name); + } + + taosHashCleanup(pTb); + } else { + for (int32_t i = 0; i < pJob->tbMetaNum; ++i) { + SName* name = taosArrayGet(pReq->pTableMeta, i); + catalogRemoveTableMeta(pCtg, name); + } + + for (int32_t i = 0; i < pJob->tbHashNum; ++i) { + SName* name = taosArrayGet(pReq->pTableHash, i); + catalogRemoveTableMeta(pCtg, name); + } + } + } + + return TSDB_CODE_SUCCESS; +} int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum) { int32_t code = 0; @@ -283,12 +368,13 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } + if (pReq->forceUpdate) { + CTG_ERR_JRET(ctgHandleForceUpdate(pCtg, pJob, pReq)); + } + int32_t taskIdx = 0; for (int32_t i = 0; i < dbVgNum; ++i) { char* dbFName = taosArrayGet(pReq->pDbVgroup, i); - if (pReq->forceUpdate) { - ctgDropDbVgroupEnqueue(pCtg, dbFName, true); - } CTG_ERR_JRET(ctgInitGetDbVgTask(pJob, taskIdx++, dbFName)); } @@ -304,9 +390,6 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* for (int32_t i = 0; i < tbMetaNum; ++i) { SName* name = taosArrayGet(pReq->pTableMeta, i); - if (pReq->forceUpdate) { - catalogRemoveTableMeta(pCtg, name); - } CTG_ERR_JRET(ctgInitGetTbMetaTask(pJob, taskIdx++, name)); } @@ -342,7 +425,7 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* taosAcquireRef(gCtgMgmt.jobPool, pJob->refId); - qDebug("QID:%" PRIx64 ", job %" PRIx64 " initialized, task num %d", pJob->queryId, pJob->refId, *taskNum); + qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d", pJob->queryId, pJob->refId, *taskNum, pReq->forceUpdate); return TSDB_CODE_SUCCESS; @@ -1108,7 +1191,7 @@ int32_t ctgLaunchJob(SCtgJob *pJob) { for (int32_t i = 0; i < taskNum; ++i) { SCtgTask *pTask = taosArrayGet(pJob->pTasks, i); - qDebug("QID:%" PRIx64 " start to launch task %d", pJob->queryId, pTask->taskId); + qDebug("QID:0x%" PRIx64 " start to launch task %d", pJob->queryId, pTask->taskId); CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask)); } diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index b16a082f75..188f5b44c0 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -177,7 +177,7 @@ int32_t ctgHandleMsgCallback(void *param, const SDataBuf *pMsg, int32_t rspCode) SCtgTask *pTask = taosArrayGet(pJob->pTasks, cbParam->taskId); - qDebug("QID:%" PRIx64 " task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(cbParam->reqType + 1)); + qDebug("QID:0x%" PRIx64 " task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(cbParam->reqType + 1)); CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, cbParam->reqType, pMsg, rspCode)); @@ -244,7 +244,7 @@ int32_t ctgAsyncSendMsg(CTG_PARAMS, SCtgTask* pTask, int32_t msgType, void *msg, CTG_ERR_JRET(code); } - ctgDebug("req msg sent, reqId:%" PRIx64 ", msg type:%d, %s", pTask->pJob->queryId, msgType, TMSG_INFO(msgType)); + ctgDebug("req msg sent, reqId:0x%" PRIx64 ", msg type:%d, %s", pTask->pJob->queryId, msgType, TMSG_INFO(msgType)); return TSDB_CODE_SUCCESS; _return: @@ -565,7 +565,7 @@ int32_t ctgGetTbMetaFromVnode(CTG_PARAMS, const SName* pTableName, SVgroupInfo * } CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName)); - CTG_RET(ctgAsyncSendMsg(CTG_PARAMS_LIST(), pTask, reqType, msg, msgLen)); + CTG_RET(ctgAsyncSendMsg(pCtg, pTrans, &vgroupInfo->epSet, pTask, reqType, msg, msgLen)); } SRpcMsg rpcMsg = { diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 4625203dd8..962edf9af1 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -19,6 +19,31 @@ #include "catalogInt.h" #include "systable.h" +char *ctgTaskTypeStr(CTG_TASK_TYPE type) { + switch (type) { + case CTG_TASK_GET_QNODE: + return "[get qnode list]"; + case CTG_TASK_GET_DB_VGROUP: + return "[get db vgroup]"; + case CTG_TASK_GET_DB_CFG: + return "[get db cfg]"; + case CTG_TASK_GET_DB_INFO: + return "[get db info]"; + case CTG_TASK_GET_TB_META: + return "[get table meta]"; + case CTG_TASK_GET_TB_HASH: + return "[get table hash]"; + case CTG_TASK_GET_INDEX: + return "[get index]"; + case CTG_TASK_GET_UDF: + return "[get udf]"; + case CTG_TASK_GET_USER: + return "[get user]"; + default: + return "unknown"; + } +} + void ctgFreeSMetaData(SMetaData* pData) { taosArrayDestroy(pData->pTableMeta); pData->pTableMeta = NULL; @@ -477,6 +502,9 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName *pVgroup = *vgInfo; + ctgDebug("Got tb %s hash vgroup, vgId %d, epNum %d, current %s port %d", tbFullName, vgInfo->vgId, vgInfo->epSet.numOfEps, + vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn, vgInfo->epSet.eps[vgInfo->epSet.inUse].port); + CTG_RET(code); } diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index 6c44ce4c3c..c79820a950 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -258,6 +258,17 @@ static int32_t collectMetaKeyFromExplain(SCollectMetaKeyCxt* pCxt, SExplainStmt* return collectMetaKeyFromQuery(pCxt, pStmt->pQuery); } +static int32_t collectMetaKeyFromDescribe(SCollectMetaKeyCxt* pCxt, SDescribeStmt* pStmt) { + SName name = {.type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId}; + strcpy(name.dbname, pStmt->dbName); + strcpy(name.tname, pStmt->tableName); + int32_t code = catalogRemoveTableMeta(pCxt->pParseCxt->pCatalog, &name); + if (TSDB_CODE_SUCCESS == code) { + code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache); + } + return code; +} + static int32_t collectMetaKeyFromCreateStream(SCollectMetaKeyCxt* pCxt, SCreateStreamStmt* pStmt) { return collectMetaKeyFromQuery(pCxt, pStmt->pQuery); } @@ -381,6 +392,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { return collectMetaKeyFromCreateTopic(pCxt, (SCreateTopicStmt*)pStmt); case QUERY_NODE_EXPLAIN_STMT: return collectMetaKeyFromExplain(pCxt, (SExplainStmt*)pStmt); + case QUERY_NODE_DESCRIBE_STMT: + return collectMetaKeyFromDescribe(pCxt, (SDescribeStmt*)pStmt); case QUERY_NODE_CREATE_STREAM_STMT: return collectMetaKeyFromCreateStream(pCxt, (SCreateStreamStmt*)pStmt); case QUERY_NODE_SHOW_DNODES_STMT: diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index f1892d0040..6334d079c8 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2903,7 +2903,6 @@ static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt, } } - taosArrayDestroy(dbCfg.pRetensions); return code; } diff --git a/source/libs/parser/test/mockCatalog.cpp b/source/libs/parser/test/mockCatalog.cpp index 8fb28ce395..c7f0990132 100644 --- a/source/libs/parser/test/mockCatalog.cpp +++ b/source/libs/parser/test/mockCatalog.cpp @@ -207,6 +207,13 @@ int32_t __catalogGetUdfInfo(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps return g_mockCatalogService->catalogGetUdfInfo(funcName, pInfo); } +int32_t __catalogRefreshGetTableMeta(SCatalog* pCatalog, void* pTransporter, const SEpSet* pMgmtEps, + const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) { + return g_mockCatalogService->catalogGetTableMeta(pTableName, pTableMeta); +} + +int32_t __catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName) { return 0; } + void initMetaDataEnv() { g_mockCatalogService.reset(new MockCatalogService()); @@ -221,6 +228,8 @@ void initMetaDataEnv() { stub.set(catalogGetDBCfg, __catalogGetDBCfg); stub.set(catalogChkAuth, __catalogChkAuth); stub.set(catalogGetUdfInfo, __catalogGetUdfInfo); + stub.set(catalogRefreshGetTableMeta, __catalogRefreshGetTableMeta); + stub.set(catalogRemoveTableMeta, __catalogRemoveTableMeta); // { // AddrAny any("libcatalog.so"); // std::map result; diff --git a/source/libs/parser/test/parInitialDTest.cpp b/source/libs/parser/test/parInitialDTest.cpp index 528d465dbd..8562926789 100644 --- a/source/libs/parser/test/parInitialDTest.cpp +++ b/source/libs/parser/test/parInitialDTest.cpp @@ -21,7 +21,7 @@ namespace ParserTest { class ParserInitialDTest : public ParserDdlTest {}; -// DELETE FROM tb_name [WHERE condition] +// DELETE FROM table_name [WHERE condition] TEST_F(ParserInitialDTest, delete) { useDb("root", "test"); @@ -40,7 +40,15 @@ TEST_F(ParserInitialDTest, deleteSemanticCheck) { run("DELETE FROM t1 WHERE c1 > 10", TSDB_CODE_PAR_INVALID_DELETE_WHERE, PARSER_STAGE_TRANSLATE); } -// todo desc +// DESC table_name +TEST_F(ParserInitialDTest, describe) { + useDb("root", "test"); + + run("DESC t1"); + + run("DESCRIBE st1"); +} + // todo describe // todo DROP account diff --git a/source/libs/parser/test/parTestUtil.h b/source/libs/parser/test/parTestUtil.h index 44be7a2474..07f3d3cece 100644 --- a/source/libs/parser/test/parTestUtil.h +++ b/source/libs/parser/test/parTestUtil.h @@ -50,11 +50,13 @@ class ParserDdlTest : public ParserTestBase { virtual void checkDdl(const SQuery* pQuery, ParserStage stage) { ASSERT_NE(pQuery, nullptr); - ASSERT_EQ(pQuery->haveResultSet, false); ASSERT_NE(pQuery->pRoot, nullptr); - ASSERT_EQ(pQuery->numOfResCols, 0); - ASSERT_EQ(pQuery->pResSchema, nullptr); - ASSERT_EQ(pQuery->precision, 0); + if (QUERY_EXEC_MODE_RPC == pQuery->execMode) { + ASSERT_EQ(pQuery->haveResultSet, false); + ASSERT_EQ(pQuery->numOfResCols, 0); + ASSERT_EQ(pQuery->pResSchema, nullptr); + ASSERT_EQ(pQuery->precision, 0); + } if (nullptr != checkDdl_) { checkDdl_(pQuery, stage); } diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index e77f2b0ca4..949a58c2fa 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -37,6 +37,8 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) { pOut->dbVgroup->vgVersion = usedbRsp->vgVersion; pOut->dbVgroup->hashMethod = usedbRsp->hashMethod; + qDebug("Got %d vgroup for db %s", usedbRsp->vgNum, usedbRsp->db); + if (usedbRsp->vgNum <= 0) { return TSDB_CODE_SUCCESS; } @@ -50,6 +52,8 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) { for (int32_t i = 0; i < usedbRsp->vgNum; ++i) { SVgroupInfo *pVgInfo = taosArrayGet(usedbRsp->pVgroupInfos, i); pOut->dbVgroup->numOfTable += pVgInfo->numOfTable; + qDebug("the %dth vgroup, id %d, epNum %d, current %s port %d", i, pVgInfo->vgId, pVgInfo->epSet.numOfEps, + pVgInfo->epSet.eps[pVgInfo->epSet.inUse].fqdn, pVgInfo->epSet.eps[pVgInfo->epSet.inUse].port); if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pVgInfo->vgId, sizeof(int32_t), pVgInfo, sizeof(SVgroupInfo))) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index ca90d2fe34..7a0272a4d0 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -108,7 +108,7 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *pTrans, SArray *pN pJob->refId = refId; - SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId); + SCH_JOB_DLOG("job refId:0x%" PRIx64" created", pJob->refId); pJob->status = JOB_TASK_STATUS_NOT_START; diff --git a/tests/script/tsim/stable/column_drop.sim b/tests/script/tsim/stable/column_drop.sim index 3401465103..63ac44eccd 100644 --- a/tests/script/tsim/stable/column_drop.sim +++ b/tests/script/tsim/stable/column_drop.sim @@ -118,6 +118,7 @@ if $data[0][5] != 5 then return -1 endi if $data[0][6] != 101 then + print expect 101, actual: $data06 return -1 endi diff --git a/tests/script/tsim/testsuit.sim b/tests/script/tsim/testsuit.sim index c73efde9d5..1f258028cc 100644 --- a/tests/script/tsim/testsuit.sim +++ b/tests/script/tsim/testsuit.sim @@ -7,57 +7,57 @@ #run tsim/table/basic1.sim #run tsim/trans/lossdata1.sim #run tsim/trans/create_db.sim -##run tsim/stable/alter_metrics.sim -##run tsim/stable/tag_modify.sim -##run tsim/stable/alter_comment.sim -##run tsim/stable/column_drop.sim -##run tsim/stable/column_modify.sim -##run tsim/stable/tag_rename.sim +#run tsim/stable/alter_metrics.sim +#run tsim/stable/tag_modify.sim +#run tsim/stable/alter_comment.sim +#run tsim/stable/column_drop.sim +#run tsim/stable/column_modify.sim +#run tsim/stable/tag_rename.sim #run tsim/stable/vnode3.sim #run tsim/stable/metrics.sim -##run tsim/stable/alter_insert2.sim +#run tsim/stable/alter_insert2.sim #run tsim/stable/show.sim #run tsim/stable/alter_import.sim -##run tsim/stable/tag_add.sim +#run tsim/stable/tag_add.sim #run tsim/stable/tag_drop.sim #run tsim/stable/column_add.sim #run tsim/stable/alter_count.sim #run tsim/stable/values.sim -#run tsim/stable/dnode3.sim +run tsim/stable/dnode3.sim #run tsim/stable/alter_insert1.sim #run tsim/stable/refcount.sim #run tsim/stable/disk.sim -#run tsim/db/basic1.sim -#run tsim/db/basic3.sim -##run tsim/db/basic7.sim -#run tsim/db/basic6.sim -#run tsim/db/create_all_options.sim -#run tsim/db/basic2.sim -#run tsim/db/error1.sim -#run tsim/db/taosdlog.sim -#run tsim/db/alter_option.sim -#run tsim/mnode/basic1.sim -#run tsim/mnode/basic3.sim -#run tsim/mnode/basic2.sim -#run tsim/parser/fourArithmetic-basic.sim -#run tsim/parser/groupby-basic.sim -#run tsim/snode/basic1.sim -#run tsim/query/time_process.sim -#run tsim/query/stddev.sim -#run tsim/query/interval-offset.sim -#run tsim/query/charScalarFunction.sim -#run tsim/query/complex_select.sim -#run tsim/query/explain.sim -#run tsim/query/crash_sql.sim -#run tsim/query/diff.sim -#run tsim/query/complex_limit.sim -#run tsim/query/complex_having.sim -#run tsim/query/udf.sim -#run tsim/query/complex_group.sim -#run tsim/query/interval.sim -#run tsim/query/session.sim -#run tsim/query/scalarFunction.sim -##run tsim/query/scalarNull.sim +run tsim/db/basic1.sim +run tsim/db/basic3.sim +#run tsim/db/basic7.sim +run tsim/db/basic6.sim +run tsim/db/create_all_options.sim +run tsim/db/basic2.sim +run tsim/db/error1.sim +run tsim/db/taosdlog.sim +run tsim/db/alter_option.sim +run tsim/mnode/basic1.sim +run tsim/mnode/basic3.sim +run tsim/mnode/basic2.sim +run tsim/parser/fourArithmetic-basic.sim +run tsim/parser/groupby-basic.sim +run tsim/snode/basic1.sim +run tsim/query/time_process.sim +run tsim/query/stddev.sim +run tsim/query/interval-offset.sim +run tsim/query/charScalarFunction.sim +run tsim/query/complex_select.sim +run tsim/query/explain.sim +run tsim/query/crash_sql.sim +run tsim/query/diff.sim +run tsim/query/complex_limit.sim +run tsim/query/complex_having.sim +run tsim/query/udf.sim +run tsim/query/complex_group.sim +run tsim/query/interval.sim +run tsim/query/session.sim +run tsim/query/scalarFunction.sim +#run tsim/query/scalarNull.sim run tsim/query/complex_where.sim run tsim/tmq/basic1.sim run tsim/tmq/basic4.sim diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 851d9a2070..329842aeb3 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -1003,7 +1003,7 @@ int32_t shellExecute() { taosSetSignal(SIGINT, shellSigintHandler); - shellGetGrantInfo(shell.conn); + shellGetGrantInfo(); while (1) { taosThreadCreate(&shell.pid, NULL, shellThreadLoop, shell.conn);