From 763eccf8aa4e0e6c4526142a68c2fb1cf76aa9c5 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 10 Mar 2022 19:05:58 +0800 Subject: [PATCH] feature/scheduler --- include/common/tmsg.h | 7 + include/common/tname.h | 4 +- include/libs/catalog/catalog.h | 26 +- include/libs/executor/executor.h | 4 +- include/libs/nodes/plannodes.h | 4 +- include/libs/qcom/query.h | 12 + include/libs/scheduler/scheduler.h | 9 +- include/util/taoserror.h | 1 + source/client/inc/clientInt.h | 1 + source/client/src/clientImpl.c | 102 ++++- source/common/src/tmsg.c | 1 + source/common/src/tname.c | 30 +- source/dnode/vnode/inc/vnode.h | 3 + source/dnode/vnode/src/vnd/vnodeCfg.c | 21 +- source/dnode/vnode/src/vnd/vnodeQuery.c | 2 +- source/libs/catalog/inc/catalogInt.h | 27 +- source/libs/catalog/src/catalog.c | 401 ++++++++++++-------- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/executor.c | 2 +- source/libs/executor/src/executorMain.c | 4 +- source/libs/executor/src/executorimpl.c | 43 ++- source/libs/executor/test/executorTests.cpp | 2 +- source/libs/qcom/src/queryUtil.c | 14 + source/libs/qworker/inc/qworkerMsg.h | 2 +- source/libs/qworker/src/qworker.c | 7 +- source/libs/qworker/src/qworkerMsg.c | 16 +- source/libs/qworker/test/qworkerTests.cpp | 2 +- source/libs/scheduler/inc/schedulerInt.h | 18 +- source/libs/scheduler/src/schFlowCtrl.c | 2 +- source/libs/scheduler/src/scheduler.c | 95 +++-- source/util/src/terror.c | 1 + 31 files changed, 600 insertions(+), 265 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c6ae35c463..c114457b6b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -24,6 +24,7 @@ #include "thash.h" #include "tlist.h" #include "trow.h" +#include "tname.h" #ifdef __cplusplus extern "C" { @@ -459,8 +460,14 @@ typedef struct { typedef struct { int32_t code; + SName tableName; } SQueryTableRsp; +int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp); + +int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp); + + typedef struct { char db[TSDB_DB_FNAME_LEN]; int32_t numOfVgroups; diff --git a/include/common/tname.h b/include/common/tname.h index 6de38a68ee..ffa4f8f253 100644 --- a/include/common/tname.h +++ b/include/common/tname.h @@ -17,7 +17,6 @@ #define _TD_COMMON_NAME_H_ #include "tdef.h" -#include "tmsg.h" #ifdef __cplusplus extern "C" { @@ -61,7 +60,8 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type); int32_t tNameSetAcctId(SName* dst, int32_t acctId); -SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name); +bool tNameDBNameEqual(SName* left, SName* right); + #ifdef __cplusplus } diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index f217277b80..197040567c 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -120,7 +120,7 @@ int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, * @param pCatalog (input, got with catalogGetHandle) * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) - * @param pTableName (input, table name, NOT including db name) + * @param pTableName (input, table name) * @param pTableMeta(output, table meta data, NEED to free it by calller) * @return error code */ @@ -131,7 +131,7 @@ int32_t catalogGetTableMeta(SCatalog* pCatalog, void * pTransporter, const SEpSe * @param pCatalog (input, got with catalogGetHandle) * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) - * @param pTableName (input, table name, NOT including db name) + * @param pTableName (input, table name) * @param pTableMeta(output, table meta data, NEED to free it by calller) * @return error code */ @@ -140,28 +140,38 @@ int32_t catalogGetSTableMeta(SCatalog* pCatalog, void * pTransporter, const SEpS int32_t catalogUpdateSTableMeta(SCatalog* pCatalog, STableMetaRsp *rspMsg); +/** + * Force refresh DB's local cached vgroup info. + * @param pCtg (input, got with catalogGetHandle) + * @param pTrans (input, rpc object) + * @param pMgmtEps (input, mnode EPs) + * @param dbFName (input, db full name) + * @return error code + */ +int32_t catalogRefreshDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* dbFName); + /** * Force refresh a table's local cached meta data. * @param pCatalog (input, got with catalogGetHandle) * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) - * @param pTableName (input, table name, NOT including db name) + * @param pTableName (input, table name) * @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure) * @return error code */ - int32_t catalogRefreshTableMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable); +int32_t catalogRefreshTableMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable); /** * Force refresh a table's local cached meta data and get the new one. * @param pCatalog (input, got with catalogGetHandle) * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) - * @param pTableName (input, table name, NOT including db name) + * @param pTableName (input, table name) * @param pTableMeta(output, table meta data, NEED to free it by calller) * @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure) * @return error code */ - int32_t catalogRefreshGetTableMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable); +int32_t catalogRefreshGetTableMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable); @@ -170,7 +180,7 @@ int32_t catalogUpdateSTableMeta(SCatalog* pCatalog, STableMetaRsp *rspMsg); * @param pCatalog (input, got with catalogGetHandle) * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) - * @param pTableName (input, table name, NOT including db name) + * @param pTableName (input, table name) * @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller) * @return error code */ @@ -181,7 +191,7 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCatalog, void *pTransporter, const * @param pCatalog (input, got with catalogGetHandle) * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) - * @param pTableName (input, table name, NOT including db name) + * @param pTableName (input, table name) * @param vgInfo (output, vgroup info) * @return error code */ diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index d4af51fc21..e1729835de 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -21,6 +21,7 @@ extern "C" { #endif #include "tcommon.h" +#include "query.h" typedef void* qTaskInfo_t; typedef void* DataSinkHandle; @@ -30,6 +31,7 @@ struct SSubplan; typedef struct SReadHandle { void* reader; void* meta; + void* config; } SReadHandle; /** @@ -67,7 +69,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA * @param qId * @return */ -int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle); +int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, SQueryErrorInfo *errInfo); /** * The main task execution function, including query on both table and multiple tables, diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index c805eba320..d1bbbe7dbc 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -90,7 +90,7 @@ typedef struct SSubLogicPlan { } SSubLogicPlan; typedef struct SQueryLogicPlan { - ENodeType type;; + ENodeType type; SNodeList* pSubplans; } SQueryLogicPlan; @@ -127,7 +127,7 @@ typedef struct SScanPhysiNode { int32_t order; // scan order: TSDB_ORDER_ASC|TSDB_ORDER_DESC int32_t count; // repeat count int32_t reverse; // reverse scan count - char tableName[TSDB_TABLE_NAME_LEN]; + SName tableName; } SScanPhysiNode; typedef SScanPhysiNode SSystemTableScanPhysiNode; diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 07c5c1e499..af6077208a 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -138,6 +138,11 @@ typedef struct SQueryNodeStat { int32_t tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT } SQueryNodeStat; +typedef struct SQueryErrorInfo { + int32_t code; + SName tableName; +} SQueryErrorInfo; + int32_t initTaskQueue(); int32_t cleanupTaskQueue(); @@ -170,6 +175,9 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta **pMeta); +SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name); + + extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize); @@ -178,6 +186,10 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t #define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE #define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE +#define IS_CLIENT_RETRY_ERROR(_code) ((_code) == TSDB_CODE_VND_HASH_MISMATCH) +#define IS_SCHEDULER_RETRY_ERROR(_code) ((_code) == TSDB_CODE_RPC_REDIRECT) + + #define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", DEBUG_FATAL, qDebugFlag, __VA_ARGS__); }} while(0) #define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", DEBUG_ERROR, qDebugFlag, __VA_ARGS__); }} while(0) #define qWarn(...) do { if (qDebugFlag & DEBUG_WARN) { taosPrintLog("QRY WARN ", DEBUG_WARN, qDebugFlag, __VA_ARGS__); }} while(0) diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 56da9ece6f..2d4cbd4ac0 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -52,10 +52,11 @@ typedef struct SQueryProfileSummary { } SQueryProfileSummary; typedef struct SQueryResult { - int32_t code; - uint64_t numOfRows; - int32_t msgSize; - char *msg; + int32_t code; + SArray *errList; // SArray + uint64_t numOfRows; + int32_t msgSize; + char *msg; } SQueryResult; typedef struct STaskInfo { diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 5d42646238..0e80ebdb75 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -326,6 +326,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_VND_IS_SYNCING TAOS_DEF_ERROR_CODE(0, 0x0513) #define TSDB_CODE_VND_INVALID_TSDB_STATE TAOS_DEF_ERROR_CODE(0, 0x0514) #define TSDB_CODE_VND_TB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0515) +#define TSDB_CODE_VND_HASH_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x0516) // tsdb #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 321e8ab77b..4a49c0f528 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -184,6 +184,7 @@ typedef struct SRequestObj { char* msgBuf; void* pInfo; // sql parse info, generated by parser module int32_t code; + SArray* errList; // SArray SQueryExecMetric metric; SRequestSendRecvBody body; } SRequestObj; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 85d98e802f..30ffaaee41 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -223,6 +223,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList schedulerFreeJob(pRequest->body.queryJob); } + pRequest->errList = res.errList; pRequest->code = code; return pRequest->code; } @@ -234,19 +235,13 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList schedulerFreeJob(pRequest->body.queryJob); } } - + + pRequest->errList = res.errList; pRequest->code = res.code; return pRequest->code; } -TAOS_RES* taos_query_l(TAOS* taos, const char* sql, int sqlLen) { - STscObj* pTscObj = (STscObj*)taos; - if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) { - tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN); - terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; - return NULL; - } - +SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) { SRequestObj* pRequest = NULL; SQuery* pQuery; SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); @@ -273,6 +268,93 @@ _return: return pRequest; } +int32_t clientProcessErrorList(SArray **pList) { + SArray *errList = *pList; + int32_t errNum = (int32_t)taosArrayGetSize(errList); + + for (int32_t i = 0; i < errNum; ++i) { + SQueryErrorInfo *errInfo = taosArrayGet(errList, i); + if (TSDB_CODE_VND_HASH_MISMATCH == errInfo->code) { + if (i == (errNum - 1)) { + break; + } + + // TODO REMOVE SAME DB ERROR + } else { + taosArrayRemove(errList, i); + --i; + --errNum; + } + } + + if (0 == errNum) { + taosArrayDestroy(*pList); + *pList = NULL; + } + + return TSDB_CODE_SUCCESS; +} + + +SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { + SRequestObj* pRequest = NULL; + int32_t code = 0; + bool quit = false; + + while (!quit) { + pRequest = execQueryImpl(pTscObj, sql, sqlLen); + if (TSDB_CODE_SUCCESS == pRequest->code || NULL == pRequest->errList) { + break; + } + + code = clientProcessErrorList(&pRequest->errList); + if (code != TSDB_CODE_SUCCESS || NULL == pRequest->errList) { + break; + } + + int32_t errNum = (int32_t)taosArrayGetSize(pRequest->errList); + for (int32_t i = 0; i < errNum; ++i) { + SQueryErrorInfo *errInfo = taosArrayGet(pRequest->errList, i); + + if (TSDB_CODE_VND_HASH_MISMATCH == errInfo->code) { + SCatalog *pCatalog = NULL; + code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); + if (code != TSDB_CODE_SUCCESS) { + quit = true; + break; + } + SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); + + char dbFName[TSDB_DB_FNAME_LEN]; + tNameGetFullDbName(&errInfo->tableName, dbFName); + + code = catalogRefreshDBVgInfo(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, dbFName); + if (code != TSDB_CODE_SUCCESS) { + quit = true; + break; + } + } + } + } + + if (code) { + pRequest->code = code; + } + + return pRequest; +} + +TAOS_RES* taos_query_l(TAOS* taos, const char* sql, int sqlLen) { + STscObj* pTscObj = (STscObj*)taos; + if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) { + tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN); + terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; + return NULL; + } + + return execQuery(pTscObj, sql, sqlLen); +} + int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) { pEpSet->version = 0; @@ -389,7 +471,7 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { tfree(pMsgBody); } bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType) { - return msgType == TDMT_VND_QUERY_RSP || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP; + return msgType == TDMT_VND_QUERY_RSP || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP || msgType == TDMT_VND_QUERY_HEARTBEAT_RSP; } void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->ahandle; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index f80bb3d8bd..7a46261684 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2655,3 +2655,4 @@ void *tDeserializeSVDropTSmaReq(void *buf, SVDropTSmaReq *pReq) { return buf; } + \ No newline at end of file diff --git a/source/common/src/tname.c b/source/common/src/tname.c index fb77417cac..d08865714c 100644 --- a/source/common/src/tname.c +++ b/source/common/src/tname.c @@ -222,6 +222,27 @@ int32_t tNameSetAcctId(SName* dst, int32_t acctId) { return 0; } +bool tNameDBNameEqual(SName* left, SName* right) { + if (NULL == left) { + if (NULL == right) { + return true; + } + + return false; + } + + if (NULL == right) { + return false; + } + + if (left->acctId != right->acctId) { + return false; + } + + return (0 == strcmp(left->dbname, right->dbname)); +} + + int32_t tNameFromString(SName* dst, const char* str, uint32_t type) { assert(dst != NULL && str != NULL && strlen(str) > 0); @@ -273,13 +294,4 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) { return 0; } -SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name) { - SSchema s = {0}; - s.type = type; - s.bytes = bytes; - s.colId = colId; - - tstrncpy(s.name, name, tListLen(s.name)); - return s; -} diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 319bd9fde6..2d07a4e6ab 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -193,6 +193,9 @@ void vnodeOptionsInit(SVnodeCfg *pOptions); */ void vnodeOptionsClear(SVnodeCfg *pOptions); +int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableName); + + /* ------------------------ FOR COMPILE ------------------------ */ int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg); diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index 727a4b41f7..10d0d33722 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -32,4 +32,23 @@ int vnodeValidateOptions(const SVnodeCfg *pVnodeOptions) { void vnodeOptionsCopy(SVnodeCfg *pDest, const SVnodeCfg *pSrc) { memcpy((void *)pDest, (void *)pSrc, sizeof(SVnodeCfg)); -} \ No newline at end of file +} + +int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableName) { + uint32_t hashValue = 0; + + switch (pVnodeOptions->hashMethod) { + default: + hashValue = MurmurHash3_32(tableName, strlen(tableName)); + break; + } + + if (hashValue < pVnodeOptions->hashBegin || hashValue > pVnodeOptions->hashEnd) { + terrno = TSDB_CODE_VND_HASH_MISMATCH; + return TSDB_CODE_VND_HASH_MISMATCH; + } + + return TSDB_CODE_SUCCESS; +} + + diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 0359483ba2..ac10f78a8a 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -30,7 +30,7 @@ void vnodeQueryClose(SVnode *pVnode) { int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { vTrace("message in query queue is processing"); - SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta}; + SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config}; switch (pMsg->msgType) { case TDMT_VND_QUERY:{ diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index c4f1a117fe..03d78dbf48 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -159,8 +159,10 @@ typedef struct SCtgRemoveTblMsg { typedef struct SCtgMetaAction { - int32_t act; - void *data; + int32_t act; + void *data; + bool syncReq; + uint64_t seqId; } SCtgMetaAction; typedef struct SCtgQNode { @@ -168,14 +170,21 @@ typedef struct SCtgQNode { struct SCtgQNode *next; } SCtgQNode; +typedef struct SCtgQueue { + SRWLatch qlock; + uint64_t seqId; + uint64_t seqDone; + SCtgQNode *head; + SCtgQNode *tail; + tsem_t reqSem; + tsem_t rspSem; + uint64_t qRemainNum; +} SCtgQueue; + typedef struct SCatalogMgmt { bool exit; SRWLatch lock; - SRWLatch qlock; - SCtgQNode *head; - SCtgQNode *tail; - tsem_t sem; - uint64_t qRemainNum; + SCtgQueue queue; pthread_t updateThread; SHashObj *pCluster; //key: clusterId, value: SCatalog* SCatalogStat stat; @@ -191,8 +200,8 @@ typedef struct SCtgAction { ctgActFunc func; } SCtgAction; -#define CTG_QUEUE_ADD() atomic_add_fetch_64(&gCtgMgmt.qRemainNum, 1) -#define CTG_QUEUE_SUB() atomic_sub_fetch_64(&gCtgMgmt.qRemainNum, 1) +#define CTG_QUEUE_ADD() atomic_add_fetch_64(&gCtgMgmt.queue.qRemainNum, 1) +#define CTG_QUEUE_SUB() atomic_sub_fetch_64(&gCtgMgmt.queue.qRemainNum, 1) #define CTG_STAT_ADD(n) atomic_add_fetch_64(&(n), 1) #define CTG_STAT_SUB(n) atomic_sub_fetch_64(&(n), 1) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 71a4bb7cef..fb378928b6 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -229,44 +229,6 @@ void ctgDbgShowClusterCache(SCatalog* pCtg) { } - -void ctgPopAction(SCtgMetaAction **action) { - SCtgQNode *orig = gCtgMgmt.head; - - SCtgQNode *node = gCtgMgmt.head->next; - gCtgMgmt.head = gCtgMgmt.head->next; - - CTG_QUEUE_SUB(); - - tfree(orig); - - *action = &node->action; -} - - -int32_t ctgPushAction(SCtgMetaAction *action) { - SCtgQNode *node = calloc(1, sizeof(SCtgQNode)); - if (NULL == node) { - qError("calloc %d failed", (int32_t)sizeof(SCtgQNode)); - CTG_RET(TSDB_CODE_CTG_MEM_ERROR); - } - - node->action = *action; - - CTG_LOCK(CTG_WRITE, &gCtgMgmt.qlock); - gCtgMgmt.tail->next = node; - gCtgMgmt.tail = node; - CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.qlock); - - CTG_QUEUE_ADD(); - CTG_STAT_ADD(gCtgMgmt.stat.runtime.qNum); - - tsem_post(&gCtgMgmt.sem); - - return TSDB_CODE_SUCCESS; -} - - void ctgFreeMetaRent(SCtgRentMgmt *mgmt) { if (NULL == mgmt->slots) { return; @@ -284,94 +246,6 @@ void ctgFreeMetaRent(SCtgRentMgmt *mgmt) { } -int32_t ctgPushRmDBMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) { - int32_t code = 0; - SCtgMetaAction action= {.act = CTG_ACT_REMOVE_DB}; - SCtgRemoveDBMsg *msg = malloc(sizeof(SCtgRemoveDBMsg)); - if (NULL == msg) { - ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveDBMsg)); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } - - msg->pCtg = pCtg; - strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); - msg->dbId = dbId; - - action.data = msg; - - CTG_ERR_JRET(ctgPushAction(&action)); - - ctgDebug("action [%s] added into queue", gCtgAction[action.act].name); - - return TSDB_CODE_SUCCESS; - -_return: - - tfree(action.data); - CTG_RET(code); -} - - -int32_t ctgPushRmStbMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid) { - int32_t code = 0; - SCtgMetaAction action= {.act = CTG_ACT_REMOVE_STB}; - SCtgRemoveStbMsg *msg = malloc(sizeof(SCtgRemoveStbMsg)); - if (NULL == msg) { - ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveStbMsg)); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } - - msg->pCtg = pCtg; - strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); - strncpy(msg->stbName, stbName, sizeof(msg->stbName)); - msg->dbId = dbId; - msg->suid = suid; - - action.data = msg; - - CTG_ERR_JRET(ctgPushAction(&action)); - - ctgDebug("action [%s] added into queue", gCtgAction[action.act].name); - - return TSDB_CODE_SUCCESS; - -_return: - - tfree(action.data); - CTG_RET(code); -} - - - -int32_t ctgPushRmTblMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName) { - int32_t code = 0; - SCtgMetaAction action= {.act = CTG_ACT_REMOVE_TBL}; - SCtgRemoveTblMsg *msg = malloc(sizeof(SCtgRemoveTblMsg)); - if (NULL == msg) { - ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveTblMsg)); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } - - msg->pCtg = pCtg; - strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); - strncpy(msg->tbName, tbName, sizeof(msg->tbName)); - msg->dbId = dbId; - - action.data = msg; - - CTG_ERR_JRET(ctgPushAction(&action)); - - ctgDebug("action [%s] added into queue", gCtgAction[action.act].name); - - return TSDB_CODE_SUCCESS; - -_return: - - tfree(action.data); - CTG_RET(code); -} - - void ctgFreeTableMetaCache(SCtgTbMetaCache *cache) { CTG_LOCK(CTG_WRITE, &cache->stbLock); if (cache->stbCache) { @@ -437,6 +311,180 @@ void ctgFreeHandle(SCatalog* pCtg) { } + +void ctgWaitAction(SCtgMetaAction *action) { + while (true) { + tsem_wait(&gCtgMgmt.queue.rspSem); + + if (atomic_load_8(&gCtgMgmt.exit)) { + tsem_post(&gCtgMgmt.queue.rspSem); + break; + } + + if (gCtgMgmt.queue.seqDone >= action->seqId) { + break; + } + + tsem_post(&gCtgMgmt.queue.rspSem); + sched_yield(); + } +} + +void ctgPopAction(SCtgMetaAction **action) { + SCtgQNode *orig = gCtgMgmt.queue.head; + + SCtgQNode *node = gCtgMgmt.queue.head->next; + gCtgMgmt.queue.head = gCtgMgmt.queue.head->next; + + CTG_QUEUE_SUB(); + + tfree(orig); + + *action = &node->action; +} + + +int32_t ctgPushAction(SCatalog* pCtg, SCtgMetaAction *action) { + SCtgQNode *node = calloc(1, sizeof(SCtgQNode)); + if (NULL == node) { + qError("calloc %d failed", (int32_t)sizeof(SCtgQNode)); + CTG_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + action->seqId = atomic_add_fetch_64(&gCtgMgmt.queue.seqId, 1); + + node->action = *action; + + CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); + gCtgMgmt.queue.tail->next = node; + gCtgMgmt.queue.tail = node; + CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); + + CTG_QUEUE_ADD(); + CTG_STAT_ADD(gCtgMgmt.stat.runtime.qNum); + + tsem_post(&gCtgMgmt.queue.reqSem); + + ctgDebug("action [%s] added into queue", gCtgAction[action->act].name); + + if (action->syncReq) { + ctgWaitAction(action); + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t ctgPushRmDBMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) { + int32_t code = 0; + SCtgMetaAction action= {.act = CTG_ACT_REMOVE_DB}; + SCtgRemoveDBMsg *msg = malloc(sizeof(SCtgRemoveDBMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveDBMsg)); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + msg->pCtg = pCtg; + strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); + msg->dbId = dbId; + + action.data = msg; + + CTG_ERR_JRET(ctgPushAction(pCtg, &action)); + + return TSDB_CODE_SUCCESS; + +_return: + + tfree(action.data); + CTG_RET(code); +} + + +int32_t ctgPushRmStbMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid) { + int32_t code = 0; + SCtgMetaAction action= {.act = CTG_ACT_REMOVE_STB}; + SCtgRemoveStbMsg *msg = malloc(sizeof(SCtgRemoveStbMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveStbMsg)); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + msg->pCtg = pCtg; + strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); + strncpy(msg->stbName, stbName, sizeof(msg->stbName)); + msg->dbId = dbId; + msg->suid = suid; + + action.data = msg; + + CTG_ERR_JRET(ctgPushAction(pCtg, &action)); + + return TSDB_CODE_SUCCESS; + +_return: + + tfree(action.data); + CTG_RET(code); +} + + + +int32_t ctgPushRmTblMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName) { + int32_t code = 0; + SCtgMetaAction action= {.act = CTG_ACT_REMOVE_TBL}; + SCtgRemoveTblMsg *msg = malloc(sizeof(SCtgRemoveTblMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveTblMsg)); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + msg->pCtg = pCtg; + strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); + strncpy(msg->tbName, tbName, sizeof(msg->tbName)); + msg->dbId = dbId; + + action.data = msg; + + CTG_ERR_JRET(ctgPushAction(pCtg, &action)); + + return TSDB_CODE_SUCCESS; + +_return: + + tfree(action.data); + CTG_RET(code); +} + +int32_t ctgPushUpdateVgMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncReq) { + int32_t code = 0; + SCtgMetaAction action= {.act = CTG_ACT_UPDATE_VG, .syncReq = syncReq}; + SCtgUpdateVgMsg *msg = malloc(sizeof(SCtgUpdateVgMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg)); + ctgFreeVgInfo(dbInfo); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); + msg->pCtg = pCtg; + msg->dbId = dbId; + msg->dbInfo = dbInfo; + + action.data = msg; + + CTG_ERR_JRET(ctgPushAction(pCtg, &action)); + + return TSDB_CODE_SUCCESS; + +_return: + + ctgFreeVgInfo(dbInfo); + tfree(action.data); + CTG_RET(code); +} + + int32_t ctgAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) { CTG_LOCK(CTG_READ, &dbCache->vgLock); @@ -1591,38 +1639,57 @@ int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const } CTG_ERR_JRET(ctgCloneVgInfo(DbOut.dbVgroup, pInfo)); - - SCtgMetaAction action= {.act = CTG_ACT_UPDATE_VG}; - SCtgUpdateVgMsg *msg = malloc(sizeof(SCtgUpdateVgMsg)); - if (NULL == msg) { - ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg)); - ctgFreeVgInfo(DbOut.dbVgroup); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } - strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); - msg->pCtg = pCtg; - msg->dbId = DbOut.dbId; - msg->dbInfo = DbOut.dbVgroup; - - action.data = msg; - - CTG_ERR_JRET(ctgPushAction(&action)); - - ctgDebug("action [%s] added into queue", gCtgAction[action.act].name); + CTG_ERR_RET(ctgPushUpdateVgMsgInQueue(pCtg, dbFName, DbOut.dbId, DbOut.dbVgroup, false)); return TSDB_CODE_SUCCESS; _return: tfree(*pInfo); - tfree(msg); - *pInfo = DbOut.dbVgroup; CTG_RET(code); } +int32_t ctgRefreshDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName) { + bool inCache = false; + int32_t code = 0; + SCtgDBCache** dbCache = NULL; + + CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache, &inCache)); + + SUseDbOutput DbOut = {0}; + SBuildUseDBInput input = {0}; + tstrncpy(input.db, dbFName, tListLen(input.db)); + + if (inCache) { + input.dbId = (*dbCache)->dbId; + input.vgVersion = (*dbCache)->vgInfo->vgVersion; + input.numOfTable = (*dbCache)->vgInfo->numOfTable; + + ctgReleaseVgInfo(*dbCache); + ctgReleaseDBCache(pCtg, *dbCache); + } else { + input.vgVersion = CTG_DEFAULT_INVALID_VERSION; + } + + code = ctgGetDBVgInfoFromMnode(pCtg, pRpc, pMgmtEps, &input, &DbOut); + if (code) { + if (CTG_DB_NOT_EXIST(code) && input.vgVersion > CTG_DEFAULT_INVALID_VERSION) { + ctgDebug("db no longer exist, dbFName:%s, dbId:%" PRIx64, input.db, input.dbId); + ctgPushRmDBMsgInQueue(pCtg, input.db, input.dbId); + } + + CTG_ERR_RET(code); + } + + CTG_ERR_RET(ctgPushUpdateVgMsgInQueue(pCtg, dbFName, DbOut.dbId, DbOut.dbVgroup, true)); + + return TSDB_CODE_SUCCESS; +} + + int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput) { *pOutput = malloc(sizeof(STableMetaOutput)); @@ -1746,9 +1813,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, action.data = msg; - CTG_ERR_JRET(ctgPushAction(&action)); - - ctgDebug("action [%s] added into queue", gCtgAction[action.act].name); + CTG_ERR_JRET(ctgPushAction(pCtg, &action)); return TSDB_CODE_SUCCESS; @@ -2042,9 +2107,10 @@ void* ctgUpdateThreadFunc(void* param) { CTG_LOCK(CTG_READ, &gCtgMgmt.lock); while (true) { - tsem_wait(&gCtgMgmt.sem); + tsem_wait(&gCtgMgmt.queue.reqSem); if (atomic_load_8(&gCtgMgmt.exit)) { + tsem_post(&gCtgMgmt.queue.rspSem); break; } @@ -2056,6 +2122,12 @@ void* ctgUpdateThreadFunc(void* param) { (*gCtgAction[action->act].func)(action); + gCtgMgmt.queue.seqDone = action->seqId; + + if (action->syncReq) { + tsem_post(&gCtgMgmt.queue.rspSem); + } + CTG_STAT_ADD(gCtgMgmt.stat.runtime.qDoneNum); ctgDbgShowClusterCache(pCtg); @@ -2125,14 +2197,15 @@ int32_t catalogInit(SCatalogCfg *cfg) { CTG_ERR_RET(ctgStartUpdateThread()); - tsem_init(&gCtgMgmt.sem, 0, 0); + tsem_init(&gCtgMgmt.queue.reqSem, 0, 0); + tsem_init(&gCtgMgmt.queue.rspSem, 0, 0); - gCtgMgmt.head = calloc(1, sizeof(SCtgQNode)); - if (NULL == gCtgMgmt.head) { + gCtgMgmt.queue.head = calloc(1, sizeof(SCtgQNode)); + if (NULL == gCtgMgmt.queue.head) { qError("calloc %d failed", (int32_t)sizeof(SCtgQNode)); CTG_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - gCtgMgmt.tail = gCtgMgmt.head; + gCtgMgmt.queue.tail = gCtgMgmt.queue.head; qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", gCtgMgmt.cfg.maxDBCacheNum, gCtgMgmt.cfg.maxTblCacheNum, gCtgMgmt.cfg.dbRentSec, gCtgMgmt.cfg.stbRentSec); @@ -2332,12 +2405,10 @@ int32_t catalogUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId action.data = msg; - CTG_ERR_JRET(ctgPushAction(&action)); + CTG_ERR_JRET(ctgPushAction(pCtg, &action)); dbInfo = NULL; - ctgDebug("action [%s] added into queue", gCtgAction[action.act].name); - CTG_API_LEAVE(code); _return: @@ -2443,9 +2514,7 @@ int32_t catalogUpdateSTableMeta(SCatalog* pCtg, STableMetaRsp *rspMsg) { action.data = msg; - CTG_ERR_JRET(ctgPushAction(&action)); - - ctgDebug("action [%s] added into queue", gCtgAction[action.act].name); + CTG_ERR_JRET(ctgPushAction(pCtg, &action)); CTG_API_LEAVE(code); @@ -2458,6 +2527,15 @@ _return: CTG_API_LEAVE(code); } +int32_t catalogRefreshDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* dbFName) { + CTG_API_ENTER(); + + if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == dbFName) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + CTG_API_LEAVE(ctgRefreshDBVgInfo(pCtg, pTrans, pMgmtEps, dbFName)); +} int32_t catalogRefreshTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) { CTG_API_ENTER(); @@ -2702,7 +2780,8 @@ void catalogDestroy(void) { atomic_store_8(&gCtgMgmt.exit, true); - tsem_post(&gCtgMgmt.sem); + tsem_post(&gCtgMgmt.queue.reqSem); + tsem_post(&gCtgMgmt.queue.rspSem); while (CTG_IS_LOCKED(&gCtgMgmt.lock)) { usleep(1); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index d46978b26d..43e6adf591 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -721,7 +721,7 @@ int32_t getMaximumIdleDurationSec(); void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx* pCtx, int32_t idx, int32_t type); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); -int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId); +int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, SQueryErrorInfo *errInfo); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index a8602b7c77..ce50298add 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -84,7 +84,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) { } qTaskInfo_t pTaskInfo = NULL; - code = qCreateExecTask(streamReadHandle, 0, 0, plan, &pTaskInfo, NULL); + code = qCreateExecTask(streamReadHandle, 0, 0, plan, &pTaskInfo, NULL, NULL); if (code != TSDB_CODE_SUCCESS) { // TODO: destroy SSubplan & pTaskInfo terrno = code; diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index fabaa2d31d..1684a6e936 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -51,11 +51,11 @@ static void freeqinfoFn(void *qhandle) { qDestroyTask(*handle); } -int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) { +int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, SQueryErrorInfo *errInfo) { assert(readHandle != NULL && pSubplan != NULL); SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; - int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId); + int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, errInfo); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7e46a7a63d..480114480c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -8072,7 +8072,8 @@ static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRead static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId); -SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { +int32_t doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo, SQueryErrorInfo *errInfo) { + int32_t code = 0; if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_PROJECT) { // ignore the project node pPhyNode = nodesListGetNode(pPhyNode->pChildren, 0); } @@ -8081,11 +8082,21 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; + char tableFName[TSDB_TABLE_FNAME_LEN]; + tNameExtractFullName(&pScanPhyNode->tableName, tableFName); + + code = vnodeValidateTableHash(pHandle->config, tableFName); + if (code) { + errInfo->code = code; + errInfo->tableName = pScanPhyNode->tableName; + return code; + } + size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols); tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*) pPhyNode, pHandle, (uint64_t) queryId, taskId); - int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); - return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo); + code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); + pTaskInfo->pRoot = createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) { // SExchangePhysiNode* pEx = (SExchangePhysiNode*) pPhyNode; // return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo); @@ -8093,7 +8104,11 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. STableGroupInfo groupInfo = {0}; - int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId); + code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId); + if (code) { + return code; + } + SArray* idList = NULL; if (groupInfo.numOfTables > 0) { @@ -8126,12 +8141,15 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa for (int32_t i = 0; i < size; ++i) { SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); - SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + code = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo, errInfo); + if (code) { + return code; + } int32_t resultRowSize = 0; SArray* pExprInfo = createExprInfo((SAggPhysiNode*)pPhyNode, &resultRowSize); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - return createAggregateOperatorInfo(op, pExprInfo, pResBlock, pTaskInfo, pTableGroupInfo); + pTaskInfo->pRoot = createAggregateOperatorInfo(pTaskInfo->pRoot, pExprInfo, pResBlock, pTaskInfo, pTableGroupInfo); } } /*else if (pPhyNode->info.type == OP_MultiTableAggregate) { size_t size = taosArrayGetSize(pPhyNode->pChildren); @@ -8143,6 +8161,12 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa return createMultiTableAggOperatorInfo(op, pPhyNode->pTargets, pTaskInfo, pTableGroupInfo); } }*/ + + if (pTaskInfo->pRoot == NULL) { + code = TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + return code; } static tsdbReaderT createDataReaderImpl(STableScanPhysiNode* pTableScanNode, STableGroupInfo* pGroupInfo, void* readHandle, uint64_t queryId, uint64_t taskId) { @@ -8205,7 +8229,7 @@ static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRead return NULL; } -int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId) { +int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, SQueryErrorInfo *errInfo) { uint64_t queryId = pPlan->id.queryId; int32_t code = TSDB_CODE_SUCCESS; @@ -8216,9 +8240,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead } STableGroupInfo group = {0}; - (*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group); - if ((*pTaskInfo)->pRoot == NULL) { - code = TSDB_CODE_QRY_OUT_OF_MEMORY; + code = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group, errInfo); + if (code) { goto _complete; } diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index dcb1774ad0..d6414a908c 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -946,7 +946,7 @@ TEST(testCase, build_executor_tree_Test) { int32_t code = qStringToSubplan(msg, &plan); ASSERT_EQ(code, 0); - code = qCreateExecTask(&handle, 2, 1, plan, (void**) &pTaskInfo, &sinkHandle); + code = qCreateExecTask(&handle, 2, 1, plan, (void**) &pTaskInfo, &sinkHandle, NULL); ASSERT_EQ(code, 0); } diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 63fbf59c06..543a908226 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -161,3 +161,17 @@ int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransp rpcSendRequest(pTransporter, epSet, &rpcMsg, pTransporterId); return TSDB_CODE_SUCCESS; } + + +SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name) { + SSchema s = {0}; + s.type = type; + s.bytes = bytes; + s.colId = colId; + + tstrncpy(s.name, name, tListLen(s.name)); + return s; +} + + + diff --git a/source/libs/qworker/inc/qworkerMsg.h b/source/libs/qworker/inc/qworkerMsg.h index ecb5dbd654..f8d8ce4563 100644 --- a/source/libs/qworker/inc/qworkerMsg.h +++ b/source/libs/qworker/inc/qworkerMsg.h @@ -36,7 +36,7 @@ int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, void *connection); int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code); -int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code); +int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code, SQueryErrorInfo *errInfo); void qwFreeFetchRsp(void *msg); int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index b9ef6f8504..ea8e818863 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -998,6 +998,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { qTaskInfo_t pTaskInfo = NULL; DataSinkHandle sinkHandle = NULL; SQWTaskCtx *ctx = NULL; + SQueryErrorInfo errInfo = {0}; QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, &output)); @@ -1019,7 +1020,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { QW_ERR_JRET(code); } - code = qCreateExecTask(qwMsg->node, 0, tId, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle); + code = qCreateExecTask(qwMsg->node, 0, tId, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle, &errInfo); if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%s", tstrerror(code)); QW_ERR_JRET(code); @@ -1032,7 +1033,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { //TODO OPTIMIZE EMTYP RESULT QUERY RSP TO AVOID FURTHER FETCH - QW_ERR_JRET(qwBuildAndSendQueryRsp(qwMsg->connection, code)); + QW_ERR_JRET(qwBuildAndSendQueryRsp(qwMsg->connection, code, NULL)); QW_TASK_DLOG("query msg rsped, code:%d", code); queryRsped = true; @@ -1051,7 +1052,7 @@ _return: } if (!queryRsped) { - qwBuildAndSendQueryRsp(qwMsg->connection, rspCode); + qwBuildAndSendQueryRsp(qwMsg->connection, rspCode, &errInfo); QW_TASK_DLOG("query msg rsped, code:%x", rspCode); } diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 7d633d1c73..ce39c710c8 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -44,17 +44,23 @@ void qwFreeFetchRsp(void *msg) { } } -int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code) { +int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code, SQueryErrorInfo *errInfo) { SRpcMsg *pMsg = (SRpcMsg *)connection; - SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp)); - pRsp->code = code; + SQueryTableRsp rsp = {.code = code}; + if (errInfo && errInfo->code) { + rsp.tableName = errInfo->tableName; + } + + int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp); + void *msg = rpcMallocCont(contLen); + tSerializeSQueryTableRsp(msg, contLen, &rsp); SRpcMsg rpcRsp = { .msgType = TDMT_VND_QUERY_RSP, .handle = pMsg->handle, .ahandle = pMsg->ahandle, - .pCont = pRsp, - .contLen = sizeof(*pRsp), + .pCont = msg, + .contLen = contLen, .code = code, }; diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 231f0c7fff..a1e3f82d4c 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -262,7 +262,7 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { return; } -int32_t qwtCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) { +int32_t qwtCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, SQueryErrorInfo *errInfo) { int32_t idx = abs((++qwtTestCaseIdx) % qwtTestCaseNum); qwtTestSinkBlockNum = 0; diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index d9c2ae6d65..146c7c1966 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -136,6 +136,7 @@ typedef struct SSchJob { uint64_t queryId; SSchJobAttr attr; int32_t levelNum; + int32_t taskNum; void *transport; SArray *nodeList; // qnode/vnode list, element is SQueryNodeAddr SArray *levels; // Element is SQueryLevel, starting from 0. SArray @@ -154,7 +155,8 @@ typedef struct SSchJob { int32_t remoteFetch; SSchTask *fetchTask; int32_t errCode; - void *res; //TODO free it or not + SArray *errList; // SArray + void *resData; //TODO free it or not int32_t resNumOfRows; const char *sql; SQueryProfileSummary summary; @@ -168,9 +170,9 @@ extern SSchedulerMgmt schMgmt; #define SCH_SET_TASK_LASTMSG_TYPE(_task, _type) do { if(_task) { atomic_store_32(&(_task)->lastMsgType, _type); } } while (0) #define SCH_GET_TASK_LASTMSG_TYPE(_task) ((_task) ? atomic_load_32(&(_task)->lastMsgType) : -1) -#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) -#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY) -#define SCH_TASK_NO_NEED_DROP(task) ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY) +#define SCH_IS_DATA_SRC_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) +#define SCH_IS_DATA_SRC_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY)) +#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum) #define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st) #define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status) @@ -180,12 +182,14 @@ extern SSchedulerMgmt schMgmt; #define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true #define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl) -#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEAF_TASK(_job, _task) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) +#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEAF_TASK(_job, _task) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) #define SCH_SET_JOB_TYPE(_job, type) (_job)->attr.queryJob = ((type) != SUBPLAN_TYPE_MODIFY) #define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob) #define SCH_JOB_NEED_FETCH(_job) SCH_IS_QUERY_JOB(_job) -#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum) +#define SCH_IS_WAIT_ALL_JOB(_job) (!SCH_IS_QUERY_JOB(_job)) +#define SCH_IS_NEED_DROP_JOB(_job) (SCH_IS_QUERY_JOB(_job)) + #define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum) #define SCH_GET_CUR_EP(_addr) (&(_addr)->epset.eps[(_addr)->epset.inUse]) #define SCH_SWITCH_EPSET(_addr) ((_addr)->epset.inUse = ((_addr)->epset.inUse + 1) % (_addr)->epset.numOfEps) @@ -219,7 +223,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough); int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask); int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask); int32_t schFetchFromRemote(SSchJob *pJob); -int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode); +int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode, SQueryErrorInfo *errInfo); #ifdef __cplusplus diff --git a/source/libs/scheduler/src/schFlowCtrl.c b/source/libs/scheduler/src/schFlowCtrl.c index 9fba6523b6..4a2173561f 100644 --- a/source/libs/scheduler/src/schFlowCtrl.c +++ b/source/libs/scheduler/src/schFlowCtrl.c @@ -259,7 +259,7 @@ _return: SCH_UNLOCK(SCH_WRITE, &ctrl->lock); if (code) { - code = schProcessOnTaskFailure(pJob, pTask, code); + code = schProcessOnTaskFailure(pJob, pTask, code, NULL); } SCH_RET(code); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 63c634a384..02eb8869e4 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -410,6 +410,8 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + + ++pJob->taskNum; } SCH_JOB_DLOG("level initialized, taskNum:%d", taskNum); @@ -588,7 +590,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo return TSDB_CODE_SUCCESS; //TODO CHECK epList/condidateList - if (SCH_IS_DATA_SRC_TASK(pTask)) { + if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) { } else { int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs); @@ -611,7 +613,7 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask)); } - if (SCH_IS_DATA_SRC_TASK(pTask)) { + if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) { SCH_SWITCH_EPSET(&pTask->plan->execNode); } else { ++pTask->candidateIdx; @@ -727,8 +729,32 @@ int32_t schProcessOnDataFetched(SSchJob *job) { tsem_post(&job->rspSem); } +int32_t schPushToErrInfoList(SSchJob *pJob, SSchTask *pTask, SQueryErrorInfo *errInfo) { + if (NULL == errInfo || !SCH_IS_DATA_SRC_TASK(pTask) || !IS_CLIENT_RETRY_ERROR(errInfo->code)) { + return TSDB_CODE_SUCCESS; + } + + if (NULL == pJob->errList) { + SSchLevel *level = taosArrayGetLast(pJob->levels); + + pJob->errList = taosArrayInit(level->taskNum, sizeof(SQueryErrorInfo)); + if (NULL == pJob->errList) { + SCH_TASK_ELOG("taosArrayInit %d errInfofailed", pJob->taskNum); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + } + + if (NULL == taosArrayPush(pJob->errList, errInfo)) { + SCH_TASK_ELOG("taosArrayPush errInfo to list failed, errCode:%x", errInfo->code); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + return TSDB_CODE_SUCCESS; +} + + // Note: no more task error processing, handled in function internal -int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) { +int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode, SQueryErrorInfo *errInfo) { int8_t status = 0; if (schJobNeedToStop(pJob, &status)) { @@ -757,8 +783,10 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) } SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED); + + SCH_ERR_JRET(schPushToErrInfoList(pJob, pTask, errInfo)); - if (SCH_TASK_NEED_WAIT_ALL(pTask)) { + if (SCH_IS_WAIT_ALL_JOB(pJob)) { SCH_LOCK(SCH_WRITE, &pTask->level->lock); pTask->level->taskFailed++; taskDone = pTask->level->taskSucceed + pTask->level->taskFailed; @@ -801,7 +829,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { if (parentNum == 0) { int32_t taskDone = 0; - if (SCH_TASK_NEED_WAIT_ALL(pTask)) { + if (SCH_IS_WAIT_ALL_JOB(pJob)) { SCH_LOCK(SCH_WRITE, &pTask->level->lock); pTask->level->taskSucceed++; taskDone = pTask->level->taskSucceed + pTask->level->taskFailed; @@ -870,11 +898,11 @@ int32_t schFetchFromRemote(SSchJob *pJob) { return TSDB_CODE_SUCCESS; } - void *res = atomic_load_ptr(&pJob->res); - if (res) { + void *resData = atomic_load_ptr(&pJob->resData); + if (resData) { atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0); - SCH_JOB_DLOG("res already fetched, res:%p", res); + SCH_JOB_DLOG("res already fetched, res:%p", resData); return TSDB_CODE_SUCCESS; } @@ -886,7 +914,7 @@ _return: atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0); - SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code)); + SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code, NULL)); } @@ -894,6 +922,8 @@ _return: int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { int32_t code = 0; int8_t status = 0; + bool errInfoGot = false; + SQueryErrorInfo errInfo = {0}; if (schJobNeedToStop(pJob, &status)) { SCH_TASK_ELOG("rsp not processed cause of job status, job status:%d", status); @@ -933,13 +963,23 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch break; } case TDMT_VND_QUERY_RSP: { - SQueryTableRsp *rsp = (SQueryTableRsp *)msg; + SQueryTableRsp rsp = {0}; + if (msg) { + tDeserializeSQueryTableRsp(msg, msgSize, &rsp); + if (rsp.code) { + errInfo.code = rsp.code; + errInfo.tableName = rsp.tableName; + errInfoGot = true; + } + + SCH_ERR_JRET(rsp.code); + } SCH_ERR_JRET(rspCode); + if (NULL == msg) { SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } - SCH_ERR_JRET(rsp->code); SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY)); @@ -966,13 +1006,13 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } - if (pJob->res) { - SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->res); + if (pJob->resData) { + SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->resData); tfree(rsp); SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); } - atomic_store_ptr(&pJob->res, rsp); + atomic_store_ptr(&pJob->resData, rsp); atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows)); if (rsp->completed) { @@ -999,7 +1039,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch _return: - SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); + SCH_RET(schProcessOnTaskFailure(pJob, pTask, code, errInfoGot ? &errInfo : NULL)); } @@ -1423,7 +1463,7 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { _return: - SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); + SCH_RET(schProcessOnTaskFailure(pJob, pTask, code, NULL)); } int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) { @@ -1474,13 +1514,15 @@ void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) { } void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { + if (!SCH_IS_NEED_DROP_JOB(pJob)) { + return; + } + void *pIter = taosHashIterate(list, NULL); while (pIter) { SSchTask *pTask = *(SSchTask **)pIter; - if (!SCH_TASK_NO_NEED_DROP(pTask)) { - schDropTaskOnExecutedNode(pJob, pTask); - } + schDropTaskOnExecutedNode(pJob, pTask); pIter = taosHashIterate(list, pIter); } @@ -1537,8 +1579,9 @@ void schFreeJobImpl(void *job) { taosArrayDestroy(pJob->levels); taosArrayDestroy(pJob->nodeList); - - tfree(pJob->res); + taosArrayDestroy(pJob->errList); + + tfree(pJob->resData); tfree(pJob); @@ -1673,8 +1716,12 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan* pDag, in SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, true)); SSchJob *job = schAcquireJob(*pJob); + pRes->code = atomic_load_32(&job->errCode); pRes->numOfRows = job->resNumOfRows; + pRes->errList = job->errList; + job->errList = NULL; + schReleaseJob(*pJob); return TSDB_CODE_SUCCESS; @@ -1862,14 +1909,14 @@ int32_t schedulerFetchRows(int64_t job, void** pData) { SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); } - if (pJob->res && ((SRetrieveTableRsp *)pJob->res)->completed) { + if (pJob->resData && ((SRetrieveTableRsp *)pJob->resData)->completed) { SCH_ERR_JRET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED)); } while (true) { - *pData = atomic_load_ptr(&pJob->res); - if (*pData != atomic_val_compare_exchange_ptr(&pJob->res, *pData, NULL)) { + *pData = atomic_load_ptr(&pJob->resData); + if (*pData != atomic_val_compare_exchange_ptr(&pJob->resData, *pData, NULL)) { continue; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index c1cb4f8a41..fc9c36097c 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -324,6 +324,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, "Database write operat TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_SYNCING, "Database is syncing") TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TSDB_STATE, "Invalid tsdb state") TAOS_DEFINE_ERROR(TSDB_CODE_VND_TB_NOT_EXIST, "Table not exists") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_HASH_MISMATCH, "Hash value mismatch") // tsdb TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")