diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c56824f567..91710c4633 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -140,6 +140,8 @@ typedef enum _mgmt_table { #define TSDB_KILL_MSG_LEN 30 +#define TSDB_TABLE_NUM_UNIT 100000 + #define TSDB_VN_READ_ACCCESS ((char)0x1) #define TSDB_VN_WRITE_ACCCESS ((char)0x2) #define TSDB_VN_ALL_ACCCESS (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS) @@ -169,6 +171,7 @@ typedef struct { char db[TSDB_DB_FNAME_LEN]; int64_t dbId; int32_t vgVersion; + int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT } SBuildUseDBInput; typedef struct SField { @@ -570,6 +573,7 @@ typedef struct { char db[TSDB_DB_FNAME_LEN]; int64_t dbId; int32_t vgVersion; + int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT } SUseDbReq; int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq); @@ -800,8 +804,10 @@ typedef struct SVgroupInfo { uint32_t hashBegin; uint32_t hashEnd; SEpSet epset; + int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT } SVgroupInfo; + typedef struct { int32_t numOfVgroups; SVgroupInfo vgroups[]; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index c6183780f9..8db7d34d3e 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -74,9 +74,9 @@ typedef struct SDbVgVersion { char dbFName[TSDB_DB_FNAME_LEN]; int64_t dbId; int32_t vgVersion; + int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT } SDbVgVersion; - int32_t catalogInit(SCatalogCfg *cfg); /** @@ -95,7 +95,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle); */ void catalogFreeHandle(SCatalog* pCatalog); -int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId); +int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t *tableNum); /** * Get a DB's all vgroup info. diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index e6351c5f43..c805eba320 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -202,6 +202,7 @@ typedef struct SSubplan { int32_t msgType; // message type for subplan, used to denote the send message type to vnode. int32_t level; // the execution level of current subplan, starting from 0 in a top-down manner. 
SQueryNodeAddr execNode; // for the scan/modify subplan, the optional execution node + SQueryNodeStat execNodeStat; // only for scan subplan SNodeList* pChildren; // the datasource subplan,from which to fetch the result SNodeList* pParents; // the data destination subplan, get data from current subplan SPhysiNode* pNode; // physical plan of current subplan diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 1f56254476..1a95d26729 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -35,7 +35,6 @@ enum { JOB_TASK_STATUS_CANCELLING, JOB_TASK_STATUS_CANCELLED, JOB_TASK_STATUS_DROPPING, - JOB_TASK_STATUS_FREEING, }; enum { @@ -83,6 +82,7 @@ typedef struct STableMeta { typedef struct SDBVgInfo { int32_t vgVersion; int8_t hashMethod; + int32_t numOfTable; // DB's table num, unit is TSDB_TABLE_NUM_UNIT SHashObj *vgHash; //key:vgId, value:SVgroupInfo } SDBVgInfo; @@ -133,6 +133,10 @@ typedef struct SQueryNodeAddr { SEpSet epset; } SQueryNodeAddr; +typedef struct SQueryNodeStat { + int32_t tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT +} SQueryNodeStat; + int32_t initTaskQueue(); int32_t cleanupTaskQueue(); diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 518cabb636..56da9ece6f 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -25,6 +25,7 @@ extern "C" { typedef struct SSchedulerCfg { uint32_t maxJobNum; + int32_t maxNodeTableNum; } SSchedulerCfg; typedef struct SQueryProfileSummary { diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index b1e9de8000..1128a38840 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -224,6 +224,7 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl SDbVgVersion *db = &dbs[i]; db->dbId = htobe64(db->dbId); db->vgVersion = htonl(db->vgVersion); + db->numOfTable = htonl(db->numOfTable); } SKv kv = { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index ae31dff310..8b4f572fee 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1423,6 +1423,7 @@ int32_t tSerializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) { if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; if (tEncodeI64(&encoder, pReq->dbId) < 0) return -1; if (tEncodeI32(&encoder, pReq->vgVersion) < 0) return -1; + if (tEncodeI32(&encoder, pReq->numOfTable) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -1438,6 +1439,7 @@ int32_t tDeserializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) { if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; if (tDecodeI64(&decoder, &pReq->dbId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->vgVersion) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->numOfTable) < 0) return -1; tEndDecode(&decoder); tCoderClear(&decoder); @@ -1482,6 +1484,7 @@ static int32_t tSerializeSUseDbRspImp(SCoder *pEncoder, SUseDbRsp *pRsp) { if (tEncodeU32(pEncoder, pVgInfo->hashBegin) < 0) return -1; if (tEncodeU32(pEncoder, pVgInfo->hashEnd) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pVgInfo->epset) < 0) return -1; + if (tEncodeI32(pEncoder, pVgInfo->numOfTable) < 0) return -1; } return 0; @@ -1542,6 +1545,7 @@ int32_t tDeserializeSUseDbRspImp(SCoder *pDecoder, SUseDbRsp *pRsp) { if (tDecodeU32(pDecoder, &vgInfo.hashBegin) < 0) return -1; if (tDecodeU32(pDecoder, &vgInfo.hashEnd) < 0) return -1; if (tDecodeSEpSet(pDecoder, &vgInfo.epset) < 0) return -1; + if (tDecodeI32(pDecoder, &vgInfo.numOfTable) < 0) return 
-1; taosArrayPush(pRsp->pVgroupInfos, &vgInfo); } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 974f5fc982..1b75d04b44 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -885,6 +885,29 @@ DROP_DB_OVER: return code; } +void mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode, int32_t *num) { + int32_t vindex = 0; + SSdb *pSdb = pMnode->pSdb; + + void *pIter = NULL; + while (vindex < pDb->cfg.numOfVgroups) { + SVgObj *pVgroup = NULL; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + + if (pVgroup->dbUid == pDb->uid) { + *num += pVgroup->numOfTables / TSDB_TABLE_NUM_UNIT; + + vindex++; + } + + sdbRelease(pSdb, pVgroup); + } + + sdbCancelFetch(pSdb, pIter); +} + + static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) { int32_t vindex = 0; SSdb *pSdb = pMnode->pSdb; @@ -900,6 +923,7 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) { vgInfo.vgId = pVgroup->vgId; vgInfo.hashBegin = pVgroup->hashBegin; vgInfo.hashEnd = pVgroup->hashEnd; + vgInfo.numOfTable = pVgroup->numOfTables / TSDB_TABLE_NUM_UNIT; vgInfo.epset.numOfEps = pVgroup->replica; for (int32_t gid = 0; gid < pVgroup->replica; ++gid) { SVnodeGid *pVgid = &pVgroup->vnodeGid[gid]; @@ -967,7 +991,10 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { goto USE_DB_OVER; } - if (usedbReq.vgVersion < pDb->vgVersion || usedbReq.dbId != pDb->uid) { + int32_t numOfTable = 0; + mndGetDBTableNum(pDb, pMnode, &numOfTable); + + if (usedbReq.vgVersion < pDb->vgVersion || usedbReq.dbId != pDb->uid || numOfTable != usedbReq.numOfTable) { mndBuildDBVgroupInfo(pDb, pMnode, usedbRsp.pVgroupInfos); } @@ -1017,6 +1044,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, SDbVgVersion *pDbVgVersion = &pDbs[i]; pDbVgVersion->dbId = htobe64(pDbVgVersion->dbId); pDbVgVersion->vgVersion = htonl(pDbVgVersion->vgVersion); + pDbVgVersion->numOfTable = htonl(pDbVgVersion->numOfTable); SUseDbRsp usedbRsp = {0}; @@ -1027,28 +1055,34 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, usedbRsp.uid = pDbVgVersion->dbId; usedbRsp.vgVersion = -1; taosArrayPush(batchUseRsp.pArray, &usedbRsp); - } else if (pDbVgVersion->vgVersion >= pDb->vgVersion) { - mDebug("db:%s, version not changed", pDbVgVersion->dbFName); + continue; + } + + int32_t numOfTable = 0; + mndGetDBTableNum(pDb, pMnode, &numOfTable); + + if (pDbVgVersion->vgVersion >= pDb->vgVersion && numOfTable == pDbVgVersion->numOfTable) { + mDebug("db:%s, version & numOfTable not changed", pDbVgVersion->dbFName); mndReleaseDb(pMnode, pDb); continue; - } else { - usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo)); - if (usedbRsp.pVgroupInfos == NULL) { - mndReleaseDb(pMnode, pDb); - mError("db:%s, failed to malloc usedb response", pDb->name); - continue; - } - - mndBuildDBVgroupInfo(pDb, pMnode, usedbRsp.pVgroupInfos); - memcpy(usedbRsp.db, pDb->name, TSDB_DB_FNAME_LEN); - usedbRsp.uid = pDb->uid; - usedbRsp.vgVersion = pDb->vgVersion; - usedbRsp.vgNum = (int32_t)taosArrayGetSize(usedbRsp.pVgroupInfos); - usedbRsp.hashMethod = pDb->hashMethod; - - taosArrayPush(batchUseRsp.pArray, &usedbRsp); - mndReleaseDb(pMnode, pDb); } + + usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo)); + if (usedbRsp.pVgroupInfos == NULL) { + mndReleaseDb(pMnode, pDb); + mError("db:%s, failed to malloc usedb response", pDb->name); + 
continue; + } + + mndBuildDBVgroupInfo(pDb, pMnode, usedbRsp.pVgroupInfos); + memcpy(usedbRsp.db, pDb->name, TSDB_DB_FNAME_LEN); + usedbRsp.uid = pDb->uid; + usedbRsp.vgVersion = pDb->vgVersion; + usedbRsp.vgNum = (int32_t)taosArrayGetSize(usedbRsp.pVgroupInfos); + usedbRsp.hashMethod = pDb->hashMethod; + + taosArrayPush(batchUseRsp.pArray, &usedbRsp); + mndReleaseDb(pMnode, pDb); } int32_t rspLen = tSerializeSUseDbBatchRsp(NULL, 0, &batchUseRsp); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 56c0ec1130..03395b1f51 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -965,7 +965,7 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName CTG_RET(code); } -int32_t ctgStbVersionCompare(const void* key1, const void* key2) { +int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2) { if (*(uint64_t *)key1 < ((SSTableMetaVersion*)key2)->suid) { return -1; } else if (*(uint64_t *)key1 > ((SSTableMetaVersion*)key2)->suid) { @@ -975,7 +975,7 @@ int32_t ctgStbVersionCompare(const void* key1, const void* key2) { } } -int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) { +int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2) { if (*(int64_t *)key1 < ((SDbVgVersion*)key2)->dbId) { return -1; } else if (*(int64_t *)key1 > ((SDbVgVersion*)key2)->dbId) { @@ -985,6 +985,27 @@ int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) { } } +int32_t ctgStbVersionSortCompare(const void* key1, const void* key2) { + if (((SSTableMetaVersion*)key1)->suid < ((SSTableMetaVersion*)key2)->suid) { + return -1; + } else if (((SSTableMetaVersion*)key1)->suid > ((SSTableMetaVersion*)key2)->suid) { + return 1; + } else { + return 0; + } +} + +int32_t ctgDbVgVersionSortCompare(const void* key1, const void* key2) { + if (((SDbVgVersion*)key1)->dbId < ((SDbVgVersion*)key2)->dbId) { + return -1; + } else if (((SDbVgVersion*)key1)->dbId > ((SDbVgVersion*)key2)->dbId) { + return 1; + } else { + return 0; + } +} + + int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) { mgmt->slotRIdx = 0; mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND; @@ -1034,7 +1055,7 @@ _return: CTG_RET(code); } -int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t compare) { +int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare, __compar_fn_t searchCompare) { int16_t widx = abs(id % mgmt->slotNum); SCtgRentSlot *slot = &mgmt->slots[widx]; @@ -1048,12 +1069,12 @@ int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t si if (slot->needSort) { qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta)); - taosArraySort(slot->meta, compare); + taosArraySort(slot->meta, sortCompare); slot->needSort = false; qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta)); } - void *orig = taosArraySearch(slot->meta, &id, compare, TD_EQ); + void *orig = taosArraySearch(slot->meta, &id, searchCompare, TD_EQ); if (NULL == orig) { qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d, size:%d", id, widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta)); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); @@ -1075,8 +1096,8 @@ _return: CTG_RET(code); } -int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t compare) { - int16_t 
widx = labs(id % mgmt->slotNum); +int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortCompare, __compar_fn_t searchCompare) { + int16_t widx = abs(id % mgmt->slotNum); SCtgRentSlot *slot = &mgmt->slots[widx]; int32_t code = 0; @@ -1088,12 +1109,12 @@ int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t compare) } if (slot->needSort) { - taosArraySort(slot->meta, compare); + taosArraySort(slot->meta, sortCompare); slot->needSort = false; qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type); } - int32_t idx = taosArraySearchIdx(slot->meta, &id, compare, TD_EQ); + int32_t idx = taosArraySearchIdx(slot->meta, &id, searchCompare, TD_EQ); if (idx < 0) { qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); @@ -1240,7 +1261,7 @@ void ctgRemoveStbRent(SCatalog* pCtg, SCtgTbMetaCache *cache) { uint64_t *suid = NULL; suid = taosHashGetKey(pIter, NULL); - if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionCompare)) { + if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)) { ctgDebug("stb removed from rent, suid:%"PRIx64, *suid); } @@ -1264,7 +1285,7 @@ int32_t ctgRemoveDB(SCatalog* pCtg, SCtgDBCache *dbCache, const char* dbFName) { ctgInfo("db removed from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId); - CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbCache->dbId, ctgDbVgVersionCompare)); + CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbCache->dbId, ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare)); ctgDebug("db removed from rent, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId); @@ -1331,7 +1352,7 @@ int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SD } bool newAdded = false; - SDbVgVersion vgVersion = {.dbId = dbId, .vgVersion = dbInfo->vgVersion}; + SDbVgVersion vgVersion = {.dbId = dbId, .vgVersion = dbInfo->vgVersion, .numOfTable = dbInfo->numOfTable}; SCtgDBCache *dbCache = NULL; CTG_ERR_RET(ctgGetAddDBCache(pCtg, dbFName, dbId, &dbCache)); @@ -1344,8 +1365,15 @@ int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SD CTG_ERR_RET(ctgWAcquireVgInfo(pCtg, dbCache)); if (dbCache->vgInfo) { - if (dbInfo->vgVersion <= dbCache->vgInfo->vgVersion) { - ctgInfo("db vgVersion is old, dbFName:%s, vgVersion:%d, currentVersion:%d", dbFName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion); + if (dbInfo->vgVersion < dbCache->vgInfo->vgVersion) { + ctgDebug("db vgVersion is old, dbFName:%s, vgVersion:%d, currentVersion:%d", dbFName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion); + ctgWReleaseVgInfo(dbCache); + + return TSDB_CODE_SUCCESS; + } + + if (dbInfo->vgVersion == dbCache->vgInfo->vgVersion && dbInfo->numOfTable == dbCache->vgInfo->numOfTable) { + ctgDebug("no new db vgVersion or numOfTable, dbFName:%s, vgVersion:%d, numOfTable:%d", dbFName, dbInfo->vgVersion, dbInfo->numOfTable); ctgWReleaseVgInfo(dbCache); return TSDB_CODE_SUCCESS; @@ -1365,7 +1393,7 @@ int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SD dbCache = NULL; strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName)); - CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare)); + CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare)); CTG_RET(code); } 
@@ -1397,8 +1425,8 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock); ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid); - - ctgMetaRentRemove(&pCtg->stbRent, orig->suid, ctgStbVersionCompare); + + ctgMetaRentRemove(&pCtg->stbRent, orig->suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare); } origSuid = orig->suid; @@ -1511,6 +1539,7 @@ int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const if (inCache) { input.dbId = (*dbCache)->dbId; input.vgVersion = (*dbCache)->vgInfo->vgVersion; + input.numOfTable = (*dbCache)->vgInfo->numOfTable; } else { input.vgVersion = CTG_DEFAULT_INVALID_VERSION; } @@ -1924,7 +1953,7 @@ int32_t ctgActRemoveStb(SCtgMetaAction *action) { ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid); - CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->stbRent, msg->suid, ctgStbVersionCompare)); + CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->stbRent, msg->suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)); ctgDebug("stb removed from rent, dbFName:%s, stbName:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid); @@ -2163,7 +2192,7 @@ void catalogFreeHandle(SCatalog* pCtg) { ctgInfo("handle freed, culsterId:%"PRIx64, clusterId); } -int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId) { +int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t *tableNum) { CTG_API_ENTER(); if (NULL == pCtg || NULL == dbFName || NULL == version || NULL == dbId) { @@ -2194,6 +2223,7 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers *version = dbCache->vgInfo->vgVersion; *dbId = dbCache->dbId; + *tableNum = dbCache->vgInfo->numOfTable; ctgReleaseVgInfo(dbCache); ctgReleaseDBCache(pCtg, dbCache); diff --git a/source/libs/parser/src/astTranslate.c b/source/libs/parser/src/astTranslate.c index 8f06560117..906d333314 100644 --- a/source/libs/parser/src/astTranslate.c +++ b/source/libs/parser/src/astTranslate.c @@ -893,6 +893,8 @@ static int32_t translateUseDatabase(STranslateContext* pCxt, SUseDatabaseStmt* p SUseDbReq usedbReq = {0}; tNameExtractFullName(&name, usedbReq.db); + catalogGetDBVgVersion(pCxt->pParseCxt->pCatalog, usedbReq.db, &usedbReq.vgVersion, &usedbReq.dbId, &usedbReq.numOfTable); + pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo)); if (NULL== pCxt->pCmdMsg) { return TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 2a52e01dc1..ab0bbd319a 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -42,6 +42,7 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) { for (int32_t i = 0; i < usedbRsp->vgNum; ++i) { SVgroupInfo *pVgInfo = taosArrayGet(usedbRsp->pVgroupInfos, i); + pOut->dbVgroup->numOfTable += pVgInfo->numOfTable; if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pVgInfo->vgId, sizeof(int32_t), pVgInfo, sizeof(SVgroupInfo))) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -84,6 +85,7 @@ int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *ms usedbReq.db[sizeof(usedbReq.db) - 1] = 0; usedbReq.vgVersion = pInput->vgVersion; usedbReq.dbId = pInput->dbId; + usedbReq.numOfTable = pInput->numOfTable; int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq); void *pBuf = rpcMallocCont(bufLen); @@ -247,7 +249,7 @@ 
int32_t queryProcessTableMetaRsp(void *output, char *msg, int32_t msgSize) { PROCESS_META_OVER: if (code != 0) { - qError("failed to process table meta rsp since %s", terrstr()); + qError("failed to process table meta rsp since %s", tstrerror(code)); } tFreeSTableMetaRsp(&metaRsp); diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index f9259fd645..12c4f824ac 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -26,8 +26,10 @@ extern "C" { #include "scheduler.h" #include "thash.h" -#define SCHEDULE_DEFAULT_JOB_NUMBER 1000 -#define SCHEDULE_DEFAULT_TASK_NUMBER 1000 +#define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000 +#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000 +#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 20 // unit is TSDB_TABLE_NUM_UNIT + #define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA @@ -72,17 +74,27 @@ typedef struct SSchCallbackParam { uint64_t queryId; int64_t refId; uint64_t taskId; + SEpSet epSet; } SSchCallbackParam; +typedef struct SSchFlowControl { + SRWLatch lock; + bool sorted; + int32_t tableNumSum; + uint32_t execTaskNum; + SArray *taskList; // Element is SSchTask* +} SSchFlowControl; + typedef struct SSchLevel { - int32_t level; - int8_t status; - SRWLatch lock; - int32_t taskFailed; - int32_t taskSucceed; - int32_t taskNum; - int32_t taskLaunchIdx; // launch startup index - SArray *subTasks; // Element is SQueryTask + int32_t level; + int8_t status; + SRWLatch lock; + int32_t taskFailed; + int32_t taskSucceed; + int32_t taskNum; + int32_t taskLaunchedNum; + SHashObj *flowCtrl; // key is ep, element is SSchFlowControl + SArray *subTasks; // Element is SQueryTask } SSchLevel; typedef struct SSchTask { @@ -102,13 +114,14 @@ typedef struct SSchTask { int32_t childReady; // child task ready number SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* - void* handle; // task send handle + void* handle; // task send handle } SSchTask; typedef struct SSchJobAttr { bool needFetch; bool syncSchedule; bool queryJob; + bool needFlowCtrl; } SSchJobAttr; typedef struct SSchJob { @@ -140,6 +153,8 @@ typedef struct SSchJob { SQueryProfileSummary summary; } SSchJob; +extern SSchedulerMgmt schMgmt; + #define SCH_TASK_READY_TO_LUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children)) #define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) @@ -152,8 +167,17 @@ typedef struct SSchJob { #define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st) #define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status) -#define SCH_SET_JOB_TYPE(pAttr, type) (pAttr)->queryJob = ((type) != SUBPLAN_TYPE_MODIFY) -#define SCH_JOB_NEED_FETCH(pAttr) ((pAttr)->queryJob) +#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true +#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl) +#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEAF_TASK(_job, _task) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) + +#define SCH_SET_JOB_TYPE(_job, type) (_job)->attr.queryJob = ((type) != SUBPLAN_TYPE_MODIFY) +#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob) +#define SCH_JOB_NEED_FETCH(_job) SCH_IS_QUERY_JOB(_job) +#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum) +#define SCH_IS_LEVEL_UNFINISHED(_level) 
((_level)->taskLaunchedNum < (_level)->taskNum) +#define SCH_GET_CUR_EP(_addr) (&(_addr)->epset.eps[(_addr)->epset.inUse]) +#define SCH_SWITCH_EPSET(_addr) ((_addr)->epset.inUse = ((_addr)->epset.inUse + 1) % (_addr)->epset.numOfEps) #define SCH_JOB_ELOG(param, ...) qError("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__) #define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__) @@ -173,10 +197,19 @@ typedef struct SSchJob { #define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock)) -static int32_t schLaunchTask(SSchJob *job, SSchTask *task); -static int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType); +int32_t schLaunchTask(SSchJob *job, SSchTask *task); +int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType); SSchJob *schAcquireJob(int64_t refId); int32_t schReleaseJob(int64_t refId); +void schFreeFlowCtrl(SSchLevel *pLevel); +int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel); +int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask); +int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough); +int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask); +int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask); +int32_t schFetchFromRemote(SSchJob *pJob); +int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode); + #ifdef __cplusplus } diff --git a/source/libs/scheduler/src/schFlowCtrl.c b/source/libs/scheduler/src/schFlowCtrl.c new file mode 100644 index 0000000000..9fba6523b6 --- /dev/null +++ b/source/libs/scheduler/src/schFlowCtrl.c @@ -0,0 +1,289 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */
+
+#include "schedulerInt.h"
+#include "tmsg.h"
+#include "query.h"
+#include "catalog.h"
+#include "tref.h"
+
+void schFreeFlowCtrl(SSchLevel *pLevel) {
+  if (NULL == pLevel->flowCtrl) {
+    return;
+  }
+
+  SSchFlowControl *ctrl = NULL;
+  void *pIter = taosHashIterate(pLevel->flowCtrl, NULL);
+  while (pIter) {
+    ctrl = (SSchFlowControl *)pIter;
+
+    if (ctrl->taskList) {
+      taosArrayDestroy(ctrl->taskList);
+    }
+
+    pIter = taosHashIterate(pLevel->flowCtrl, pIter);
+  }
+
+  taosHashCleanup(pLevel->flowCtrl);
+  pLevel->flowCtrl = NULL;
+}
+
+int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
+  if (!SCH_IS_QUERY_JOB(pJob)) {
+    SCH_JOB_DLOG("job no need flow ctrl, queryJob:%d", SCH_IS_QUERY_JOB(pJob));
+    return TSDB_CODE_SUCCESS;
+  }
+
+  int32_t sum = 0;
+
+  for (int32_t i = 0; i < pLevel->taskNum; ++i) {
+    SSchTask *pTask = taosArrayGet(pLevel->subTasks, i);
+
+    sum += pTask->plan->execNodeStat.tableNum;
+  }
+
+  if (sum < schMgmt.cfg.maxNodeTableNum) {
+    SCH_JOB_DLOG("job no need flow ctrl, totalTableNum:%d", sum);
+    return TSDB_CODE_SUCCESS;
+  }
+
+  pLevel->flowCtrl = taosHashInit(pLevel->taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
+  if (NULL == pLevel->flowCtrl) {
+    SCH_JOB_ELOG("taosHashInit %d flowCtrl failed", pLevel->taskNum);
+    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+  }
+
+  SCH_SET_JOB_NEED_FLOW_CTRL(pJob);
+
+  SCH_JOB_DLOG("job NEED flow ctrl, totalTableNum:%d", sum);
+
+  return TSDB_CODE_SUCCESS;
+}
+
+int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) {
+  SSchLevel *pLevel = pTask->level;
+  SSchFlowControl *ctrl = NULL;
+  int32_t code = 0;
+  SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
+
+  ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp));
+  if (NULL == ctrl) {
+    SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port);
+    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
+  }
+
+  SCH_LOCK(SCH_WRITE, &ctrl->lock);
+  if (ctrl->execTaskNum <= 0) {
+    SCH_TASK_ELOG("invalid execTaskNum:%d in flowCtrl, fqdn:%s, port:%d", ctrl->execTaskNum, ep->fqdn, ep->port);
+    SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
+  }
+
+  --ctrl->execTaskNum;
+  ctrl->tableNumSum -= pTask->plan->execNodeStat.tableNum;
+
+  SCH_TASK_DLOG("task quota removed, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
+                ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
+
+_return:
+
+  SCH_UNLOCK(SCH_WRITE, &ctrl->lock);
+
+  SCH_RET(code);
+}
+
+int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
+  SSchLevel *pLevel = pTask->level;
+  int32_t code = 0;
+  SSchFlowControl *ctrl = NULL;
+  SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
+
+  do {
+    ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp));
+    if (NULL == ctrl) {
+      SSchFlowControl nctrl = {.tableNumSum = pTask->plan->execNodeStat.tableNum, .execTaskNum = 1};
+
+      code = taosHashPut(pLevel->flowCtrl, ep, sizeof(SEp), &nctrl, sizeof(nctrl));
+      if (code) {
+        if (HASH_NODE_EXIST(code)) {
+          continue;
+        }
+
+        SCH_TASK_ELOG("taosHashPut flowCtrl failed, size:%d", (int32_t)sizeof(nctrl));
+        SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+      }
+
+      SCH_TASK_DLOG("task quota added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
+                    ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, nctrl.tableNumSum, nctrl.execTaskNum);
+
+      *enough = true;
+      return TSDB_CODE_SUCCESS;
+    }
+
+    SCH_LOCK(SCH_WRITE, &ctrl->lock);
+
+    if (0 == ctrl->execTaskNum) {
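+      // no task is currently running on this node, so take over the quota with this task's table number and launch it even if it exceeds the limit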
+      ctrl->tableNumSum = pTask->plan->execNodeStat.tableNum;
+      ++ctrl->execTaskNum;
+
+      *enough = true;
+      break;
+    }
+
+    int32_t sum = pTask->plan->execNodeStat.tableNum + ctrl->tableNumSum;
+
+    if (sum <= schMgmt.cfg.maxNodeTableNum) {
+      ctrl->tableNumSum = sum;
+      ++ctrl->execTaskNum;
+
+      *enough = true;
+      break;
+    }
+
+    if (NULL == ctrl->taskList) {
+      ctrl->taskList = taosArrayInit(pLevel->taskNum, POINTER_BYTES);
+      if (NULL == ctrl->taskList) {
+        SCH_TASK_ELOG("taosArrayInit taskList failed, size:%d", (int32_t)pLevel->taskNum);
+        SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+      }
+    }
+
+    if (NULL == taosArrayPush(ctrl->taskList, &pTask)) {
+      SCH_TASK_ELOG("taosArrayPush to taskList failed, size:%d", (int32_t)taosArrayGetSize(ctrl->taskList));
+      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+    }
+
+    *enough = false;
+    ctrl->sorted = false;
+
+    break;
+  } while (true);
+
+_return:
+
+  SCH_TASK_DLOG("task quota %s added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
+                ((*enough)?"":"NOT"), ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
+
+  SCH_UNLOCK(SCH_WRITE, &ctrl->lock);
+
+  SCH_RET(code);
+}
+
+int32_t schTaskTableNumCompare(const void* key1, const void* key2) {
+  SSchTask *pTask1 = *(SSchTask **)key1;
+  SSchTask *pTask2 = *(SSchTask **)key2;
+
+  if (pTask1->plan->execNodeStat.tableNum < pTask2->plan->execNodeStat.tableNum) {
+    return 1;
+  } else if (pTask1->plan->execNodeStat.tableNum > pTask2->plan->execNodeStat.tableNum) {
+    return -1;
+  } else {
+    return 0;
+  }
+}
+
+
+int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
+  SCH_LOCK(SCH_WRITE, &ctrl->lock);
+
+  if (NULL == ctrl->taskList || taosArrayGetSize(ctrl->taskList) <= 0) {
+    SCH_UNLOCK(SCH_WRITE, &ctrl->lock);
+    return TSDB_CODE_SUCCESS;
+  }
+
+  int32_t remainNum = schMgmt.cfg.maxNodeTableNum - ctrl->tableNumSum;
+  int32_t taskNum = taosArrayGetSize(ctrl->taskList);
+  int32_t code = 0;
+  SSchTask *pTask = NULL;
+
+  if (taskNum > 1 && !ctrl->sorted) {
+    taosArraySort(ctrl->taskList, schTaskTableNumCompare);  // desc order
+  }
+
+  for (int32_t i = 0; i < taskNum; ++i) {
+    pTask = *(SSchTask **)taosArrayGet(ctrl->taskList, i);
+    SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
+
+    if (pTask->plan->execNodeStat.tableNum > remainNum && ctrl->execTaskNum > 0) {
+      SCH_TASK_DLOG("task NOT to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
+                    ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
+
+      continue;
+    }
+
+    ctrl->tableNumSum += pTask->plan->execNodeStat.tableNum;
+    ++ctrl->execTaskNum;
+
+    taosArrayRemove(ctrl->taskList, i);
+
+    SCH_TASK_DLOG("task to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
+                  ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
+
+    SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask));
+
+    remainNum -= pTask->plan->execNodeStat.tableNum;
+    if (remainNum <= 0) {
+      SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d",
+                    ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum);
+
+      break;
+    }
+
+    if (i < (taskNum - 1)) {
+      SSchTask *pLastTask = *(SSchTask **)taosArrayGetLast(ctrl->taskList);
+      if (remainNum < pLastTask->plan->execNodeStat.tableNum) {
+        SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d, smallestInList:%d",
+                      ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum,
pLastTask->plan->execNodeStat.tableNum); + + break; + } + } + + --i; + --taskNum; + } + +_return: + + SCH_UNLOCK(SCH_WRITE, &ctrl->lock); + + if (code) { + code = schProcessOnTaskFailure(pJob, pTask, code); + } + + SCH_RET(code); +} + + +int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) { + if (!SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { + return TSDB_CODE_SUCCESS; + } + + SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask)); + + SSchLevel *pLevel = pTask->level; + SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode); + + SSchFlowControl *ctrl = (SSchFlowControl *)taosHashGet(pLevel->flowCtrl, ep, sizeof(SEp)); + if (NULL == ctrl) { + SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + SCH_ERR_RET(schLaunchTasksInFlowCtrlListImpl(pJob, ctrl)); + +} + + diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 2b5ad4f26b..fe886dfcdb 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -91,6 +91,18 @@ void schFreeTask(SSchTask* pTask) { } +static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) { + int8_t status = SCH_GET_JOB_STATUS(pJob); + if (pStatus) { + *pStatus = status; + } + + return (status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED + || status == JOB_TASK_STATUS_CANCELLING || status == JOB_TASK_STATUS_DROPPING + || status == JOB_TASK_STATUS_SUCCEED); +} + + int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { int32_t lastMsgType = atomic_load_32(&pTask->lastMsgType); @@ -102,22 +114,24 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m case TDMT_VND_FETCH_RSP: case TDMT_VND_DROP_TASK: if (lastMsgType != (msgType - 1)) { - SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%d, rspType:%d", lastMsgType, msgType); + SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), TMSG_INFO(msgType)); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING && SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_PARTIAL_SUCCEED) { - SCH_TASK_ELOG("rsp msg conflicted with task status, status:%d, rspType:%d", SCH_GET_TASK_STATUS(pTask), msgType); + SCH_TASK_ELOG("rsp msg conflicted with task status, status:%d, rspType:%s", SCH_GET_TASK_STATUS(pTask), TMSG_INFO(msgType)); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } break; default: - SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%d", msgType, SCH_GET_TASK_STATUS(pTask)); + SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%d", TMSG_INFO(msgType), SCH_GET_TASK_STATUS(pTask)); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } + atomic_store_32(&pTask->lastMsgType, -1); + return TSDB_CODE_SUCCESS; } @@ -197,6 +211,7 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { _return: SCH_JOB_ELOG("invalid job status update, from %d to %d", oriStatus, newStatus); + SCH_ERR_RET(code); } @@ -275,7 +290,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { } SSchLevel *pLevel = taosArrayGet(pJob->levels, 0); - if (pJob->attr.queryJob && pLevel->taskNum > 1) { + if (SCH_IS_QUERY_JOB(pJob) && pLevel->taskNum > 1) { SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } @@ -285,10 +300,9 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { int32_t schRecordTaskSucceedNode(SSchJob 
*pJob, SSchTask *pTask) { - int32_t idx = atomic_load_8(&pTask->candidateIdx); - SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, idx); + SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); if (NULL == addr) { - SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", idx, (int32_t)taosArrayGetSize(pTask->candidateAddrs)); + SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs)); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } @@ -323,9 +337,9 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - SHashObj *planToTask = taosHashInit(SCHEDULE_DEFAULT_TASK_NUMBER, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + SHashObj *planToTask = taosHashInit(SCHEDULE_DEFAULT_MAX_TASK_NUM, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); if (NULL == planToTask) { - SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER); + SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_MAX_TASK_NUM); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -379,7 +393,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { for (int32_t n = 0; n < taskNum; ++n) { SSubplan *plan = (SSubplan*)nodesListGetNode(plans->pNodeList, n); - SCH_SET_JOB_TYPE(&pJob->attr, plan->subplanType); + SCH_SET_JOB_TYPE(pJob, plan->subplanType); SSchTask task = {0}; SSchTask *pTask = &task; @@ -488,6 +502,8 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) { if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) { SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%d", SCH_GET_TASK_STATUS(pTask)); + } else { + SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks)); } int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES); @@ -564,15 +580,46 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) { } -int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) { +int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) { // TODO set retry or not based on task type/errCode/retry times/job status/available eps... 
- // TODO if needRetry, set task retry info - // TODO set condidateIdx - // TODO record failed but tried task *needRetry = false; return TSDB_CODE_SUCCESS; + + //TODO CHECK epList/condidateList + if (SCH_IS_DATA_SRC_TASK(pTask)) { + + } else { + int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs); + + if ((pTask->candidateIdx + 1) >= candidateNum) { + return TSDB_CODE_SUCCESS; + } + + ++pTask->candidateIdx; + } + + +} + +int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { + atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1); + + if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { + SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask)); + SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask)); + } + + if (SCH_IS_DATA_SRC_TASK(pTask)) { + SCH_SWITCH_EPSET(&pTask->plan->execNode); + } else { + ++pTask->candidateIdx; + } + + SCH_ERR_RET(schLaunchTask(pJob, pTask)); + + return TSDB_CODE_SUCCESS; } int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) { @@ -588,14 +635,14 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod } int32_t code = atomic_load_32(&pJob->errCode); - SCH_ERR_RET(code); - SCH_JOB_ELOG("job errCode is invalid, errCode:%d", code); + SCH_JOB_DLOG("job failed with error: %s", tstrerror(code)); + + SCH_RET(code); } - -// Note: no more error processing, handled in function internal +// Note: no more task error processing, handled in function internal int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_FAILED, errCode)); } @@ -606,38 +653,8 @@ int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) { } -// Note: no more error processing, handled in function internal -int32_t schFetchFromRemote(SSchJob *pJob) { - int32_t code = 0; - - if (atomic_val_compare_exchange_32(&pJob->remoteFetch, 0, 1) != 0) { - SCH_JOB_ELOG("prior fetching not finished, remoteFetch:%d", atomic_load_32(&pJob->remoteFetch)); - return TSDB_CODE_SUCCESS; - } - void *res = atomic_load_ptr(&pJob->res); - if (res) { - atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0); - - SCH_JOB_DLOG("res already fetched, res:%p", res); - return TSDB_CODE_SUCCESS; - } - - SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_VND_FETCH)); - - return TSDB_CODE_SUCCESS; - -_return: - - atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0); - - schProcessOnJobFailure(pJob, code); - - return code; -} - - -// Note: no more error processing, handled in function internal +// Note: no more task error processing, handled in function internal int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) { int32_t code = 0; @@ -655,9 +672,7 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) { _return: - SCH_ERR_RET(schProcessOnJobFailure(pJob, code)); - - SCH_RET(code); + SCH_RET(schProcessOnJobFailure(pJob, code)); } int32_t schProcessOnDataFetched(SSchJob *job) { @@ -665,8 +680,16 @@ int32_t schProcessOnDataFetched(SSchJob *job) { tsem_post(&job->rspSem); } -// Note: no more error processing, handled in function internal +// Note: no more task error processing, handled in function internal int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) { + int8_t status = 0; + + if (schJobNeedToStop(pJob, &status)) { + SCH_TASK_DLOG("task failed not processed cause of job status, job status:%d", status); + + SCH_RET(atomic_load_32(&pJob->errCode)); + } + bool needRetry = false; bool moved = false; int32_t taskDone = 0; @@ -674,16 +697,16 @@ int32_t 
schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode)); - SCH_ERR_JRET(schTaskCheckAndSetRetry(pJob, pTask, errCode, &needRetry)); + SCH_ERR_JRET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry)); if (!needRetry) { SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode)); if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) { - code = schMoveTaskToFailList(pJob, pTask, &moved); - if (code && moved) { - SCH_ERR_RET(errCode); - } + SCH_ERR_JRET(schMoveTaskToFailList(pJob, pTask, &moved)); + } else { + SCH_TASK_DLOG("task already done, no more failure process, status:%d", SCH_GET_TASK_STATUS(pTask)); + return TSDB_CODE_SUCCESS; } SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED); @@ -702,35 +725,31 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) } } } else { - // Note: no more error processing, already handled - SCH_ERR_RET(schLaunchTask(pJob, pTask)); + SCH_ERR_JRET(schHandleTaskRetry(pJob, pTask)); return TSDB_CODE_SUCCESS; } _return: - SCH_ERR_RET(schProcessOnJobFailure(pJob, errCode)); - - SCH_ERR_RET(errCode); + SCH_RET(schProcessOnJobFailure(pJob, errCode)); } - -// Note: no more error processing, handled in function internal +// Note: no more task error processing, handled in function internal int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { bool moved = false; int32_t code = 0; - SSchTask *pErrTask = pTask; - code = schMoveTaskToSuccList(pJob, pTask, &moved); - if (code && moved) { - SCH_ERR_RET(code); - } + SCH_TASK_DLOG("taskOnSuccess, status:%d", SCH_GET_TASK_STATUS(pTask)); + + SCH_ERR_JRET(schMoveTaskToSuccList(pJob, pTask, &moved)); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED); SCH_ERR_JRET(schRecordTaskSucceedNode(pJob, pTask)); - + + SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask)); + int32_t parentNum = pTask->parents ? 
(int32_t)taosArrayGetSize(pTask->parents) : 0; if (parentNum == 0) { int32_t taskDone = 0; @@ -759,14 +778,9 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { pJob->fetchTask = pTask; - code = schMoveTaskToExecList(pJob, pTask, &moved); - if (code && moved) { - SCH_ERR_RET(code); - } + SCH_ERR_JRET(schMoveTaskToExecList(pJob, pTask, &moved)); - SCH_ERR_RET(schProcessOnJobPartialSuccess(pJob)); - - return TSDB_CODE_SUCCESS; + SCH_RET(schProcessOnJobPartialSuccess(pJob)); } /* @@ -780,8 +794,6 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { for (int32_t i = 0; i < parentNum; ++i) { SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, i); - pErrTask = par; - int32_t readyNum = atomic_add_fetch_32(&par->childReady, 1); SCH_LOCK(SCH_WRITE, &par->lock); @@ -790,7 +802,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { SCH_UNLOCK(SCH_WRITE, &par->lock); if (SCH_TASK_READY_TO_LUNCH(readyNum, par)) { - SCH_ERR_RET(schLaunchTask(pJob, par)); + SCH_ERR_RET(schLaunchTaskImpl(pJob, par)); } } @@ -798,22 +810,55 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { _return: - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pErrTask, code)); - - SCH_ERR_RET(code); + SCH_RET(schProcessOnJobFailure(pJob, code)); } + +// Note: no more error processing, handled in function internal +int32_t schFetchFromRemote(SSchJob *pJob) { + int32_t code = 0; + + if (atomic_val_compare_exchange_32(&pJob->remoteFetch, 0, 1) != 0) { + SCH_JOB_ELOG("prior fetching not finished, remoteFetch:%d", atomic_load_32(&pJob->remoteFetch)); + return TSDB_CODE_SUCCESS; + } + + void *res = atomic_load_ptr(&pJob->res); + if (res) { + atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0); + + SCH_JOB_DLOG("res already fetched, res:%p", res); + return TSDB_CODE_SUCCESS; + } + + SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_VND_FETCH)); + + return TSDB_CODE_SUCCESS; + +_return: + + atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0); + + SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code)); +} + + +// Note: no more task error processing, handled in function internal int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { int32_t code = 0; + int8_t status = 0; + + if (schJobNeedToStop(pJob, &status)) { + SCH_TASK_ELOG("rsp not processed cause of job status, job status:%d", status); + + SCH_RET(atomic_load_32(&pJob->errCode)); + } SCH_ERR_JRET(schValidateTaskReceivedMsgType(pJob, pTask, msgType)); switch (msgType) { case TDMT_VND_CREATE_TABLE_RSP: { - if (rspCode != TSDB_CODE_SUCCESS) { - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); - } - + SCH_ERR_JRET(rspCode); SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); break; @@ -828,9 +873,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch pJob->resNumOfRows += rsp->affectedRows; #else - if (rspCode != TSDB_CODE_SUCCESS) { - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); - } + SCH_ERR_JRET(rspCode); SSubmitRsp *rsp = (SSubmitRsp *)msg; if (rsp) { @@ -845,9 +888,11 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch case TDMT_VND_QUERY_RSP: { SQueryTableRsp *rsp = (SQueryTableRsp *)msg; - if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) { - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); + SCH_ERR_JRET(rspCode); + if (NULL == msg) { + 
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } + SCH_ERR_JRET(rsp->code); SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY)); @@ -856,9 +901,11 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch case TDMT_VND_RES_READY_RSP: { SResReadyRsp *rsp = (SResReadyRsp *)msg; - if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) { - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); + SCH_ERR_JRET(rspCode); + if (NULL == msg) { + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } + SCH_ERR_JRET(rsp->code); SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); @@ -867,14 +914,15 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch case TDMT_VND_FETCH_RSP: { SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; - if (rspCode != TSDB_CODE_SUCCESS || NULL == msg) { - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); + SCH_ERR_JRET(rspCode); + if (NULL == msg) { + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } - + if (pJob->res) { SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->res); tfree(rsp); - SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); } atomic_store_ptr(&pJob->res, rsp); @@ -886,7 +934,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed); - SCH_ERR_JRET(schProcessOnDataFetched(pJob)); + schProcessOnDataFetched(pJob); break; } case TDMT_VND_DROP_TASK_RSP: { @@ -904,9 +952,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch _return: - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, code)); - - SCH_RET(code); + SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } @@ -1057,7 +1103,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t code = 0; bool isCandidateAddr = false; if (NULL == addr) { - addr = taosArrayGet(pTask->candidateAddrs, atomic_load_8(&pTask->candidateIdx)); + addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); isCandidateAddr = true; } @@ -1177,28 +1223,17 @@ _return: SCH_RET(code); } -static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) { - int8_t status = SCH_GET_JOB_STATUS(pJob); - if (pStatus) { - *pStatus = status; - } - return (status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED - || status == JOB_TASK_STATUS_CANCELLING || status == JOB_TASK_STATUS_DROPPING); -} - - -// Note: no more error processing, handled in function internal -int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { +int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { int8_t status = 0; int32_t code = 0; + + atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1); if (schJobNeedToStop(pJob, &status)) { - SCH_TASK_ELOG("no need to launch task cause of job status, job status:%d", status); + SCH_TASK_DLOG("no need to launch task cause of job status, job status:%d", status); - code = atomic_load_32(&pJob->errCode); - SCH_ERR_RET(code); - SCH_RET(TSDB_CODE_SCH_STATUS_ERROR); + SCH_RET(atomic_load_32(&pJob->errCode)); } SSubplan *plan = pTask->plan; @@ -1207,38 +1242,69 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen); if (TSDB_CODE_SUCCESS != code || NULL == pTask->msg || pTask->msgLen <= 0) { SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg, pTask->msgLen); - 
SCH_ERR_JRET(code); + SCH_ERR_RET(code); } else { SCH_TASK_DLOG("physical plan len:%d, %s", pTask->msgLen, pTask->msg); } } - SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask)); + SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask)); // NOTE: race condition: the task should be put into the hash table before send msg to server if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) { - SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask)); + SCH_ERR_RET(schPushTaskToExecList(pJob, pTask)); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING); } - SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType)); + + SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType)); + + return TSDB_CODE_SUCCESS; +} + +// Note: no more error processing, handled in function internal +int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { + bool enough = false; + int32_t code = 0; + + if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { + SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &enough)); + + if (enough) { + SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask)); + } + } else { + SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask)); + } + return TSDB_CODE_SUCCESS; _return: - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, code)); - SCH_RET(code); + + SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } +int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) { + for (int32_t i = 0; i < level->taskNum; ++i) { + SSchTask *pTask = taosArrayGet(level->subTasks, i); + + SCH_ERR_RET(schLaunchTask(pJob, pTask)); + } + + return TSDB_CODE_SUCCESS; +} + + + int32_t schLaunchJob(SSchJob *pJob) { SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx); - + SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_EXECUTING)); - - for (int32_t i = 0; i < level->taskNum; ++i) { - SSchTask *pTask = taosArrayGet(level->subTasks, i); - SCH_ERR_RET(schLaunchTask(pJob, pTask)); - } - + + SCH_ERR_RET(schCheckJobNeedFlowCtrl(pJob, level)); + + SCH_ERR_RET(schLaunchLevelTasks(pJob, level)); + return TSDB_CODE_SUCCESS; } @@ -1312,6 +1378,8 @@ void schFreeJobImpl(void *job) { for(int32_t i = 0; i < numOfLevels; ++i) { SSchLevel *pLevel = taosArrayGet(pJob->levels, i); + schFreeFlowCtrl(pLevel); + int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks); for(int32_t j = 0; j < numOfTasks; ++j) { SSchTask* pTask = taosArrayGet(pLevel->subTasks, j); @@ -1423,10 +1491,11 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { schMgmt.cfg = *cfg; if (schMgmt.cfg.maxJobNum == 0) { - schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER; + schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM; } } else { - schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER; + schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM; + schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM; } schMgmt.jobRef = taosOpenRef(schMgmt.cfg.maxJobNum, schFreeJobImpl); @@ -1611,7 +1680,7 @@ int32_t schedulerFetchRows(int64_t job, void** pData) { SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } - if (!SCH_JOB_NEED_FETCH(&pJob->attr)) { + if (!SCH_JOB_NEED_FETCH(pJob)) { SCH_JOB_ELOG("no need to fetch data, status:%d", SCH_GET_JOB_STATUS(pJob)); taosReleaseRef(schMgmt.jobRef, job); SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); @@ -1627,7 +1696,7 @@ int32_t schedulerFetchRows(int64_t job, void** pData) { SCH_JOB_ELOG("job failed or dropping, status:%d", status); SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); } else if (status == JOB_TASK_STATUS_SUCCEED) { - SCH_JOB_ELOG("job already succeed, status:%d", status); + SCH_JOB_DLOG("job already succeed, status:%d", status); 
goto _return; } else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { SCH_ERR_JRET(schFetchFromRemote(pJob)); diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index a4fe88d545..0347318ae5 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -125,6 +125,9 @@ void schtBuildQueryDag(SQueryPlan *dag) { mergePlan->pNode = (SPhysiNode*)calloc(1, sizeof(SPhysiNode)); mergePlan->msgType = TDMT_VND_QUERY; + merge->pNodeList = nodesMakeList(); + scan->pNodeList = nodesMakeList(); + nodesListAppend(merge->pNodeList, (SNode*)mergePlan); nodesListAppend(scan->pNodeList, (SNode*)scanPlan); @@ -135,6 +138,68 @@ void schtBuildQueryDag(SQueryPlan *dag) { nodesListAppend(dag->pSubplans, (SNode*)scan); } +void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) { + uint64_t qId = schtQueryId; + int32_t scanPlanNum = 20; + + dag->queryId = qId; + dag->numOfSubplans = 2; + dag->pSubplans = nodesMakeList(); + SNodeListNode *scan = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST); + SNodeListNode *merge = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST); + + SSubplan *scanPlan = (SSubplan *)calloc(scanPlanNum, sizeof(SSubplan)); + SSubplan *mergePlan = (SSubplan *)calloc(1, sizeof(SSubplan)); + + merge->pNodeList = nodesMakeList(); + scan->pNodeList = nodesMakeList(); + + mergePlan->pChildren = nodesMakeList(); + + for (int32_t i = 0; i < scanPlanNum; ++i) { + scanPlan[i].id.queryId = qId; + scanPlan[i].id.templateId = 0x0000000000000002; + scanPlan[i].id.subplanId = 0x0000000000000003 + i; + scanPlan[i].subplanType = SUBPLAN_TYPE_SCAN; + + scanPlan[i].execNode.nodeId = 1 + i; + scanPlan[i].execNode.epset.inUse = 0; + scanPlan[i].execNodeStat.tableNum = rand() % 30; + addEpIntoEpSet(&scanPlan[i].execNode.epset, "ep0", 6030); + addEpIntoEpSet(&scanPlan[i].execNode.epset, "ep1", 6030); + addEpIntoEpSet(&scanPlan[i].execNode.epset, "ep2", 6030); + scanPlan[i].execNode.epset.inUse = rand() % 3; + + scanPlan[i].pChildren = NULL; + scanPlan[i].level = 1; + scanPlan[i].pParents = nodesMakeList(); + scanPlan[i].pNode = (SPhysiNode*)calloc(1, sizeof(SPhysiNode)); + scanPlan[i].msgType = TDMT_VND_QUERY; + + nodesListAppend(scanPlan[i].pParents, (SNode*)mergePlan); + nodesListAppend(mergePlan->pChildren, (SNode*)(scanPlan + i)); + + nodesListAppend(scan->pNodeList, (SNode*)(scanPlan + i)); + } + + mergePlan->id.queryId = qId; + mergePlan->id.templateId = schtMergeTemplateId; + mergePlan->id.subplanId = 0x5555; + mergePlan->subplanType = SUBPLAN_TYPE_MERGE; + mergePlan->level = 0; + mergePlan->execNode.epset.numOfEps = 0; + + mergePlan->pParents = NULL; + mergePlan->pNode = (SPhysiNode*)calloc(1, sizeof(SPhysiNode)); + mergePlan->msgType = TDMT_VND_QUERY; + + nodesListAppend(merge->pNodeList, (SNode*)mergePlan); + + nodesListAppend(dag->pSubplans, (SNode*)merge); + nodesListAppend(dag->pSubplans, (SNode*)scan); +} + + void schtFreeQueryDag(SQueryPlan *dag) { } @@ -182,6 +247,8 @@ void schtBuildInsertDag(SQueryPlan *dag) { insertPlan[1].pDataSink = (SDataSinkNode*)calloc(1, sizeof(SDataSinkNode)); insertPlan[1].msgType = TDMT_VND_SUBMIT; + inserta->pNodeList = nodesMakeList(); + nodesListAppend(inserta->pNodeList, (SNode*)insertPlan); insertPlan += 1; nodesListAppend(inserta->pNodeList, (SNode*)insertPlan); @@ -547,11 +614,9 @@ TEST(queryTest, normalCase) { char *tablename = "table1"; SVgroupInfo vgInfo = {0}; int64_t job = 0; - SQueryPlan dag; - memset(&dag, 0, sizeof(dag)); - schtInitLogFile(); + memset(&dag, 
0, sizeof(dag)); SArray *qnodeList = taosArrayInit(1, sizeof(SEp)); @@ -648,6 +713,101 @@ TEST(queryTest, normalCase) { schedulerDestroy(); } +TEST(queryTest, flowCtrlCase) { + void *mockPointer = (void *)0x1; + char *clusterId = "cluster1"; + char *dbname = "1.db1"; + char *tablename = "table1"; + SVgroupInfo vgInfo = {0}; + int64_t job = 0; + SQueryPlan dag; + + schtInitLogFile(); + + srand(time(NULL)); + + SArray *qnodeList = taosArrayInit(1, sizeof(SEp)); + + SEp qnodeAddr = {0}; + strcpy(qnodeAddr.fqdn, "qnode0.ep"); + qnodeAddr.port = 6031; + taosArrayPush(qnodeList, &qnodeAddr); + + int32_t code = schedulerInit(NULL); + ASSERT_EQ(code, 0); + + schtBuildQueryFlowCtrlDag(&dag); + + schtSetPlanToString(); + schtSetExecNode(); + schtSetAsyncSendMsgToServer(); + + code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job); + ASSERT_EQ(code, 0); + + + SSchJob *pJob = schAcquireJob(job); + + bool queryDone = false; + + while (!queryDone) { + void *pIter = taosHashIterate(pJob->execTasks, NULL); + if (NULL == pIter) { + break; + } + + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + + taosHashCancelIterate(pJob->execTasks, pIter); + + if (task->lastMsgType == TDMT_VND_QUERY) { + SQueryTableRsp rsp = {0}; + code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); + + ASSERT_EQ(code, 0); + } else if (task->lastMsgType == TDMT_VND_RES_READY) { + SResReadyRsp rsp = {0}; + code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); + ASSERT_EQ(code, 0); + } else { + queryDone = true; + break; + } + + pIter = NULL; + } + } + + + pthread_attr_t thattr; + pthread_attr_init(&thattr); + + pthread_t thread1; + pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, &job); + + void *data = NULL; + code = schedulerFetchRows(job, &data); + ASSERT_EQ(code, 0); + + SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data; + ASSERT_EQ(pRsp->completed, 1); + ASSERT_EQ(pRsp->numOfRows, 10); + tfree(data); + + data = NULL; + code = schedulerFetchRows(job, &data); + ASSERT_EQ(code, 0); + ASSERT_TRUE(data == NULL); + + schReleaseJob(job); + + schedulerFreeJob(job); + + schtFreeQueryDag(&dag); + + schedulerDestroy(); +} TEST(insertTest, normalCase) { @@ -659,8 +819,6 @@ TEST(insertTest, normalCase) { SQueryPlan dag; uint64_t numOfRows = 0; - schtInitLogFile(); - SArray *qnodeList = taosArrayInit(1, sizeof(SEp)); SEp qnodeAddr = {0};