From 763eccf8aa4e0e6c4526142a68c2fb1cf76aa9c5 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 10 Mar 2022 19:05:58 +0800 Subject: [PATCH 1/5] feature/scheduler --- include/common/tmsg.h | 7 + include/common/tname.h | 4 +- include/libs/catalog/catalog.h | 26 +- include/libs/executor/executor.h | 4 +- include/libs/nodes/plannodes.h | 4 +- include/libs/qcom/query.h | 12 + include/libs/scheduler/scheduler.h | 9 +- include/util/taoserror.h | 1 + source/client/inc/clientInt.h | 1 + source/client/src/clientImpl.c | 102 ++++- source/common/src/tmsg.c | 1 + source/common/src/tname.c | 30 +- source/dnode/vnode/inc/vnode.h | 3 + source/dnode/vnode/src/vnd/vnodeCfg.c | 21 +- source/dnode/vnode/src/vnd/vnodeQuery.c | 2 +- source/libs/catalog/inc/catalogInt.h | 27 +- source/libs/catalog/src/catalog.c | 401 ++++++++++++-------- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/executor.c | 2 +- source/libs/executor/src/executorMain.c | 4 +- source/libs/executor/src/executorimpl.c | 43 ++- source/libs/executor/test/executorTests.cpp | 2 +- source/libs/qcom/src/queryUtil.c | 14 + source/libs/qworker/inc/qworkerMsg.h | 2 +- source/libs/qworker/src/qworker.c | 7 +- source/libs/qworker/src/qworkerMsg.c | 16 +- source/libs/qworker/test/qworkerTests.cpp | 2 +- source/libs/scheduler/inc/schedulerInt.h | 18 +- source/libs/scheduler/src/schFlowCtrl.c | 2 +- source/libs/scheduler/src/scheduler.c | 95 +++-- source/util/src/terror.c | 1 + 31 files changed, 600 insertions(+), 265 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c6ae35c463..c114457b6b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -24,6 +24,7 @@ #include "thash.h" #include "tlist.h" #include "trow.h" +#include "tname.h" #ifdef __cplusplus extern "C" { @@ -459,8 +460,14 @@ typedef struct { typedef struct { int32_t code; + SName tableName; } SQueryTableRsp; +int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp); + +int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp); + + typedef struct { char db[TSDB_DB_FNAME_LEN]; int32_t numOfVgroups; diff --git a/include/common/tname.h b/include/common/tname.h index 6de38a68ee..ffa4f8f253 100644 --- a/include/common/tname.h +++ b/include/common/tname.h @@ -17,7 +17,6 @@ #define _TD_COMMON_NAME_H_ #include "tdef.h" -#include "tmsg.h" #ifdef __cplusplus extern "C" { @@ -61,7 +60,8 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type); int32_t tNameSetAcctId(SName* dst, int32_t acctId); -SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name); +bool tNameDBNameEqual(SName* left, SName* right); + #ifdef __cplusplus } diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index f217277b80..197040567c 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -120,7 +120,7 @@ int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, * @param pCatalog (input, got with catalogGetHandle) * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) - * @param pTableName (input, table name, NOT including db name) + * @param pTableName (input, table name) * @param pTableMeta(output, table meta data, NEED to free it by calller) * @return error code */ @@ -131,7 +131,7 @@ int32_t catalogGetTableMeta(SCatalog* pCatalog, void * pTransporter, const SEpSe * @param pCatalog (input, got with catalogGetHandle) * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) - * @param pTableName (input, table name, NOT including db name) + * @param pTableName (input, table name) * @param pTableMeta(output, table meta data, NEED to free it by calller) * @return error code */ @@ -140,28 +140,38 @@ int32_t catalogGetSTableMeta(SCatalog* pCatalog, void * pTransporter, const SEpS int32_t catalogUpdateSTableMeta(SCatalog* pCatalog, STableMetaRsp *rspMsg); +/** + * Force refresh DB's local cached vgroup info. + * @param pCtg (input, got with catalogGetHandle) + * @param pTrans (input, rpc object) + * @param pMgmtEps (input, mnode EPs) + * @param dbFName (input, db full name) + * @return error code + */ +int32_t catalogRefreshDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* dbFName); + /** * Force refresh a table's local cached meta data. * @param pCatalog (input, got with catalogGetHandle) * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) - * @param pTableName (input, table name, NOT including db name) + * @param pTableName (input, table name) * @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure) * @return error code */ - int32_t catalogRefreshTableMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable); +int32_t catalogRefreshTableMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable); /** * Force refresh a table's local cached meta data and get the new one. * @param pCatalog (input, got with catalogGetHandle) * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) - * @param pTableName (input, table name, NOT including db name) + * @param pTableName (input, table name) * @param pTableMeta(output, table meta data, NEED to free it by calller) * @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure) * @return error code */ - int32_t catalogRefreshGetTableMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable); +int32_t catalogRefreshGetTableMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable); @@ -170,7 +180,7 @@ int32_t catalogUpdateSTableMeta(SCatalog* pCatalog, STableMetaRsp *rspMsg); * @param pCatalog (input, got with catalogGetHandle) * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) - * @param pTableName (input, table name, NOT including db name) + * @param pTableName (input, table name) * @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller) * @return error code */ @@ -181,7 +191,7 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCatalog, void *pTransporter, const * @param pCatalog (input, got with catalogGetHandle) * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) - * @param pTableName (input, table name, NOT including db name) + * @param pTableName (input, table name) * @param vgInfo (output, vgroup info) * @return error code */ diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index d4af51fc21..e1729835de 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -21,6 +21,7 @@ extern "C" { #endif #include "tcommon.h" +#include "query.h" typedef void* qTaskInfo_t; typedef void* DataSinkHandle; @@ -30,6 +31,7 @@ struct SSubplan; typedef struct SReadHandle { void* reader; void* meta; + void* config; } SReadHandle; /** @@ -67,7 +69,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA * @param qId * @return */ -int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle); +int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, SQueryErrorInfo *errInfo); /** * The main task execution function, including query on both table and multiple tables, diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index c805eba320..d1bbbe7dbc 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -90,7 +90,7 @@ typedef struct SSubLogicPlan { } SSubLogicPlan; typedef struct SQueryLogicPlan { - ENodeType type;; + ENodeType type; SNodeList* pSubplans; } SQueryLogicPlan; @@ -127,7 +127,7 @@ typedef struct SScanPhysiNode { int32_t order; // scan order: TSDB_ORDER_ASC|TSDB_ORDER_DESC int32_t count; // repeat count int32_t reverse; // reverse scan count - char tableName[TSDB_TABLE_NAME_LEN]; + SName tableName; } SScanPhysiNode; typedef SScanPhysiNode SSystemTableScanPhysiNode; diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 07c5c1e499..af6077208a 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -138,6 +138,11 @@ typedef struct SQueryNodeStat { int32_t tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT } SQueryNodeStat; +typedef struct SQueryErrorInfo { + int32_t code; + SName tableName; +} SQueryErrorInfo; + int32_t initTaskQueue(); int32_t cleanupTaskQueue(); @@ -170,6 +175,9 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta **pMeta); +SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name); + + extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize); @@ -178,6 +186,10 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t #define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE #define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE +#define IS_CLIENT_RETRY_ERROR(_code) ((_code) == TSDB_CODE_VND_HASH_MISMATCH) +#define IS_SCHEDULER_RETRY_ERROR(_code) ((_code) == TSDB_CODE_RPC_REDIRECT) + + #define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", DEBUG_FATAL, qDebugFlag, __VA_ARGS__); }} while(0) #define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", DEBUG_ERROR, qDebugFlag, __VA_ARGS__); }} while(0) #define qWarn(...) do { if (qDebugFlag & DEBUG_WARN) { taosPrintLog("QRY WARN ", DEBUG_WARN, qDebugFlag, __VA_ARGS__); }} while(0) diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 56da9ece6f..2d4cbd4ac0 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -52,10 +52,11 @@ typedef struct SQueryProfileSummary { } SQueryProfileSummary; typedef struct SQueryResult { - int32_t code; - uint64_t numOfRows; - int32_t msgSize; - char *msg; + int32_t code; + SArray *errList; // SArray + uint64_t numOfRows; + int32_t msgSize; + char *msg; } SQueryResult; typedef struct STaskInfo { diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 5d42646238..0e80ebdb75 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -326,6 +326,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_VND_IS_SYNCING TAOS_DEF_ERROR_CODE(0, 0x0513) #define TSDB_CODE_VND_INVALID_TSDB_STATE TAOS_DEF_ERROR_CODE(0, 0x0514) #define TSDB_CODE_VND_TB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0515) +#define TSDB_CODE_VND_HASH_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x0516) // tsdb #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 321e8ab77b..4a49c0f528 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -184,6 +184,7 @@ typedef struct SRequestObj { char* msgBuf; void* pInfo; // sql parse info, generated by parser module int32_t code; + SArray* errList; // SArray SQueryExecMetric metric; SRequestSendRecvBody body; } SRequestObj; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 85d98e802f..30ffaaee41 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -223,6 +223,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList schedulerFreeJob(pRequest->body.queryJob); } + pRequest->errList = res.errList; pRequest->code = code; return pRequest->code; } @@ -234,19 +235,13 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList schedulerFreeJob(pRequest->body.queryJob); } } - + + pRequest->errList = res.errList; pRequest->code = res.code; return pRequest->code; } -TAOS_RES* taos_query_l(TAOS* taos, const char* sql, int sqlLen) { - STscObj* pTscObj = (STscObj*)taos; - if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) { - tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN); - terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; - return NULL; - } - +SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) { SRequestObj* pRequest = NULL; SQuery* pQuery; SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); @@ -273,6 +268,93 @@ _return: return pRequest; } +int32_t clientProcessErrorList(SArray **pList) { + SArray *errList = *pList; + int32_t errNum = (int32_t)taosArrayGetSize(errList); + + for (int32_t i = 0; i < errNum; ++i) { + SQueryErrorInfo *errInfo = taosArrayGet(errList, i); + if (TSDB_CODE_VND_HASH_MISMATCH == errInfo->code) { + if (i == (errNum - 1)) { + break; + } + + // TODO REMOVE SAME DB ERROR + } else { + taosArrayRemove(errList, i); + --i; + --errNum; + } + } + + if (0 == errNum) { + taosArrayDestroy(*pList); + *pList = NULL; + } + + return TSDB_CODE_SUCCESS; +} + + +SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { + SRequestObj* pRequest = NULL; + int32_t code = 0; + bool quit = false; + + while (!quit) { + pRequest = execQueryImpl(pTscObj, sql, sqlLen); + if (TSDB_CODE_SUCCESS == pRequest->code || NULL == pRequest->errList) { + break; + } + + code = clientProcessErrorList(&pRequest->errList); + if (code != TSDB_CODE_SUCCESS || NULL == pRequest->errList) { + break; + } + + int32_t errNum = (int32_t)taosArrayGetSize(pRequest->errList); + for (int32_t i = 0; i < errNum; ++i) { + SQueryErrorInfo *errInfo = taosArrayGet(pRequest->errList, i); + + if (TSDB_CODE_VND_HASH_MISMATCH == errInfo->code) { + SCatalog *pCatalog = NULL; + code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); + if (code != TSDB_CODE_SUCCESS) { + quit = true; + break; + } + SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); + + char dbFName[TSDB_DB_FNAME_LEN]; + tNameGetFullDbName(&errInfo->tableName, dbFName); + + code = catalogRefreshDBVgInfo(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, dbFName); + if (code != TSDB_CODE_SUCCESS) { + quit = true; + break; + } + } + } + } + + if (code) { + pRequest->code = code; + } + + return pRequest; +} + +TAOS_RES* taos_query_l(TAOS* taos, const char* sql, int sqlLen) { + STscObj* pTscObj = (STscObj*)taos; + if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) { + tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN); + terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; + return NULL; + } + + return execQuery(pTscObj, sql, sqlLen); +} + int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) { pEpSet->version = 0; @@ -389,7 +471,7 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { tfree(pMsgBody); } bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType) { - return msgType == TDMT_VND_QUERY_RSP || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP; + return msgType == TDMT_VND_QUERY_RSP || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP || msgType == TDMT_VND_QUERY_HEARTBEAT_RSP; } void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->ahandle; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index f80bb3d8bd..7a46261684 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2655,3 +2655,4 @@ void *tDeserializeSVDropTSmaReq(void *buf, SVDropTSmaReq *pReq) { return buf; } + \ No newline at end of file diff --git a/source/common/src/tname.c b/source/common/src/tname.c index fb77417cac..d08865714c 100644 --- a/source/common/src/tname.c +++ b/source/common/src/tname.c @@ -222,6 +222,27 @@ int32_t tNameSetAcctId(SName* dst, int32_t acctId) { return 0; } +bool tNameDBNameEqual(SName* left, SName* right) { + if (NULL == left) { + if (NULL == right) { + return true; + } + + return false; + } + + if (NULL == right) { + return false; + } + + if (left->acctId != right->acctId) { + return false; + } + + return (0 == strcmp(left->dbname, right->dbname)); +} + + int32_t tNameFromString(SName* dst, const char* str, uint32_t type) { assert(dst != NULL && str != NULL && strlen(str) > 0); @@ -273,13 +294,4 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) { return 0; } -SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name) { - SSchema s = {0}; - s.type = type; - s.bytes = bytes; - s.colId = colId; - - tstrncpy(s.name, name, tListLen(s.name)); - return s; -} diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 319bd9fde6..2d07a4e6ab 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -193,6 +193,9 @@ void vnodeOptionsInit(SVnodeCfg *pOptions); */ void vnodeOptionsClear(SVnodeCfg *pOptions); +int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableName); + + /* ------------------------ FOR COMPILE ------------------------ */ int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg); diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index 727a4b41f7..10d0d33722 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -32,4 +32,23 @@ int vnodeValidateOptions(const SVnodeCfg *pVnodeOptions) { void vnodeOptionsCopy(SVnodeCfg *pDest, const SVnodeCfg *pSrc) { memcpy((void *)pDest, (void *)pSrc, sizeof(SVnodeCfg)); -} \ No newline at end of file +} + +int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableName) { + uint32_t hashValue = 0; + + switch (pVnodeOptions->hashMethod) { + default: + hashValue = MurmurHash3_32(tableName, strlen(tableName)); + break; + } + + if (hashValue < pVnodeOptions->hashBegin || hashValue > pVnodeOptions->hashEnd) { + terrno = TSDB_CODE_VND_HASH_MISMATCH; + return TSDB_CODE_VND_HASH_MISMATCH; + } + + return TSDB_CODE_SUCCESS; +} + + diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 0359483ba2..ac10f78a8a 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -30,7 +30,7 @@ void vnodeQueryClose(SVnode *pVnode) { int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { vTrace("message in query queue is processing"); - SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta}; + SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config}; switch (pMsg->msgType) { case TDMT_VND_QUERY:{ diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index c4f1a117fe..03d78dbf48 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -159,8 +159,10 @@ typedef struct SCtgRemoveTblMsg { typedef struct SCtgMetaAction { - int32_t act; - void *data; + int32_t act; + void *data; + bool syncReq; + uint64_t seqId; } SCtgMetaAction; typedef struct SCtgQNode { @@ -168,14 +170,21 @@ typedef struct SCtgQNode { struct SCtgQNode *next; } SCtgQNode; +typedef struct SCtgQueue { + SRWLatch qlock; + uint64_t seqId; + uint64_t seqDone; + SCtgQNode *head; + SCtgQNode *tail; + tsem_t reqSem; + tsem_t rspSem; + uint64_t qRemainNum; +} SCtgQueue; + typedef struct SCatalogMgmt { bool exit; SRWLatch lock; - SRWLatch qlock; - SCtgQNode *head; - SCtgQNode *tail; - tsem_t sem; - uint64_t qRemainNum; + SCtgQueue queue; pthread_t updateThread; SHashObj *pCluster; //key: clusterId, value: SCatalog* SCatalogStat stat; @@ -191,8 +200,8 @@ typedef struct SCtgAction { ctgActFunc func; } SCtgAction; -#define CTG_QUEUE_ADD() atomic_add_fetch_64(&gCtgMgmt.qRemainNum, 1) -#define CTG_QUEUE_SUB() atomic_sub_fetch_64(&gCtgMgmt.qRemainNum, 1) +#define CTG_QUEUE_ADD() atomic_add_fetch_64(&gCtgMgmt.queue.qRemainNum, 1) +#define CTG_QUEUE_SUB() atomic_sub_fetch_64(&gCtgMgmt.queue.qRemainNum, 1) #define CTG_STAT_ADD(n) atomic_add_fetch_64(&(n), 1) #define CTG_STAT_SUB(n) atomic_sub_fetch_64(&(n), 1) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 71a4bb7cef..fb378928b6 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -229,44 +229,6 @@ void ctgDbgShowClusterCache(SCatalog* pCtg) { } - -void ctgPopAction(SCtgMetaAction **action) { - SCtgQNode *orig = gCtgMgmt.head; - - SCtgQNode *node = gCtgMgmt.head->next; - gCtgMgmt.head = gCtgMgmt.head->next; - - CTG_QUEUE_SUB(); - - tfree(orig); - - *action = &node->action; -} - - -int32_t ctgPushAction(SCtgMetaAction *action) { - SCtgQNode *node = calloc(1, sizeof(SCtgQNode)); - if (NULL == node) { - qError("calloc %d failed", (int32_t)sizeof(SCtgQNode)); - CTG_RET(TSDB_CODE_CTG_MEM_ERROR); - } - - node->action = *action; - - CTG_LOCK(CTG_WRITE, &gCtgMgmt.qlock); - gCtgMgmt.tail->next = node; - gCtgMgmt.tail = node; - CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.qlock); - - CTG_QUEUE_ADD(); - CTG_STAT_ADD(gCtgMgmt.stat.runtime.qNum); - - tsem_post(&gCtgMgmt.sem); - - return TSDB_CODE_SUCCESS; -} - - void ctgFreeMetaRent(SCtgRentMgmt *mgmt) { if (NULL == mgmt->slots) { return; @@ -284,94 +246,6 @@ void ctgFreeMetaRent(SCtgRentMgmt *mgmt) { } -int32_t ctgPushRmDBMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) { - int32_t code = 0; - SCtgMetaAction action= {.act = CTG_ACT_REMOVE_DB}; - SCtgRemoveDBMsg *msg = malloc(sizeof(SCtgRemoveDBMsg)); - if (NULL == msg) { - ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveDBMsg)); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } - - msg->pCtg = pCtg; - strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); - msg->dbId = dbId; - - action.data = msg; - - CTG_ERR_JRET(ctgPushAction(&action)); - - ctgDebug("action [%s] added into queue", gCtgAction[action.act].name); - - return TSDB_CODE_SUCCESS; - -_return: - - tfree(action.data); - CTG_RET(code); -} - - -int32_t ctgPushRmStbMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid) { - int32_t code = 0; - SCtgMetaAction action= {.act = CTG_ACT_REMOVE_STB}; - SCtgRemoveStbMsg *msg = malloc(sizeof(SCtgRemoveStbMsg)); - if (NULL == msg) { - ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveStbMsg)); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } - - msg->pCtg = pCtg; - strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); - strncpy(msg->stbName, stbName, sizeof(msg->stbName)); - msg->dbId = dbId; - msg->suid = suid; - - action.data = msg; - - CTG_ERR_JRET(ctgPushAction(&action)); - - ctgDebug("action [%s] added into queue", gCtgAction[action.act].name); - - return TSDB_CODE_SUCCESS; - -_return: - - tfree(action.data); - CTG_RET(code); -} - - - -int32_t ctgPushRmTblMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName) { - int32_t code = 0; - SCtgMetaAction action= {.act = CTG_ACT_REMOVE_TBL}; - SCtgRemoveTblMsg *msg = malloc(sizeof(SCtgRemoveTblMsg)); - if (NULL == msg) { - ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveTblMsg)); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } - - msg->pCtg = pCtg; - strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); - strncpy(msg->tbName, tbName, sizeof(msg->tbName)); - msg->dbId = dbId; - - action.data = msg; - - CTG_ERR_JRET(ctgPushAction(&action)); - - ctgDebug("action [%s] added into queue", gCtgAction[action.act].name); - - return TSDB_CODE_SUCCESS; - -_return: - - tfree(action.data); - CTG_RET(code); -} - - void ctgFreeTableMetaCache(SCtgTbMetaCache *cache) { CTG_LOCK(CTG_WRITE, &cache->stbLock); if (cache->stbCache) { @@ -437,6 +311,180 @@ void ctgFreeHandle(SCatalog* pCtg) { } + +void ctgWaitAction(SCtgMetaAction *action) { + while (true) { + tsem_wait(&gCtgMgmt.queue.rspSem); + + if (atomic_load_8(&gCtgMgmt.exit)) { + tsem_post(&gCtgMgmt.queue.rspSem); + break; + } + + if (gCtgMgmt.queue.seqDone >= action->seqId) { + break; + } + + tsem_post(&gCtgMgmt.queue.rspSem); + sched_yield(); + } +} + +void ctgPopAction(SCtgMetaAction **action) { + SCtgQNode *orig = gCtgMgmt.queue.head; + + SCtgQNode *node = gCtgMgmt.queue.head->next; + gCtgMgmt.queue.head = gCtgMgmt.queue.head->next; + + CTG_QUEUE_SUB(); + + tfree(orig); + + *action = &node->action; +} + + +int32_t ctgPushAction(SCatalog* pCtg, SCtgMetaAction *action) { + SCtgQNode *node = calloc(1, sizeof(SCtgQNode)); + if (NULL == node) { + qError("calloc %d failed", (int32_t)sizeof(SCtgQNode)); + CTG_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + action->seqId = atomic_add_fetch_64(&gCtgMgmt.queue.seqId, 1); + + node->action = *action; + + CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); + gCtgMgmt.queue.tail->next = node; + gCtgMgmt.queue.tail = node; + CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); + + CTG_QUEUE_ADD(); + CTG_STAT_ADD(gCtgMgmt.stat.runtime.qNum); + + tsem_post(&gCtgMgmt.queue.reqSem); + + ctgDebug("action [%s] added into queue", gCtgAction[action->act].name); + + if (action->syncReq) { + ctgWaitAction(action); + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t ctgPushRmDBMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) { + int32_t code = 0; + SCtgMetaAction action= {.act = CTG_ACT_REMOVE_DB}; + SCtgRemoveDBMsg *msg = malloc(sizeof(SCtgRemoveDBMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveDBMsg)); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + msg->pCtg = pCtg; + strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); + msg->dbId = dbId; + + action.data = msg; + + CTG_ERR_JRET(ctgPushAction(pCtg, &action)); + + return TSDB_CODE_SUCCESS; + +_return: + + tfree(action.data); + CTG_RET(code); +} + + +int32_t ctgPushRmStbMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid) { + int32_t code = 0; + SCtgMetaAction action= {.act = CTG_ACT_REMOVE_STB}; + SCtgRemoveStbMsg *msg = malloc(sizeof(SCtgRemoveStbMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveStbMsg)); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + msg->pCtg = pCtg; + strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); + strncpy(msg->stbName, stbName, sizeof(msg->stbName)); + msg->dbId = dbId; + msg->suid = suid; + + action.data = msg; + + CTG_ERR_JRET(ctgPushAction(pCtg, &action)); + + return TSDB_CODE_SUCCESS; + +_return: + + tfree(action.data); + CTG_RET(code); +} + + + +int32_t ctgPushRmTblMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName) { + int32_t code = 0; + SCtgMetaAction action= {.act = CTG_ACT_REMOVE_TBL}; + SCtgRemoveTblMsg *msg = malloc(sizeof(SCtgRemoveTblMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveTblMsg)); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + msg->pCtg = pCtg; + strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); + strncpy(msg->tbName, tbName, sizeof(msg->tbName)); + msg->dbId = dbId; + + action.data = msg; + + CTG_ERR_JRET(ctgPushAction(pCtg, &action)); + + return TSDB_CODE_SUCCESS; + +_return: + + tfree(action.data); + CTG_RET(code); +} + +int32_t ctgPushUpdateVgMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncReq) { + int32_t code = 0; + SCtgMetaAction action= {.act = CTG_ACT_UPDATE_VG, .syncReq = syncReq}; + SCtgUpdateVgMsg *msg = malloc(sizeof(SCtgUpdateVgMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg)); + ctgFreeVgInfo(dbInfo); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); + msg->pCtg = pCtg; + msg->dbId = dbId; + msg->dbInfo = dbInfo; + + action.data = msg; + + CTG_ERR_JRET(ctgPushAction(pCtg, &action)); + + return TSDB_CODE_SUCCESS; + +_return: + + ctgFreeVgInfo(dbInfo); + tfree(action.data); + CTG_RET(code); +} + + int32_t ctgAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) { CTG_LOCK(CTG_READ, &dbCache->vgLock); @@ -1591,38 +1639,57 @@ int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const } CTG_ERR_JRET(ctgCloneVgInfo(DbOut.dbVgroup, pInfo)); - - SCtgMetaAction action= {.act = CTG_ACT_UPDATE_VG}; - SCtgUpdateVgMsg *msg = malloc(sizeof(SCtgUpdateVgMsg)); - if (NULL == msg) { - ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg)); - ctgFreeVgInfo(DbOut.dbVgroup); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } - strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); - msg->pCtg = pCtg; - msg->dbId = DbOut.dbId; - msg->dbInfo = DbOut.dbVgroup; - - action.data = msg; - - CTG_ERR_JRET(ctgPushAction(&action)); - - ctgDebug("action [%s] added into queue", gCtgAction[action.act].name); + CTG_ERR_RET(ctgPushUpdateVgMsgInQueue(pCtg, dbFName, DbOut.dbId, DbOut.dbVgroup, false)); return TSDB_CODE_SUCCESS; _return: tfree(*pInfo); - tfree(msg); - *pInfo = DbOut.dbVgroup; CTG_RET(code); } +int32_t ctgRefreshDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName) { + bool inCache = false; + int32_t code = 0; + SCtgDBCache** dbCache = NULL; + + CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache, &inCache)); + + SUseDbOutput DbOut = {0}; + SBuildUseDBInput input = {0}; + tstrncpy(input.db, dbFName, tListLen(input.db)); + + if (inCache) { + input.dbId = (*dbCache)->dbId; + input.vgVersion = (*dbCache)->vgInfo->vgVersion; + input.numOfTable = (*dbCache)->vgInfo->numOfTable; + + ctgReleaseVgInfo(*dbCache); + ctgReleaseDBCache(pCtg, *dbCache); + } else { + input.vgVersion = CTG_DEFAULT_INVALID_VERSION; + } + + code = ctgGetDBVgInfoFromMnode(pCtg, pRpc, pMgmtEps, &input, &DbOut); + if (code) { + if (CTG_DB_NOT_EXIST(code) && input.vgVersion > CTG_DEFAULT_INVALID_VERSION) { + ctgDebug("db no longer exist, dbFName:%s, dbId:%" PRIx64, input.db, input.dbId); + ctgPushRmDBMsgInQueue(pCtg, input.db, input.dbId); + } + + CTG_ERR_RET(code); + } + + CTG_ERR_RET(ctgPushUpdateVgMsgInQueue(pCtg, dbFName, DbOut.dbId, DbOut.dbVgroup, true)); + + return TSDB_CODE_SUCCESS; +} + + int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput) { *pOutput = malloc(sizeof(STableMetaOutput)); @@ -1746,9 +1813,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, action.data = msg; - CTG_ERR_JRET(ctgPushAction(&action)); - - ctgDebug("action [%s] added into queue", gCtgAction[action.act].name); + CTG_ERR_JRET(ctgPushAction(pCtg, &action)); return TSDB_CODE_SUCCESS; @@ -2042,9 +2107,10 @@ void* ctgUpdateThreadFunc(void* param) { CTG_LOCK(CTG_READ, &gCtgMgmt.lock); while (true) { - tsem_wait(&gCtgMgmt.sem); + tsem_wait(&gCtgMgmt.queue.reqSem); if (atomic_load_8(&gCtgMgmt.exit)) { + tsem_post(&gCtgMgmt.queue.rspSem); break; } @@ -2056,6 +2122,12 @@ void* ctgUpdateThreadFunc(void* param) { (*gCtgAction[action->act].func)(action); + gCtgMgmt.queue.seqDone = action->seqId; + + if (action->syncReq) { + tsem_post(&gCtgMgmt.queue.rspSem); + } + CTG_STAT_ADD(gCtgMgmt.stat.runtime.qDoneNum); ctgDbgShowClusterCache(pCtg); @@ -2125,14 +2197,15 @@ int32_t catalogInit(SCatalogCfg *cfg) { CTG_ERR_RET(ctgStartUpdateThread()); - tsem_init(&gCtgMgmt.sem, 0, 0); + tsem_init(&gCtgMgmt.queue.reqSem, 0, 0); + tsem_init(&gCtgMgmt.queue.rspSem, 0, 0); - gCtgMgmt.head = calloc(1, sizeof(SCtgQNode)); - if (NULL == gCtgMgmt.head) { + gCtgMgmt.queue.head = calloc(1, sizeof(SCtgQNode)); + if (NULL == gCtgMgmt.queue.head) { qError("calloc %d failed", (int32_t)sizeof(SCtgQNode)); CTG_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - gCtgMgmt.tail = gCtgMgmt.head; + gCtgMgmt.queue.tail = gCtgMgmt.queue.head; qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", gCtgMgmt.cfg.maxDBCacheNum, gCtgMgmt.cfg.maxTblCacheNum, gCtgMgmt.cfg.dbRentSec, gCtgMgmt.cfg.stbRentSec); @@ -2332,12 +2405,10 @@ int32_t catalogUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId action.data = msg; - CTG_ERR_JRET(ctgPushAction(&action)); + CTG_ERR_JRET(ctgPushAction(pCtg, &action)); dbInfo = NULL; - ctgDebug("action [%s] added into queue", gCtgAction[action.act].name); - CTG_API_LEAVE(code); _return: @@ -2443,9 +2514,7 @@ int32_t catalogUpdateSTableMeta(SCatalog* pCtg, STableMetaRsp *rspMsg) { action.data = msg; - CTG_ERR_JRET(ctgPushAction(&action)); - - ctgDebug("action [%s] added into queue", gCtgAction[action.act].name); + CTG_ERR_JRET(ctgPushAction(pCtg, &action)); CTG_API_LEAVE(code); @@ -2458,6 +2527,15 @@ _return: CTG_API_LEAVE(code); } +int32_t catalogRefreshDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* dbFName) { + CTG_API_ENTER(); + + if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == dbFName) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + CTG_API_LEAVE(ctgRefreshDBVgInfo(pCtg, pTrans, pMgmtEps, dbFName)); +} int32_t catalogRefreshTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) { CTG_API_ENTER(); @@ -2702,7 +2780,8 @@ void catalogDestroy(void) { atomic_store_8(&gCtgMgmt.exit, true); - tsem_post(&gCtgMgmt.sem); + tsem_post(&gCtgMgmt.queue.reqSem); + tsem_post(&gCtgMgmt.queue.rspSem); while (CTG_IS_LOCKED(&gCtgMgmt.lock)) { usleep(1); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index d46978b26d..43e6adf591 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -721,7 +721,7 @@ int32_t getMaximumIdleDurationSec(); void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx* pCtx, int32_t idx, int32_t type); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); -int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId); +int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, SQueryErrorInfo *errInfo); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index a8602b7c77..ce50298add 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -84,7 +84,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) { } qTaskInfo_t pTaskInfo = NULL; - code = qCreateExecTask(streamReadHandle, 0, 0, plan, &pTaskInfo, NULL); + code = qCreateExecTask(streamReadHandle, 0, 0, plan, &pTaskInfo, NULL, NULL); if (code != TSDB_CODE_SUCCESS) { // TODO: destroy SSubplan & pTaskInfo terrno = code; diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index fabaa2d31d..1684a6e936 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -51,11 +51,11 @@ static void freeqinfoFn(void *qhandle) { qDestroyTask(*handle); } -int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) { +int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, SQueryErrorInfo *errInfo) { assert(readHandle != NULL && pSubplan != NULL); SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; - int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId); + int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, errInfo); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7e46a7a63d..480114480c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -8072,7 +8072,8 @@ static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRead static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId); -SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { +int32_t doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo, SQueryErrorInfo *errInfo) { + int32_t code = 0; if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_PROJECT) { // ignore the project node pPhyNode = nodesListGetNode(pPhyNode->pChildren, 0); } @@ -8081,11 +8082,21 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; + char tableFName[TSDB_TABLE_FNAME_LEN]; + tNameExtractFullName(&pScanPhyNode->tableName, tableFName); + + code = vnodeValidateTableHash(pHandle->config, tableFName); + if (code) { + errInfo->code = code; + errInfo->tableName = pScanPhyNode->tableName; + return code; + } + size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols); tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*) pPhyNode, pHandle, (uint64_t) queryId, taskId); - int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); - return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo); + code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); + pTaskInfo->pRoot = createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) { // SExchangePhysiNode* pEx = (SExchangePhysiNode*) pPhyNode; // return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo); @@ -8093,7 +8104,11 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. STableGroupInfo groupInfo = {0}; - int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId); + code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId); + if (code) { + return code; + } + SArray* idList = NULL; if (groupInfo.numOfTables > 0) { @@ -8126,12 +8141,15 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa for (int32_t i = 0; i < size; ++i) { SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); - SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + code = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo, errInfo); + if (code) { + return code; + } int32_t resultRowSize = 0; SArray* pExprInfo = createExprInfo((SAggPhysiNode*)pPhyNode, &resultRowSize); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - return createAggregateOperatorInfo(op, pExprInfo, pResBlock, pTaskInfo, pTableGroupInfo); + pTaskInfo->pRoot = createAggregateOperatorInfo(pTaskInfo->pRoot, pExprInfo, pResBlock, pTaskInfo, pTableGroupInfo); } } /*else if (pPhyNode->info.type == OP_MultiTableAggregate) { size_t size = taosArrayGetSize(pPhyNode->pChildren); @@ -8143,6 +8161,12 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa return createMultiTableAggOperatorInfo(op, pPhyNode->pTargets, pTaskInfo, pTableGroupInfo); } }*/ + + if (pTaskInfo->pRoot == NULL) { + code = TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + return code; } static tsdbReaderT createDataReaderImpl(STableScanPhysiNode* pTableScanNode, STableGroupInfo* pGroupInfo, void* readHandle, uint64_t queryId, uint64_t taskId) { @@ -8205,7 +8229,7 @@ static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRead return NULL; } -int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId) { +int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, SQueryErrorInfo *errInfo) { uint64_t queryId = pPlan->id.queryId; int32_t code = TSDB_CODE_SUCCESS; @@ -8216,9 +8240,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead } STableGroupInfo group = {0}; - (*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group); - if ((*pTaskInfo)->pRoot == NULL) { - code = TSDB_CODE_QRY_OUT_OF_MEMORY; + code = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group, errInfo); + if (code) { goto _complete; } diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index dcb1774ad0..d6414a908c 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -946,7 +946,7 @@ TEST(testCase, build_executor_tree_Test) { int32_t code = qStringToSubplan(msg, &plan); ASSERT_EQ(code, 0); - code = qCreateExecTask(&handle, 2, 1, plan, (void**) &pTaskInfo, &sinkHandle); + code = qCreateExecTask(&handle, 2, 1, plan, (void**) &pTaskInfo, &sinkHandle, NULL); ASSERT_EQ(code, 0); } diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 63fbf59c06..543a908226 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -161,3 +161,17 @@ int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransp rpcSendRequest(pTransporter, epSet, &rpcMsg, pTransporterId); return TSDB_CODE_SUCCESS; } + + +SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name) { + SSchema s = {0}; + s.type = type; + s.bytes = bytes; + s.colId = colId; + + tstrncpy(s.name, name, tListLen(s.name)); + return s; +} + + + diff --git a/source/libs/qworker/inc/qworkerMsg.h b/source/libs/qworker/inc/qworkerMsg.h index ecb5dbd654..f8d8ce4563 100644 --- a/source/libs/qworker/inc/qworkerMsg.h +++ b/source/libs/qworker/inc/qworkerMsg.h @@ -36,7 +36,7 @@ int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, void *connection); int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code); -int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code); +int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code, SQueryErrorInfo *errInfo); void qwFreeFetchRsp(void *msg); int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index b9ef6f8504..ea8e818863 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -998,6 +998,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { qTaskInfo_t pTaskInfo = NULL; DataSinkHandle sinkHandle = NULL; SQWTaskCtx *ctx = NULL; + SQueryErrorInfo errInfo = {0}; QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, &output)); @@ -1019,7 +1020,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { QW_ERR_JRET(code); } - code = qCreateExecTask(qwMsg->node, 0, tId, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle); + code = qCreateExecTask(qwMsg->node, 0, tId, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle, &errInfo); if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%s", tstrerror(code)); QW_ERR_JRET(code); @@ -1032,7 +1033,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { //TODO OPTIMIZE EMTYP RESULT QUERY RSP TO AVOID FURTHER FETCH - QW_ERR_JRET(qwBuildAndSendQueryRsp(qwMsg->connection, code)); + QW_ERR_JRET(qwBuildAndSendQueryRsp(qwMsg->connection, code, NULL)); QW_TASK_DLOG("query msg rsped, code:%d", code); queryRsped = true; @@ -1051,7 +1052,7 @@ _return: } if (!queryRsped) { - qwBuildAndSendQueryRsp(qwMsg->connection, rspCode); + qwBuildAndSendQueryRsp(qwMsg->connection, rspCode, &errInfo); QW_TASK_DLOG("query msg rsped, code:%x", rspCode); } diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 7d633d1c73..ce39c710c8 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -44,17 +44,23 @@ void qwFreeFetchRsp(void *msg) { } } -int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code) { +int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code, SQueryErrorInfo *errInfo) { SRpcMsg *pMsg = (SRpcMsg *)connection; - SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp)); - pRsp->code = code; + SQueryTableRsp rsp = {.code = code}; + if (errInfo && errInfo->code) { + rsp.tableName = errInfo->tableName; + } + + int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp); + void *msg = rpcMallocCont(contLen); + tSerializeSQueryTableRsp(msg, contLen, &rsp); SRpcMsg rpcRsp = { .msgType = TDMT_VND_QUERY_RSP, .handle = pMsg->handle, .ahandle = pMsg->ahandle, - .pCont = pRsp, - .contLen = sizeof(*pRsp), + .pCont = msg, + .contLen = contLen, .code = code, }; diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 231f0c7fff..a1e3f82d4c 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -262,7 +262,7 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { return; } -int32_t qwtCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) { +int32_t qwtCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, SQueryErrorInfo *errInfo) { int32_t idx = abs((++qwtTestCaseIdx) % qwtTestCaseNum); qwtTestSinkBlockNum = 0; diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index d9c2ae6d65..146c7c1966 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -136,6 +136,7 @@ typedef struct SSchJob { uint64_t queryId; SSchJobAttr attr; int32_t levelNum; + int32_t taskNum; void *transport; SArray *nodeList; // qnode/vnode list, element is SQueryNodeAddr SArray *levels; // Element is SQueryLevel, starting from 0. SArray @@ -154,7 +155,8 @@ typedef struct SSchJob { int32_t remoteFetch; SSchTask *fetchTask; int32_t errCode; - void *res; //TODO free it or not + SArray *errList; // SArray + void *resData; //TODO free it or not int32_t resNumOfRows; const char *sql; SQueryProfileSummary summary; @@ -168,9 +170,9 @@ extern SSchedulerMgmt schMgmt; #define SCH_SET_TASK_LASTMSG_TYPE(_task, _type) do { if(_task) { atomic_store_32(&(_task)->lastMsgType, _type); } } while (0) #define SCH_GET_TASK_LASTMSG_TYPE(_task) ((_task) ? atomic_load_32(&(_task)->lastMsgType) : -1) -#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) -#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY) -#define SCH_TASK_NO_NEED_DROP(task) ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY) +#define SCH_IS_DATA_SRC_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) +#define SCH_IS_DATA_SRC_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY)) +#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum) #define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st) #define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status) @@ -180,12 +182,14 @@ extern SSchedulerMgmt schMgmt; #define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true #define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl) -#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEAF_TASK(_job, _task) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) +#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEAF_TASK(_job, _task) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) #define SCH_SET_JOB_TYPE(_job, type) (_job)->attr.queryJob = ((type) != SUBPLAN_TYPE_MODIFY) #define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob) #define SCH_JOB_NEED_FETCH(_job) SCH_IS_QUERY_JOB(_job) -#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum) +#define SCH_IS_WAIT_ALL_JOB(_job) (!SCH_IS_QUERY_JOB(_job)) +#define SCH_IS_NEED_DROP_JOB(_job) (SCH_IS_QUERY_JOB(_job)) + #define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum) #define SCH_GET_CUR_EP(_addr) (&(_addr)->epset.eps[(_addr)->epset.inUse]) #define SCH_SWITCH_EPSET(_addr) ((_addr)->epset.inUse = ((_addr)->epset.inUse + 1) % (_addr)->epset.numOfEps) @@ -219,7 +223,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough); int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask); int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask); int32_t schFetchFromRemote(SSchJob *pJob); -int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode); +int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode, SQueryErrorInfo *errInfo); #ifdef __cplusplus diff --git a/source/libs/scheduler/src/schFlowCtrl.c b/source/libs/scheduler/src/schFlowCtrl.c index 9fba6523b6..4a2173561f 100644 --- a/source/libs/scheduler/src/schFlowCtrl.c +++ b/source/libs/scheduler/src/schFlowCtrl.c @@ -259,7 +259,7 @@ _return: SCH_UNLOCK(SCH_WRITE, &ctrl->lock); if (code) { - code = schProcessOnTaskFailure(pJob, pTask, code); + code = schProcessOnTaskFailure(pJob, pTask, code, NULL); } SCH_RET(code); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 63c634a384..02eb8869e4 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -410,6 +410,8 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + + ++pJob->taskNum; } SCH_JOB_DLOG("level initialized, taskNum:%d", taskNum); @@ -588,7 +590,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo return TSDB_CODE_SUCCESS; //TODO CHECK epList/condidateList - if (SCH_IS_DATA_SRC_TASK(pTask)) { + if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) { } else { int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs); @@ -611,7 +613,7 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask)); } - if (SCH_IS_DATA_SRC_TASK(pTask)) { + if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) { SCH_SWITCH_EPSET(&pTask->plan->execNode); } else { ++pTask->candidateIdx; @@ -727,8 +729,32 @@ int32_t schProcessOnDataFetched(SSchJob *job) { tsem_post(&job->rspSem); } +int32_t schPushToErrInfoList(SSchJob *pJob, SSchTask *pTask, SQueryErrorInfo *errInfo) { + if (NULL == errInfo || !SCH_IS_DATA_SRC_TASK(pTask) || !IS_CLIENT_RETRY_ERROR(errInfo->code)) { + return TSDB_CODE_SUCCESS; + } + + if (NULL == pJob->errList) { + SSchLevel *level = taosArrayGetLast(pJob->levels); + + pJob->errList = taosArrayInit(level->taskNum, sizeof(SQueryErrorInfo)); + if (NULL == pJob->errList) { + SCH_TASK_ELOG("taosArrayInit %d errInfofailed", pJob->taskNum); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + } + + if (NULL == taosArrayPush(pJob->errList, errInfo)) { + SCH_TASK_ELOG("taosArrayPush errInfo to list failed, errCode:%x", errInfo->code); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + return TSDB_CODE_SUCCESS; +} + + // Note: no more task error processing, handled in function internal -int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) { +int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode, SQueryErrorInfo *errInfo) { int8_t status = 0; if (schJobNeedToStop(pJob, &status)) { @@ -757,8 +783,10 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) } SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED); + + SCH_ERR_JRET(schPushToErrInfoList(pJob, pTask, errInfo)); - if (SCH_TASK_NEED_WAIT_ALL(pTask)) { + if (SCH_IS_WAIT_ALL_JOB(pJob)) { SCH_LOCK(SCH_WRITE, &pTask->level->lock); pTask->level->taskFailed++; taskDone = pTask->level->taskSucceed + pTask->level->taskFailed; @@ -801,7 +829,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { if (parentNum == 0) { int32_t taskDone = 0; - if (SCH_TASK_NEED_WAIT_ALL(pTask)) { + if (SCH_IS_WAIT_ALL_JOB(pJob)) { SCH_LOCK(SCH_WRITE, &pTask->level->lock); pTask->level->taskSucceed++; taskDone = pTask->level->taskSucceed + pTask->level->taskFailed; @@ -870,11 +898,11 @@ int32_t schFetchFromRemote(SSchJob *pJob) { return TSDB_CODE_SUCCESS; } - void *res = atomic_load_ptr(&pJob->res); - if (res) { + void *resData = atomic_load_ptr(&pJob->resData); + if (resData) { atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0); - SCH_JOB_DLOG("res already fetched, res:%p", res); + SCH_JOB_DLOG("res already fetched, res:%p", resData); return TSDB_CODE_SUCCESS; } @@ -886,7 +914,7 @@ _return: atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0); - SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code)); + SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code, NULL)); } @@ -894,6 +922,8 @@ _return: int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { int32_t code = 0; int8_t status = 0; + bool errInfoGot = false; + SQueryErrorInfo errInfo = {0}; if (schJobNeedToStop(pJob, &status)) { SCH_TASK_ELOG("rsp not processed cause of job status, job status:%d", status); @@ -933,13 +963,23 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch break; } case TDMT_VND_QUERY_RSP: { - SQueryTableRsp *rsp = (SQueryTableRsp *)msg; + SQueryTableRsp rsp = {0}; + if (msg) { + tDeserializeSQueryTableRsp(msg, msgSize, &rsp); + if (rsp.code) { + errInfo.code = rsp.code; + errInfo.tableName = rsp.tableName; + errInfoGot = true; + } + + SCH_ERR_JRET(rsp.code); + } SCH_ERR_JRET(rspCode); + if (NULL == msg) { SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } - SCH_ERR_JRET(rsp->code); SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY)); @@ -966,13 +1006,13 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } - if (pJob->res) { - SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->res); + if (pJob->resData) { + SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->resData); tfree(rsp); SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); } - atomic_store_ptr(&pJob->res, rsp); + atomic_store_ptr(&pJob->resData, rsp); atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows)); if (rsp->completed) { @@ -999,7 +1039,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch _return: - SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); + SCH_RET(schProcessOnTaskFailure(pJob, pTask, code, errInfoGot ? &errInfo : NULL)); } @@ -1423,7 +1463,7 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { _return: - SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); + SCH_RET(schProcessOnTaskFailure(pJob, pTask, code, NULL)); } int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) { @@ -1474,13 +1514,15 @@ void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) { } void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { + if (!SCH_IS_NEED_DROP_JOB(pJob)) { + return; + } + void *pIter = taosHashIterate(list, NULL); while (pIter) { SSchTask *pTask = *(SSchTask **)pIter; - if (!SCH_TASK_NO_NEED_DROP(pTask)) { - schDropTaskOnExecutedNode(pJob, pTask); - } + schDropTaskOnExecutedNode(pJob, pTask); pIter = taosHashIterate(list, pIter); } @@ -1537,8 +1579,9 @@ void schFreeJobImpl(void *job) { taosArrayDestroy(pJob->levels); taosArrayDestroy(pJob->nodeList); - - tfree(pJob->res); + taosArrayDestroy(pJob->errList); + + tfree(pJob->resData); tfree(pJob); @@ -1673,8 +1716,12 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan* pDag, in SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, true)); SSchJob *job = schAcquireJob(*pJob); + pRes->code = atomic_load_32(&job->errCode); pRes->numOfRows = job->resNumOfRows; + pRes->errList = job->errList; + job->errList = NULL; + schReleaseJob(*pJob); return TSDB_CODE_SUCCESS; @@ -1862,14 +1909,14 @@ int32_t schedulerFetchRows(int64_t job, void** pData) { SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); } - if (pJob->res && ((SRetrieveTableRsp *)pJob->res)->completed) { + if (pJob->resData && ((SRetrieveTableRsp *)pJob->resData)->completed) { SCH_ERR_JRET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED)); } while (true) { - *pData = atomic_load_ptr(&pJob->res); - if (*pData != atomic_val_compare_exchange_ptr(&pJob->res, *pData, NULL)) { + *pData = atomic_load_ptr(&pJob->resData); + if (*pData != atomic_val_compare_exchange_ptr(&pJob->resData, *pData, NULL)) { continue; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index c1cb4f8a41..fc9c36097c 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -324,6 +324,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, "Database write operat TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_SYNCING, "Database is syncing") TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TSDB_STATE, "Invalid tsdb state") TAOS_DEFINE_ERROR(TSDB_CODE_VND_TB_NOT_EXIST, "Table not exists") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_HASH_MISMATCH, "Hash value mismatch") // tsdb TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID") From 9359d738eb5172d3ce7185ebfd222d2e7ace5e94 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 10 Mar 2022 19:36:30 +0800 Subject: [PATCH 2/5] feature/scheduler --- source/common/src/tmsg.c | 35 +++++++++++++++++- source/libs/executor/src/executorimpl.c | 47 +++++++++++-------------- 2 files changed, 54 insertions(+), 28 deletions(-) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index b0b0adbe78..48a2893dfb 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2623,6 +2623,40 @@ int32_t tDeserializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp * void tFreeSSchedulerHbRsp(SSchedulerHbRsp *pRsp) { taosArrayDestroy(pRsp->taskStatus); } +int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->code) < 0) return -1; + if (tEncodeI8(&encoder, pRsp->tableName.type) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->tableName.acctId) < 0) return -1; + if (tEncodeCStr(&encoder, pRsp->tableName.dbname) < 0) return -1; + if (tEncodeCStr(&encoder, pRsp->tableName.tname) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->code) < 0) return -1; + if (tDecodeI8(&decoder, &pRsp->tableName.type) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->tableName.acctId) < 0) return -1; + if (tDecodeCStrTo(&decoder, pRsp->tableName.dbname) < 0) return -1; + if (tDecodeCStrTo(&decoder, pRsp->tableName.tname) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + + int32_t tSerializeSVCreateTSmaReq(void **buf, SVCreateTSmaReq *pReq) { int32_t tlen = 0; @@ -2655,4 +2689,3 @@ void *tDeserializeSVDropTSmaReq(void *buf, SVDropTSmaReq *pReq) { return buf; } - \ No newline at end of file diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 326f7ca3e9..800e530e2f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -8098,8 +8098,7 @@ static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRead static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId); -int32_t doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo, SQueryErrorInfo *errInfo) { - int32_t code = 0; +SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo, SQueryErrorInfo *errInfo) { if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_PROJECT) { // ignore the project node pPhyNode = nodesListGetNode(pPhyNode->pChildren, 0); } @@ -8111,18 +8110,18 @@ int32_t doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, char tableFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(&pScanPhyNode->tableName, tableFName); - code = vnodeValidateTableHash(pHandle->config, tableFName); + int32_t code = vnodeValidateTableHash(pHandle->config, tableFName); if (code) { errInfo->code = code; errInfo->tableName = pScanPhyNode->tableName; - return code; + return NULL; } size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols); tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, (uint64_t)queryId, taskId); code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); - pTaskInfo->pRoot = createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, + return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) { SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; @@ -8132,11 +8131,7 @@ int32_t doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. STableGroupInfo groupInfo = {0}; - code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId); - if (code) { - return code; - } - + int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId); SArray* idList = NULL; if (groupInfo.numOfTables > 0) { @@ -8155,11 +8150,9 @@ int32_t doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, idList = taosArrayInit(4, sizeof(uint64_t)); } - // SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pScanPhyNode->pScanCols, idList, pTaskInfo); - // taosArrayDestroy(idList); - - // //TODO destroy groupInfo - // return pOperator; +// SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pScanPhyNode->pScanCols, idList, pTaskInfo); + taosArrayDestroy(idList); +// return pOperator; } } @@ -8169,14 +8162,14 @@ int32_t doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, for (int32_t i = 0; i < size; ++i) { SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); - code = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo, errInfo); - if (code) { - return code; + SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo, errInfo); + if (errInfo->code) { + return NULL; } SArray* pExprInfo = createExprInfo((SAggPhysiNode*)pPhyNode); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - pTaskInfo->pRoot = createAggregateOperatorInfo(pTaskInfo->pRoot, pExprInfo, pResBlock, pTaskInfo, pTableGroupInfo); + return createAggregateOperatorInfo(op, pExprInfo, pResBlock, pTaskInfo, pTableGroupInfo); } } /*else if (pPhyNode->info.type == OP_MultiTableAggregate) { size_t size = taosArrayGetSize(pPhyNode->pChildren); @@ -8188,12 +8181,6 @@ int32_t doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, return createMultiTableAggOperatorInfo(op, pPhyNode->pTargets, pTaskInfo, pTableGroupInfo); } }*/ - - if (pTaskInfo->pRoot == NULL) { - code = TSDB_CODE_QRY_OUT_OF_MEMORY; - } - - return code; } static tsdbReaderT createDataReaderImpl(STableScanPhysiNode* pTableScanNode, STableGroupInfo* pGroupInfo, void* readHandle, uint64_t queryId, uint64_t taskId) { @@ -8267,8 +8254,14 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead } STableGroupInfo group = {0}; - code = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group, errInfo); - if (code) { + (*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group, errInfo); + if (errInfo->code) { + code = errInfo->code; + goto _complete; + } + + if ((*pTaskInfo)->pRoot == NULL) { + code = TSDB_CODE_QRY_OUT_OF_MEMORY; goto _complete; } From 888aa70b88e5eb870c1a8e22c21779b259503ba7 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 11 Mar 2022 10:53:23 +0800 Subject: [PATCH 3/5] feature/scheduler --- include/util/taoserror.h | 1 + source/libs/catalog/src/catalog.c | 238 ++++++++++++++++-------------- source/util/src/terror.c | 3 +- 3 files changed, 131 insertions(+), 111 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 0e80ebdb75..d7f8a6d7ab 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -453,6 +453,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_CTG_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2404) #define TSDB_CODE_CTG_DB_DROPPED TAOS_DEF_ERROR_CODE(0, 0x2405) #define TSDB_CODE_CTG_OUT_OF_SERVICE TAOS_DEF_ERROR_CODE(0, 0x2406) +#define TSDB_CODE_CTG_VG_META_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x2407) //scheduler #define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 81e68e5433..4d8aac027d 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -484,6 +484,31 @@ _return: CTG_RET(code); } +int32_t ctgPushUpdateTblMsgInQueue(SCatalog* pCtg, STableMetaOutput *output, bool syncReq) { + int32_t code = 0; + SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL}; + SCtgUpdateTblMsg *msg = malloc(sizeof(SCtgUpdateTblMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg)); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + msg->pCtg = pCtg; + msg->output = output; + + action.data = msg; + + CTG_ERR_JRET(ctgPushAction(pCtg, &action)); + + return TSDB_CODE_SUCCESS; + +_return: + + tfree(msg); + + CTG_RET(code); +} + int32_t ctgAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) { CTG_LOCK(CTG_READ, &dbCache->vgLock); @@ -1367,8 +1392,6 @@ int32_t ctgRemoveDB(SCatalog* pCtg, SCtgDBCache *dbCache, const char* dbFName) { ctgFreeDbCache(dbCache); - ctgInfo("db removed from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId); - CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbCache->dbId, ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare)); ctgDebug("db removed from rent, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId); @@ -1431,7 +1454,8 @@ int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SD SDBVgInfo* dbInfo = *pDbInfo; if (NULL == dbInfo->vgHash || dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) { - ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d", dbFName, dbInfo->vgHash, dbInfo->vgVersion); + ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d", + dbFName, dbInfo->vgHash, dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash)); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } @@ -1655,21 +1679,21 @@ _return: int32_t ctgRefreshDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName) { bool inCache = false; int32_t code = 0; - SCtgDBCache** dbCache = NULL; + SCtgDBCache* dbCache = NULL; - CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache, &inCache)); + CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache, &inCache)); SUseDbOutput DbOut = {0}; SBuildUseDBInput input = {0}; tstrncpy(input.db, dbFName, tListLen(input.db)); if (inCache) { - input.dbId = (*dbCache)->dbId; - input.vgVersion = (*dbCache)->vgInfo->vgVersion; - input.numOfTable = (*dbCache)->vgInfo->numOfTable; + input.dbId = dbCache->dbId; + input.vgVersion = dbCache->vgInfo->vgVersion; + input.numOfTable = dbCache->vgInfo->numOfTable; - ctgReleaseVgInfo(*dbCache); - ctgReleaseDBCache(pCtg, *dbCache); + ctgReleaseVgInfo(dbCache); + ctgReleaseDBCache(pCtg, dbCache); } else { input.vgVersion = CTG_DEFAULT_INVALID_VERSION; } @@ -1717,7 +1741,7 @@ int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput) -int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, int32_t flag, STableMetaOutput **pOutput) { +int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, int32_t flag, STableMetaOutput **pOutput, bool syncReq) { if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } @@ -1729,7 +1753,6 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, CTG_ERR_RET(catalogGetTableHashVgroup(pCtg, pTrans, pMgmtEps, pTableName, &vgroupInfo)); } - SCtgUpdateTblMsg *msg = NULL; STableMetaOutput moutput = {0}; STableMetaOutput *output = calloc(1, sizeof(STableMetaOutput)); if (NULL == output) { @@ -1801,19 +1824,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, CTG_ERR_JRET(ctgCloneMetaOutput(output, pOutput)); } - SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL}; - msg = malloc(sizeof(SCtgUpdateTblMsg)); - if (NULL == msg) { - ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg)); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); - } - - msg->pCtg = pCtg; - msg->output = output; - - action.data = msg; - - CTG_ERR_JRET(ctgPushAction(pCtg, &action)); + CTG_ERR_JRET(ctgPushUpdateTblMsgInQueue(pCtg, output, syncReq)); return TSDB_CODE_SUCCESS; @@ -1821,7 +1832,6 @@ _return: tfree(output->tbMeta); tfree(output); - tfree(msg); CTG_RET(code); } @@ -1862,7 +1872,7 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons while (true) { - CTG_ERR_JRET(ctgRefreshTblMeta(pCtg, pRpc, pMgmtEps, pTableName, flag, &output)); + CTG_ERR_JRET(ctgRefreshTblMeta(pCtg, pRpc, pMgmtEps, pTableName, flag, &output, false)); if (CTG_IS_META_TABLE(output->metaType)) { *pTableMeta = output->tbMeta; @@ -2155,6 +2165,82 @@ int32_t ctgStartUpdateThread() { return TSDB_CODE_SUCCESS; } +int32_t ctgGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgList) { + STableMeta *tbMeta = NULL; + int32_t code = 0; + SVgroupInfo vgroupInfo = {0}; + SCtgDBCache* dbCache = NULL; + SArray *vgList = NULL; + SDBVgInfo *vgInfo = NULL; + + *pVgList = NULL; + + CTG_ERR_JRET(ctgGetTableMeta(pCtg, pRpc, pMgmtEps, pTableName, &tbMeta, CTG_FLAG_UNKNOWN_STB)); + + char db[TSDB_DB_FNAME_LEN] = {0}; + tNameGetFullDbName(pTableName, db); + + SHashObj *vgHash = NULL; + CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, db, false, &dbCache, &vgInfo)); + + if (dbCache) { + vgHash = dbCache->vgInfo->vgHash; + } else { + vgHash = vgInfo->vgHash; + } + + if (tbMeta->tableType == TSDB_SUPER_TABLE) { + CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, pVgList)); + } else { + // USE HASH METHOD INSTEAD OF VGID IN TBMETA + ctgError("invalid method to get none stb vgInfo, tbType:%d", tbMeta->tableType); + CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT); + +#if 0 + int32_t vgId = tbMeta->vgId; + if (taosHashGetDup(vgHash, &vgId, sizeof(vgId), &vgroupInfo) != 0) { + ctgWarn("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName)); + CTG_ERR_JRET(TSDB_CODE_CTG_VG_META_MISMATCH); + } + + vgList = taosArrayInit(1, sizeof(SVgroupInfo)); + if (NULL == vgList) { + ctgError("taosArrayInit %d failed", (int32_t)sizeof(SVgroupInfo)); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + } + + if (NULL == taosArrayPush(vgList, &vgroupInfo)) { + ctgError("taosArrayPush vgroupInfo to array failed, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName)); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + *pVgList = vgList; + vgList = NULL; +#endif + } + +_return: + + if (dbCache) { + ctgReleaseVgInfo(dbCache); + ctgReleaseDBCache(pCtg, dbCache); + } + + tfree(tbMeta); + + if (vgInfo) { + taosHashCleanup(vgInfo->vgHash); + tfree(vgInfo); + } + + if (vgList) { + taosArrayDestroy(vgList); + vgList = NULL; + } + + CTG_RET(code); +} + int32_t catalogInit(SCatalogCfg *cfg) { if (gCtgMgmt.pCluster) { @@ -2502,19 +2588,7 @@ int32_t catalogUpdateSTableMeta(SCatalog* pCtg, STableMetaRsp *rspMsg) { CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, true, &output->tbMeta)); - SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL}; - SCtgUpdateTblMsg *msg = malloc(sizeof(SCtgUpdateTblMsg)); - if (NULL == msg) { - ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg)); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); - } - - msg->pCtg = pCtg; - msg->output = output; - - action.data = msg; - - CTG_ERR_JRET(ctgPushAction(pCtg, &action)); + CTG_ERR_JRET(ctgPushUpdateTblMsgInQueue(pCtg, output)); CTG_API_LEAVE(code); @@ -2522,7 +2596,6 @@ _return: tfree(output->tbMeta); tfree(output); - tfree(msg); CTG_API_LEAVE(code); } @@ -2544,7 +2617,7 @@ int32_t catalogRefreshTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } - CTG_API_LEAVE(ctgRefreshTblMeta(pCtg, pTrans, pMgmtEps, pTableName, CTG_FLAG_FORCE_UPDATE | CTG_FLAG_MAKE_STB(isSTable), NULL)); + CTG_API_LEAVE(ctgRefreshTblMeta(pCtg, pTrans, pMgmtEps, pTableName, CTG_FLAG_FORCE_UPDATE | CTG_FLAG_MAKE_STB(isSTable), NULL, false)); } int32_t catalogRefreshGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) { @@ -2564,83 +2637,28 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgm ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname); CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } - - STableMeta *tbMeta = NULL; + int32_t code = 0; - SVgroupInfo vgroupInfo = {0}; - SCtgDBCache* dbCache = NULL; - SArray *vgList = NULL; - SDBVgInfo *vgInfo = NULL; - *pVgList = NULL; - - CTG_ERR_JRET(ctgGetTableMeta(pCtg, pRpc, pMgmtEps, pTableName, &tbMeta, CTG_FLAG_UNKNOWN_STB)); + while (true) { + code = ctgGetTableDistVgInfo(pCtg, pRpc, pMgmtEps, pTableName, pVgList); + if (code) { + if (TSDB_CODE_CTG_VG_META_MISMATCH == code) { + CTG_ERR_JRET(ctgRefreshTblMeta(pCtg, pRpc, pMgmtEps, pTableName, CTG_FLAG_FORCE_UPDATE | CTG_FLAG_MAKE_STB(CTG_FLAG_UNKNOWN_STB), NULL, true)); - char db[TSDB_DB_FNAME_LEN] = {0}; - tNameGetFullDbName(pTableName, db); - - SHashObj *vgHash = NULL; - CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, db, false, &dbCache, &vgInfo)); - - if (dbCache) { - vgHash = dbCache->vgInfo->vgHash; - } else { - vgHash = vgInfo->vgHash; - } - - /* TODO REMOEV THIS .... - if (0 == tbMeta->vgId) { - SVgroupInfo vgroup = {0}; - - catalogGetTableHashVgroup(pCtg, pRpc, pMgmtEps, pTableName, &vgroup); - - tbMeta->vgId = vgroup.vgId; - } - // TODO REMOVE THIS ....*/ - - if (tbMeta->tableType == TSDB_SUPER_TABLE) { - CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, pVgList)); - } else { - int32_t vgId = tbMeta->vgId; - if (taosHashGetDup(vgHash, &vgId, sizeof(vgId), &vgroupInfo) != 0) { - ctgError("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName)); - CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + char dbFName[TSDB_DB_FNAME_LEN] = {0}; + tNameGetFullDbName(pTableName, dbFName); + CTG_ERR_JRET(ctgRefreshDBVgInfo(pCtg, pRpc, pMgmtEps, dbFName)); + + continue; + } } - vgList = taosArrayInit(1, sizeof(SVgroupInfo)); - if (NULL == vgList) { - ctgError("taosArrayInit %d failed", (int32_t)sizeof(SVgroupInfo)); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); - } - - if (NULL == taosArrayPush(vgList, &vgroupInfo)) { - ctgError("taosArrayPush vgroupInfo to array failed, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName)); - CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); - } - - *pVgList = vgList; - vgList = NULL; + break; } _return: - if (dbCache) { - ctgReleaseVgInfo(dbCache); - ctgReleaseDBCache(pCtg, dbCache); - } - - tfree(tbMeta); - - if (vgInfo) { - taosHashCleanup(vgInfo->vgHash); - tfree(vgInfo); - } - - if (vgList) { - taosArrayDestroy(vgList); - vgList = NULL; - } - CTG_API_LEAVE(code); } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index fc9c36097c..a93a07648a 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -415,7 +415,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, "WAL size exceeds limi TAOS_DEFINE_ERROR(TSDB_CODE_WAL_INVALID_VER, "WAL use invalid version") // tfs -TAOS_DEFINE_ERROR(TSDB_CODE_FS_APP_ERROR, "tfs out of memory") +TAOS_DEFINE_ERROR(TSDB_CODE_FS_APP_ERROR, "tfs out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_CFG, "tfs invalid mount config") TAOS_DEFINE_ERROR(TSDB_CODE_FS_TOO_MANY_MOUNT, "tfs too many mount") TAOS_DEFINE_ERROR(TSDB_CODE_FS_DUP_PRIMARY, "tfs duplicate primary mount") @@ -433,6 +433,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_MEM_ERROR, "catalog memory error" TAOS_DEFINE_ERROR(TSDB_CODE_CTG_SYS_ERROR, "catalog system error") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_DB_DROPPED, "Database is dropped") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_OUT_OF_SERVICE, "catalog is out of service") +TAOS_DEFINE_ERROR(TSDB_CODE_CTG_VG_META_MISMATCH, "table meta and vgroup mismatch") //scheduler TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status error") From 9cbe40aef718cf76a5a07577ab5c7399fc93b9d3 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 11 Mar 2022 13:20:26 +0800 Subject: [PATCH 4/5] feature/scheduler --- source/client/src/clientImpl.c | 4 ++++ source/libs/catalog/src/catalog.c | 8 ++++++-- source/libs/qcom/src/querymsg.c | 6 ++++++ 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 7a3f33a999..76c4c1dbcd 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -336,6 +336,10 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { } } } + + if (!quit) { + destroyRequest(pRequest); + } } if (code) { diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 8d5fa24fa2..24358f2c06 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1452,8 +1452,12 @@ int32_t ctgGetAddDBCache(SCatalog* pCtg, const char *dbFName, uint64_t dbId, SCt int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SDBVgInfo** pDbInfo) { int32_t code = 0; SDBVgInfo* dbInfo = *pDbInfo; + + if (NULL == dbInfo->vgHash) { + return TSDB_CODE_SUCCESS; + } - if (NULL == dbInfo->vgHash || dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) { + if (dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) { ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d", dbFName, dbInfo->vgHash, dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash)); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); @@ -2588,7 +2592,7 @@ int32_t catalogUpdateSTableMeta(SCatalog* pCtg, STableMetaRsp *rspMsg) { CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, true, &output->tbMeta)); - CTG_ERR_JRET(ctgPushUpdateTblMsgInQueue(pCtg, output)); + CTG_ERR_JRET(ctgPushUpdateTblMsgInQueue(pCtg, output, false)); CTG_API_LEAVE(code); diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 37e8b7302e..8dd9f33c8a 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -27,6 +27,7 @@ int32_t (*queryProcessMsgRsp[TDMT_MAX])(void *output, char *msg, int32_t msgSize int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) { memcpy(pOut->db, usedbRsp->db, TSDB_DB_FNAME_LEN); pOut->dbId = usedbRsp->uid; + pOut->dbVgroup = calloc(1, sizeof(SDBVgInfo)); if (NULL == pOut->dbVgroup) { return TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -34,6 +35,11 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) { pOut->dbVgroup->vgVersion = usedbRsp->vgVersion; pOut->dbVgroup->hashMethod = usedbRsp->hashMethod; + + if (usedbRsp->vgNum <= 0) { + return TSDB_CODE_SUCCESS; + } + pOut->dbVgroup->vgHash = taosHashInit(usedbRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (NULL == pOut->dbVgroup->vgHash) { From 732a0b2c6b720002628ca429e071eea697c319d3 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 11 Mar 2022 15:59:28 +0800 Subject: [PATCH 5/5] feature/scheduler --- include/libs/catalog/catalog.h | 3 +- source/dnode/mnode/impl/src/mndDb.c | 22 ++++- source/dnode/vnode/src/vnd/vnodeQuery.c | 14 ++- source/libs/catalog/inc/catalogInt.h | 1 + source/libs/catalog/src/catalog.c | 106 ++++++++++++---------- source/libs/catalog/test/catalogTests.cpp | 4 +- source/libs/parser/src/parTranslater.c | 2 +- source/libs/scheduler/src/scheduler.c | 18 ++-- 8 files changed, 103 insertions(+), 67 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 197040567c..a99a97f547 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -103,11 +103,10 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) * @param pDBName (input, full db name) - * @param forceUpdate (input, force update db vgroup info from mnode) * @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller) * @return error code */ -int32_t catalogGetDBVgInfo(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, bool forceUpdate, SArray** pVgroupList); +int32_t catalogGetDBVgInfo(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, SArray** pVgroupList); int32_t catalogUpdateDBVgInfo(SCatalog* pCatalog, const char* dbName, uint64_t dbId, SDBVgInfo* dbInfo); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index a17a45d46a..c5f2177b34 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -913,12 +913,12 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; - while (vindex < pDb->cfg.numOfVgroups) { + while (true) { SVgObj *pVgroup = NULL; pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid == pDb->uid) { + if (NULL == pDb || pVgroup->dbUid == pDb->uid) { SVgroupInfo vgInfo = {0}; vgInfo.vgId = pVgroup->vgId; vgInfo.hashBegin = pVgroup->hashBegin; @@ -943,6 +943,10 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) { } sdbRelease(pSdb, pVgroup); + + if (pDb && (vindex >= pDb->cfg.numOfVgroups)) { + break; + } } sdbCancelFetch(pSdb, pIter); @@ -964,6 +968,20 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { char *p = strchr(usedbReq.db, '.'); if (p && 0 == strcmp(p + 1, TSDB_INFORMATION_SCHEMA_DB)) { memcpy(usedbRsp.db, usedbReq.db, TSDB_DB_FNAME_LEN); + int32_t vgVersion = taosGetTimestampSec() / 300; + if (usedbReq.vgVersion < vgVersion) { + usedbRsp.pVgroupInfos = taosArrayInit(10, sizeof(SVgroupInfo)); + if (usedbRsp.pVgroupInfos == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto USE_DB_OVER; + } + + mndBuildDBVgroupInfo(NULL, pMnode, usedbRsp.pVgroupInfos); + usedbRsp.vgVersion = vgVersion; + } else { + usedbRsp.vgVersion = usedbReq.vgVersion; + } + usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos); code = 0; } else { pDb = mndAcquireDb(pMnode, usedbReq.db); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 92b56a54a6..c945b3644c 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -89,6 +89,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { SRpcMsg rpcMsg; int msgLen = 0; int32_t code = TSDB_CODE_VND_APP_ERROR; + char tableFName[TSDB_TABLE_FNAME_LEN]; STableInfoReq infoReq = {0}; if (tDeserializeSTableInfoReq(pMsg->pCont, pMsg->contLen, &infoReq) != 0) { @@ -96,6 +97,16 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { goto _exit; } + metaRsp.dbId = pVnode->config.dbId; + memcpy(metaRsp.dbFName, infoReq.dbFName, sizeof(metaRsp.dbFName)); + strcpy(metaRsp.tbName, infoReq.tbName); + + sprintf(tableFName, "%s.%s", infoReq.dbFName, infoReq.tbName); + code = vnodeValidateTableHash(&pVnode->config, tableFName); + if (code) { + goto _exit; + } + pTbCfg = metaGetTbInfoByName(pVnode->pMeta, infoReq.tbName, &uid); if (pTbCfg == NULL) { code = TSDB_CODE_VND_TB_NOT_EXIST; @@ -132,9 +143,6 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { goto _exit; } - metaRsp.dbId = pVnode->config.dbId; - memcpy(metaRsp.dbFName, infoReq.dbFName, sizeof(metaRsp.dbFName)); - strcpy(metaRsp.tbName, infoReq.tbName); if (pTbCfg->type == META_CHILD_TABLE) { strcpy(metaRsp.stbName, pStbCfg->name); metaRsp.suid = pTbCfg->ctbCfg.suid; diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 03d78dbf48..83e663bdd7 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -30,6 +30,7 @@ extern "C" { #define CTG_DEFAULT_CACHE_TBLMETA_NUMBER 1000 #define CTG_DEFAULT_RENT_SECOND 10 #define CTG_DEFAULT_RENT_SLOT_SIZE 10 +#define CTG_DEFAULT_MAX_RETRY_TIMES 3 #define CTG_RENT_SLOT_SECOND 1.5 diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 24358f2c06..3c12809ba7 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -190,7 +190,7 @@ void ctgDbgShowDBCache(SCatalog* pCtg, SHashObj *dbHash) { dbCache = (SCtgDBCache *)pIter; - taosHashGetKey((void **)&dbFName, &len); + dbFName = taosHashGetKey(pIter, &len); int32_t metaNum = dbCache->tbCache.metaCache ? taosHashGetSize(dbCache->tbCache.metaCache) : 0; int32_t stbNum = dbCache->tbCache.stbCache ? taosHashGetSize(dbCache->tbCache.stbCache) : 0; @@ -384,6 +384,11 @@ int32_t ctgPushRmDBMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } + char *p = strchr(dbFName, '.'); + if (p && CTG_IS_INF_DBNAME(p + 1)) { + dbFName = p + 1; + } + msg->pCtg = pCtg; strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); msg->dbId = dbId; @@ -466,6 +471,11 @@ int32_t ctgPushUpdateVgMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t d CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } + char *p = strchr(dbFName, '.'); + if (p && CTG_IS_INF_DBNAME(p + 1)) { + dbFName = p + 1; + } + strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); msg->pCtg = pCtg; msg->dbId = dbId; @@ -493,6 +503,11 @@ int32_t ctgPushUpdateTblMsgInQueue(SCatalog* pCtg, STableMetaOutput *output, boo CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } + char *p = strchr(output->dbFName, '.'); + if (p && CTG_IS_INF_DBNAME(p + 1)) { + memmove(output->dbFName, p + 1, strlen(p + 1)); + } + msg->pCtg = pCtg; msg->output = output; @@ -562,6 +577,11 @@ void ctgWReleaseVgInfo(SCtgDBCache *dbCache) { int32_t ctgAcquireDBCacheImpl(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) { + char *p = strchr(dbFName, '.'); + if (p && CTG_IS_INF_DBNAME(p + 1)) { + dbFName = p + 1; + } + SCtgDBCache *dbCache = NULL; if (acquire) { dbCache = (SCtgDBCache *)taosHashAcquire(pCtg->dbCache, dbFName, strlen(dbFName)); @@ -927,7 +947,7 @@ int32_t ctgGetTableMetaFromMnode(SCatalog* pCtg, void *pTrans, const SEpSet* pMg return ctgGetTableMetaFromMnodeImpl(pCtg, pTrans, pMgmtEps, dbFName, (char *)pTableName->tname, output); } -int32_t ctgGetTableMetaFromVnode(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) { +int32_t ctgGetTableMetaFromVnodeImpl(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) { if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } @@ -977,6 +997,32 @@ int32_t ctgGetTableMetaFromVnode(SCatalog* pCtg, void *pTrans, const SEpSet* pMg return TSDB_CODE_SUCCESS; } +int32_t ctgGetTableMetaFromVnode(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) { + int32_t code = 0; + int32_t retryNum = 0; + + while (retryNum < CTG_DEFAULT_MAX_RETRY_TIMES) { + code = ctgGetTableMetaFromVnodeImpl(pCtg, pTrans, pMgmtEps, pTableName, vgroupInfo, output); + if (code) { + if (TSDB_CODE_VND_HASH_MISMATCH == code) { + char dbFName[TSDB_DB_FNAME_LEN]; + tNameGetFullDbName(pTableName, dbFName); + + code = catalogRefreshDBVgInfo(pCtg, pTrans, pMgmtEps, dbFName); + if (code != TSDB_CODE_SUCCESS) { + break; + } + + ++retryNum; + continue; + } + } + + break; + } + + CTG_RET(code); +} int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) { switch (hashMethod) { @@ -1338,16 +1384,12 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) { ctgError("taosHashPut db to cache failed, dbFName:%s", dbFName); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } - + SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1}; strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName)); ctgDebug("db added to cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbId); - if (CTG_IS_INF_DBNAME(dbFName)) { - return TSDB_CODE_SUCCESS; - } - CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &vgVersion, dbId, sizeof(SDbVgVersion))); ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, dbId); @@ -1634,13 +1676,13 @@ int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) { -int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SCtgDBCache** dbCache, SDBVgInfo **pInfo) { +int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SCtgDBCache** dbCache, SDBVgInfo **pInfo) { bool inCache = false; int32_t code = 0; CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache, &inCache)); - if (inCache && !forceUpdate) { + if (inCache) { return TSDB_CODE_SUCCESS; } @@ -1648,13 +1690,7 @@ int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const SBuildUseDBInput input = {0}; tstrncpy(input.db, dbFName, tListLen(input.db)); - if (inCache) { - input.dbId = (*dbCache)->dbId; - input.vgVersion = (*dbCache)->vgInfo->vgVersion; - input.numOfTable = (*dbCache)->vgInfo->numOfTable; - } else { - input.vgVersion = CTG_DEFAULT_INVALID_VERSION; - } + input.vgVersion = CTG_DEFAULT_INVALID_VERSION; code = ctgGetDBVgInfoFromMnode(pCtg, pRpc, pMgmtEps, &input, &DbOut); if (code) { @@ -1997,11 +2033,6 @@ int32_t ctgActUpdateTbl(SCtgMetaAction *action) { ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, output->tbMeta->tableType); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } - - char *p = strchr(output->dbFName, '.'); - if (p && CTG_IS_INF_DBNAME(p + 1)) { - memmove(output->dbFName, p + 1, strlen(p + 1)); - } CTG_ERR_JRET(ctgGetAddDBCache(pCtg, output->dbFName, output->dbId, &dbCache)); if (NULL == dbCache) { @@ -2185,7 +2216,7 @@ int32_t ctgGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps tNameGetFullDbName(pTableName, db); SHashObj *vgHash = NULL; - CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, db, false, &dbCache, &vgInfo)); + CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, db, &dbCache, &vgInfo)); if (dbCache) { vgHash = dbCache->vgInfo->vgHash; @@ -2432,7 +2463,7 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers CTG_API_LEAVE(TSDB_CODE_SUCCESS); } -int32_t catalogGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SArray** vgroupList) { +int32_t catalogGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SArray** vgroupList) { CTG_API_ENTER(); if (NULL == pCtg || NULL == dbFName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) { @@ -2444,7 +2475,7 @@ int32_t catalogGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, c SArray *vgList = NULL; SHashObj *vgHash = NULL; SDBVgInfo *vgInfo = NULL; - CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, dbFName, forceUpdate, &dbCache, &vgInfo)); + CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, dbFName, &dbCache, &vgInfo)); if (dbCache) { vgHash = dbCache->vgInfo->vgHash; } else { @@ -2478,35 +2509,14 @@ int32_t catalogUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId int32_t code = 0; if (NULL == pCtg || NULL == dbFName || NULL == dbInfo) { + ctgFreeVgInfo(dbInfo); CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT); } - SCtgMetaAction action= {.act = CTG_ACT_UPDATE_VG}; - SCtgUpdateVgMsg *msg = malloc(sizeof(SCtgUpdateVgMsg)); - if (NULL == msg) { - ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg)); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); - } + code = ctgPushUpdateVgMsgInQueue(pCtg, dbFName, dbId, dbInfo, false); - msg->pCtg = pCtg; - strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); - msg->dbId = dbId; - msg->dbInfo = dbInfo; - - action.data = msg; - - CTG_ERR_JRET(ctgPushAction(pCtg, &action)); - - dbInfo = NULL; - - CTG_API_LEAVE(code); - _return: - ctgFreeVgInfo(dbInfo); - - tfree(msg); - CTG_API_LEAVE(code); } @@ -2681,7 +2691,7 @@ int32_t catalogGetTableHashVgroup(SCatalog *pCtg, void *pTrans, const SEpSet *pM tNameGetFullDbName(pTableName, db); SDBVgInfo *vgInfo = NULL; - CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pTrans, pMgmtEps, db, false, &dbCache, &vgInfo)); + CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pTrans, pMgmtEps, db, &dbCache, &vgInfo)); CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgInfo, pTableName, pVgroup)); diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index cc0e5bb1a9..eace144e0b 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -713,7 +713,7 @@ void *ctgTestGetDbVgroupThread(void *param) { int32_t n = 0; while (!ctgTestStop) { - code = catalogGetDBVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, false, &vgList); + code = catalogGetDBVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, &vgList); if (code) { assert(0); } @@ -2009,7 +2009,7 @@ TEST(dbVgroup, getSetDbVgroupCase) { strcpy(n.dbname, "db1"); strcpy(n.tname, ctgTestTablename); - code = catalogGetDBVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, false, &vgList); + code = catalogGetDBVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, &vgList); ASSERT_EQ(code, 0); ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), ctgTestVgNum); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 4be4e25eb8..78c672cfb2 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1115,7 +1115,7 @@ static int32_t translateShowTables(STranslateContext* pCxt) { tNameGetFullDbName(&name, dbFname); SArray* array = NULL; - int32_t code = catalogGetDBVgInfo(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &pCxt->pParseCxt->mgmtEpSet, dbFname, false, &array); + int32_t code = catalogGetDBVgInfo(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &pCxt->pParseCxt->mgmtEpSet, dbFname, &array); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 9a69220d7e..6d4d2b393e 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -469,7 +469,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { if (addNum <= 0) { SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum); - return TSDB_CODE_QRY_INVALID_INPUT; + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } /* @@ -778,8 +778,8 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode, if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) { SCH_ERR_JRET(schMoveTaskToFailList(pJob, pTask, &moved)); } else { - SCH_TASK_DLOG("task already done, no more failure process, status:%d", SCH_GET_TASK_STATUS(pTask)); - return TSDB_CODE_SUCCESS; + SCH_TASK_ELOG("task not in executing list, status:%d", SCH_GET_TASK_STATUS(pTask)); + SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); } SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED); @@ -1414,6 +1414,12 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { SCH_RET(atomic_load_32(&pJob->errCode)); } + + // NOTE: race condition: the task should be put into the hash table before send msg to server + if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) { + SCH_ERR_RET(schPushTaskToExecList(pJob, pTask)); + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING); + } SSubplan *plan = pTask->plan; @@ -1429,12 +1435,6 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask)); - // NOTE: race condition: the task should be put into the hash table before send msg to server - if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) { - SCH_ERR_RET(schPushTaskToExecList(pJob, pTask)); - SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING); - } - if (SCH_IS_QUERY_JOB(pJob)) { SCH_ERR_RET(schEnsureHbConnection(pJob, pTask)); }