diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index e39236ea76..3916898829 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -50,7 +50,6 @@ typedef struct SCatalogCfg { uint32_t maxDBCacheNum; } SCatalogCfg; - int32_t catalogInit(SCatalogCfg *cfg); /** @@ -88,28 +87,15 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB */ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta); -/** - * Get a super table's meta data. - * @param pCatalog (input, got with catalogGetHandle) - * @param pTransporter (input, rpc object) - * @param pMgmtEps (input, mnode EPs) - * @param pTableName (input, table name, NOT including db name) - * @param pTableMeta(output, table meta data, NEED to free it by calller) - * @return error code - */ -int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta); - - /** * Force renew a table's local cached meta data. * @param pCatalog (input, got with catalogGetHandle) * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) * @param pTableName (input, table name, NOT including db name) - * @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure) * @return error code */ -int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable); +int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName); /** * Force renew a table's local cached meta data and get the new one. @@ -118,10 +104,9 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, co * @param pMgmtEps (input, mnode EPs) * @param pTableName (input, table name, NOT including db name) * @param pTableMeta(output, table meta data, NEED to free it by calller) - * @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure) * @return error code */ -int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable); +int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta); /** diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index c4f2b54cf8..f426139c14 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -64,14 +64,6 @@ typedef struct SCatalogMgmt { typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); -#define CTG_IS_STABLE(isSTable) (1 == (isSTable)) -#define CTG_IS_NOT_STABLE(isSTable) (0 == (isSTable)) -#define CTG_IS_UNKNOWN_STABLE(isSTable) ((isSTable) < 0) -#define CTG_SET_STABLE(isSTable, tbType) do { (isSTable) = ((tbType) == TSDB_SUPER_TABLE) ? 1 : ((tbType) > TSDB_SUPER_TABLE ? 0 : -1); } while (0) -#define CTG_TBTYPE_MATCH(isSTable, tbType) (CTG_IS_UNKNOWN_STABLE(isSTable) || (CTG_IS_STABLE(isSTable) && (tbType) == TSDB_SUPER_TABLE) || (CTG_IS_NOT_STABLE(isSTable) && (tbType) != TSDB_SUPER_TABLE)) - -#define CTG_TABLE_NOT_EXIST(code) (code == TSDB_CODE_TDB_INVALID_TABLE_ID) - #define ctgFatal(...) do { if (ctgDebugFlag & DEBUG_FATAL) { taosPrintLog("CTG FATAL ", ctgDebugFlag, __VA_ARGS__); }} while(0) #define ctgError(...) do { if (ctgDebugFlag & DEBUG_ERROR) { taosPrintLog("CTG ERROR ", ctgDebugFlag, __VA_ARGS__); }} while(0) #define ctgWarn(...) do { if (ctgDebugFlag & DEBUG_WARN) { taosPrintLog("CTG WARN ", ctgDebugFlag, __VA_ARGS__); }} while(0) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 9eee6f17de..3b26f8b9ef 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -105,8 +105,6 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN } *exist = 1; - - tbMeta = *pTableMeta; if (tbMeta->tableType != TSDB_CHILD_TABLE) { return TSDB_CODE_SUCCESS; @@ -145,29 +143,6 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN return TSDB_CODE_SUCCESS; } -int32_t ctgGetTableTypeFromCache(struct SCatalog* pCatalog, const SName* pTableName, int32_t *tbType) { - if (NULL == pCatalog->tableCache.cache) { - return TSDB_CODE_SUCCESS; - } - - char tbFullName[TSDB_TABLE_FNAME_LEN]; - tNameExtractFullName(pTableName, tbFullName); - - size_t sz = 0; - STableMeta *pTableMeta = NULL; - - taosHashGetCloneExt(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName), NULL, (void **)&pTableMeta, &sz); - - if (NULL == pTableMeta) { - return TSDB_CODE_SUCCESS; - } - - *tbType = pTableMeta->tableType; - - return TSDB_CODE_SUCCESS; -} - - void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) { epSet->inUse = 0; epSet->numOfEps = vgroupInfo->numOfEps; @@ -178,7 +153,14 @@ void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) { } } -int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, char* tbFullName, STableMetaOutput* output) { +int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) { + if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == output) { + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); + } + + char tbFullName[TSDB_TABLE_FNAME_LEN]; + tNameExtractFullName(pTableName, tbFullName); + SBuildTableMetaInput bInput = {.vgId = 0, .dbName = NULL, .tableFullName = tbFullName}; char *msg = NULL; SEpSet *pVnodeEpSet = NULL; @@ -197,12 +179,6 @@ int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pRpc, cons rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); if (TSDB_CODE_SUCCESS != rpcRsp.code) { - if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) { - output->metaNum = 0; - ctgDebug("tbmeta:%s not exist in mnode", tbFullName); - return TSDB_CODE_SUCCESS; - } - ctgError("error rsp for table meta, code:%x", rpcRsp.code); CTG_ERR_RET(rpcRsp.code); } @@ -212,13 +188,6 @@ int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pRpc, cons return TSDB_CODE_SUCCESS; } -int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) { - char tbFullName[TSDB_TABLE_FNAME_LEN]; - tNameExtractFullName(pTableName, tbFullName); - - return ctgGetTableMetaFromMnodeImpl(pCatalog, pRpc, pMgmtEps, tbFullName, output); -} - int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) { @@ -228,7 +197,7 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SE char dbFullName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(pTableName, dbFullName); - SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbName = dbFullName, .tableFullName = (char *)pTableName->tname}; + SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbName = dbFullName, .tableFullName = pTableName->tname}; char *msg = NULL; SEpSet *pVnodeEpSet = NULL; int32_t msgLen = 0; @@ -248,12 +217,6 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SE rpcSendRecv(pRpc, &epSet, &rpcMsg, &rpcRsp); if (TSDB_CODE_SUCCESS != rpcRsp.code) { - if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) { - output->metaNum = 0; - ctgDebug("tbmeta:%s not exist in vnode", pTableName->tname); - return TSDB_CODE_SUCCESS; - } - ctgError("error rsp for table meta, code:%x", rpcRsp.code); CTG_ERR_RET(rpcRsp.code); } @@ -348,6 +311,14 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName if (NULL == vgInfo) { ctgError("no hash range found for hashvalue[%u]", hashValue); + + void *pIter1 = taosHashIterate(dbInfo->vgInfo, NULL); + while (pIter1) { + vgInfo = pIter1; + ctgError("valid range:[%d, %d], vgId:%d", vgInfo->hashBegin, vgInfo->hashEnd, vgInfo->vgId); + pIter1 = taosHashIterate(dbInfo->vgInfo, pIter1); + } + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } @@ -358,28 +329,22 @@ _return: CTG_RET(TSDB_CODE_SUCCESS); } -int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta, int32_t isSTable) { +int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } int32_t exist = 0; - if (!forceUpdate) { + if (!forceUpdate) { CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist)); - if (exist && CTG_TBTYPE_MATCH(isSTable, (*pTableMeta)->tableType)) { + if (exist) { return TSDB_CODE_SUCCESS; } - } else if (CTG_IS_UNKNOWN_STABLE(isSTable)) { - int32_t tbType = 0; - - CTG_ERR_RET(ctgGetTableTypeFromCache(pCatalog, pTableName, &tbType)); - - CTG_SET_STABLE(isSTable, tbType); } - CTG_ERR_RET(ctgRenewTableMetaImpl(pCatalog, pRpc, pMgmtEps, pTableName, isSTable)); + CTG_ERR_RET(catalogRenewTableMeta(pCatalog, pRpc, pMgmtEps, pTableName)); CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist)); @@ -406,27 +371,19 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out } if (NULL == pCatalog->tableCache.cache) { - SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - if (NULL == cache) { + pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (NULL == pCatalog->tableCache.cache) { ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } - - if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->tableCache.cache, NULL, cache)) { - taosHashCleanup(cache); - } } if (NULL == pCatalog->tableCache.stableCache) { - SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); - if (NULL == cache) { + pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); + if (NULL == pCatalog->tableCache.stableCache) { ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } - - if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->tableCache.stableCache, NULL, cache)) { - taosHashCleanup(cache); - } } if (output->metaNum == 2) { @@ -531,50 +488,6 @@ int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SD return TSDB_CODE_SUCCESS; } -int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) { - if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) { - CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); - } - - SVgroupInfo vgroupInfo = {0}; - int32_t code = 0; - - CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo)); - - STableMetaOutput voutput = {0}; - STableMetaOutput moutput = {0}; - STableMetaOutput *output = &voutput; - - if (CTG_IS_STABLE(isSTable)) { - CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCatalog, pTransporter, pMgmtEps, pTableName, &moutput)); - - if (0 == moutput.metaNum) { - CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput)); - } else { - output = &moutput; - } - } else { - CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput)); - - if (voutput.metaNum > 0 && TSDB_SUPER_TABLE == voutput.tbMeta->tableType) { - CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput)); - - tfree(voutput.tbMeta); - voutput.tbMeta = moutput.tbMeta; - moutput.tbMeta = NULL; - } - } - - CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, output)); - -_return: - - tfree(voutput.tbMeta); - tfree(moutput.tbMeta); - - CTG_RET(code); -} - int32_t catalogInit(SCatalogCfg *cfg) { if (ctgMgmt.pCluster) { @@ -737,15 +650,11 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB } if (NULL == pCatalog->dbCache.cache) { - SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - if (NULL == cache) { + pCatalog->dbCache.cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (NULL == pCatalog->dbCache.cache) { ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_CACHE_DB_NUMBER); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } - - if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->dbCache.cache, NULL, cache)) { - taosHashCleanup(cache); - } } else { CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo)); } @@ -770,23 +679,34 @@ _return: } int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { - return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1); + return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta); } -int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { - return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, 1); -} - -int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) { +int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName) { if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } - return ctgRenewTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, isSTable); + SVgroupInfo vgroupInfo = {0}; + int32_t code = 0; + + CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo)); + + STableMetaOutput output = {0}; + + CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &output)); + + //CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pTableName, &output)); + + CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output)); + +_return: + tfree(output.tbMeta); + CTG_RET(code); } -int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) { - return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, true, pTableMeta, isSTable); +int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { + return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, true, pTableMeta); } int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList) { @@ -861,7 +781,6 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, CTG_ERR_JRET(ctgGetVgInfoFromHashValue(dbInfo, pTableName, pVgroup)); _return: - if (dbInfo) { CTG_UNLOCK(CTG_READ, &dbInfo->lock); taosHashRelease(pCatalog->dbCache.cache, dbInfo); diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index 4fc53e5f18..1d8a48dfcb 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -45,7 +45,7 @@ void ctgTestSetPrepareSTableMeta(); bool ctgTestStop = false; bool ctgTestEnableSleep = false; -bool ctgTestDeadLoop = false; +bool ctgTestDeadLoop = true; int32_t ctgTestCurrentVgVersion = 0; int32_t ctgTestVgVersion = 1; @@ -600,6 +600,7 @@ void *ctgTestSetCtableMetaThread(void *param) { } +#if 0 TEST(tableMeta, normalTable) { struct SCatalog* pCtg = NULL; @@ -767,7 +768,7 @@ TEST(tableMeta, superTableCase) { ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); tableMeta = NULL; - code = catalogRenewAndGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta, 0); + code = catalogRenewAndGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta); ASSERT_EQ(code, 0); ASSERT_EQ(tableMeta->vgId, 9); ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE); @@ -998,6 +999,8 @@ TEST(multiThread, getSetDbVgroupCase) { catalogDestroy(); } +#endif + TEST(multiThread, ctableMeta) { struct SCatalog* pCtg = NULL; diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 6e1256d857..378af4c1d1 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -115,7 +115,7 @@ typedef struct TFileCacheKey { int32_t nColName; } ICacheKey; -int indexFlushCacheTFile(SIndex* sIdx, void*); +int indexFlushCacheToTFile(SIndex* sIdx, void*); int32_t indexSerialCacheKey(ICacheKey* key, char* buf); diff --git a/source/libs/index/inc/index_cache.h b/source/libs/index/inc/index_cache.h index 544849ff2f..9db913debf 100644 --- a/source/libs/index/inc/index_cache.h +++ b/source/libs/index/inc/index_cache.h @@ -21,9 +21,8 @@ #include "tskiplist.h" // ----------------- key structure in skiplist --------------------- -/* A data row, the format is like below: - * content: |<--totalLen-->|<-- value len--->|<-- value -->|<--uid -->|<--version--->|<-- itermType -->| - * len : |<--int32_t -->|<--- int32_t --->|<--valuelen->|<--uint64_t->|<-- int32_t-->|<-- int8_t --->| +/* A data row, the format is like below + * content: |<---colVal---->|<-- version--->|<-- uid--->|<-- colType --->|<--operaType--->| */ #ifdef __cplusplus diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 0bf30d6efa..d0b8fa4290 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -384,7 +384,6 @@ static void indexMergeSameKey(SArray* result, TFileValue* tv) { } } else { taosArrayPush(result, &tv); - // indexError("merge colVal: %s", tv->colVal); } } static void indexDestroyTempResult(SArray* result) { @@ -395,7 +394,7 @@ static void indexDestroyTempResult(SArray* result) { } taosArrayDestroy(result); } -int indexFlushCacheTFile(SIndex* sIdx, void* cache) { +int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { if (sIdx == NULL) { return -1; } indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid); diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index d43c9f9cce..8bc3776ed9 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -27,9 +27,9 @@ static void indexMemRef(MemTable* tbl); static void indexMemUnRef(MemTable* tbl); -static void cacheTermDestroy(CacheTerm* ct); -static char* getIndexKey(const void* pData); -static int32_t compareKey(const void* l, const void* r); +static void indexCacheTermDestroy(CacheTerm* ct); +static int32_t indexCacheTermCompare(const void* l, const void* r); +static char* indexCacheTermGet(const void* pData); static MemTable* indexInternalCacheCreate(int8_t type); @@ -45,9 +45,7 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in return NULL; }; cache->mem = indexInternalCacheCreate(type); - - cache->colName = calloc(1, strlen(colName) + 1); - memcpy(cache->colName, colName, strlen(colName)); + cache->colName = tstrdup(colName); cache->type = type; cache->index = idx; cache->version = 0; @@ -187,8 +185,8 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) { break; } else if (cache->imm != NULL) { // TODO: wake up by condition variable - // pthread_mutex_unlock(&cache->mtx); pthread_cond_wait(&cache->finished, &cache->mtx); + // pthread_mutex_unlock(&cache->mtx); // taosMsleep(50); // pthread_mutex_lock(&cache->mtx); } else { @@ -243,7 +241,7 @@ int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t u static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SArray* result, STermValueType* s) { if (mem == NULL) { return 0; } - char* key = getIndexKey(ct); + char* key = indexCacheTermGet(ct); SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); while (tSkipListIterNext(iter)) { @@ -321,17 +319,16 @@ void indexMemUnRef(MemTable* tbl) { } } -static void cacheTermDestroy(CacheTerm* ct) { +static void indexCacheTermDestroy(CacheTerm* ct) { if (ct == NULL) { return; } free(ct->colVal); free(ct); } -static char* getIndexKey(const void* pData) { +static char* indexCacheTermGet(const void* pData) { CacheTerm* p = (CacheTerm*)pData; return (char*)p; } - -static int32_t compareKey(const void* l, const void* r) { +static int32_t indexCacheTermCompare(const void* l, const void* r) { CacheTerm* lt = (CacheTerm*)l; CacheTerm* rt = (CacheTerm*)r; @@ -345,7 +342,8 @@ static MemTable* indexInternalCacheCreate(int8_t type) { MemTable* tbl = calloc(1, sizeof(MemTable)); indexMemRef(tbl); if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { - tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey); + tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, indexCacheTermCompare, SL_ALLOW_DUP_KEY, + indexCacheTermGet); } return tbl; } @@ -353,7 +351,7 @@ static MemTable* indexInternalCacheCreate(int8_t type) { static void doMergeWork(SSchedMsg* msg) { IndexCache* pCache = msg->ahandle; SIndex* sidx = (SIndex*)pCache->index; - indexFlushCacheTFile(sidx, pCache); + indexFlushCacheToTFile(sidx, pCache); } static bool indexCacheIteratorNext(Iterate* itera) { SSkipListIterator* iter = itera->iter; @@ -375,4 +373,7 @@ static bool indexCacheIteratorNext(Iterate* itera) { return next; } -static IterateValue* indexCacheIteratorGetValue(Iterate* iter) { return &iter->val; } +static IterateValue* indexCacheIteratorGetValue(Iterate* iter) { + // opt later + return &iter->val; +} diff --git a/source/libs/index/src/index_fst_counting_writer.c b/source/libs/index/src/index_fst_counting_writer.c index 2b64d65e46..0763aae857 100644 --- a/source/libs/index/src/index_fst_counting_writer.c +++ b/source/libs/index/src/index_fst_counting_writer.c @@ -18,8 +18,6 @@ #include "tutil.h" static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) { - // if (ctx->offset + len > ctx->limit) { return -1; } - if (ctx->type == TFile) { assert(len == tfWrite(ctx->file.fd, buf, len)); } else { @@ -125,6 +123,7 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) { if (ctx->type == TMemory) { free(ctx->mem.buf); } else { + ctx->flush(ctx); tfClose(ctx->file.fd); if (ctx->file.readOnly) { #ifdef USE_MMAP diff --git a/source/libs/index/test/fstTest.cc b/source/libs/index/test/fstTest.cc index 3d978c05a5..70671a5f3e 100644 --- a/source/libs/index/test/fstTest.cc +++ b/source/libs/index/test/fstTest.cc @@ -48,7 +48,7 @@ class FstWriter { class FstReadMemory { public: - FstReadMemory(size_t size, const std::string& fileName = fileName) { + FstReadMemory(size_t size, const std::string& fileName = "/tmp/tindex.tindex") { tfInit(); _wc = writerCtxCreate(TFile, fileName.c_str(), true, 64 * 1024); _w = fstCountingWriterCreate(_wc); @@ -307,7 +307,7 @@ void validateTFile(char* arg) { tfCleanup(); } int main(int argc, char* argv[]) { - // tool to check all kind of fst test + // tool to check all kind of fst test // if (argc > 1) { validateTFile(argv[1]); } // checkFstCheckIterator(); // checkFstLongTerm(); diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index d48041fbdb..e944fb01bd 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -339,7 +339,6 @@ static int32_t doParseSerializeTagValue(SSchema* pTagSchema, int32_t numOfInputT code = parseValueToken(&endPtr, pItem, pSchema, tsPrecision, tmpTokenBuf, KvRowAppend, ¶m, pMsgBuf); if (code != TSDB_CODE_SUCCESS) { - tdDestroyKVRowBuilder(pKvRowBuilder); return buildInvalidOperationMsg(pMsgBuf, msg1); } } @@ -393,6 +392,9 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa const char* msg3 = "tag value too long"; const char* msg4 = "illegal value or data overflow"; + int32_t code = 0; + STableMeta* pSuperTableMeta = NULL; + SHashObj* pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); // super table name, create table by using dst @@ -401,29 +403,30 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa SCreatedTableInfo* pCreateTableInfo = taosArrayGet(pCreateTable->childTableInfo, j); SToken* pSTableNameToken = &pCreateTableInfo->stbName; - int32_t code = parserValidateNameToken(pSTableNameToken); + code = parserValidateNameToken(pSTableNameToken); if (code != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg1); + code = buildInvalidOperationMsg(pMsgBuf, msg1); + goto _error; } SName name = {0}; code = createSName(&name, pSTableNameToken, pCtx, pMsgBuf); if (code != TSDB_CODE_SUCCESS) { - return code; + goto _error; } SKVRowBuilder kvRowBuilder = {0}; if (tdInitKVRowBuilder(&kvRowBuilder) < 0) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto _error; } SArray* pValList = pCreateTableInfo->pTagVals; size_t numOfInputTag = taosArrayGetSize(pValList); - STableMeta* pSuperTableMeta = NULL; code = catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &name, &pSuperTableMeta); if (code != TSDB_CODE_SUCCESS) { - return code; + goto _error; } assert(pSuperTableMeta != NULL); @@ -442,8 +445,8 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa if (numOfInputTag != numOfBoundTags || schemaSize < numOfInputTag) { tdDestroyKVRowBuilder(&kvRowBuilder); - tfree(pSuperTableMeta); - return buildInvalidOperationMsg(pMsgBuf, msg2); + code = buildInvalidOperationMsg(pMsgBuf, msg2); + goto _error; } bool findColumnIndex = false; @@ -475,8 +478,8 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) { if (pItem->pVar.nLen > pSchema->bytes) { tdDestroyKVRowBuilder(&kvRowBuilder); - tfree(pSuperTableMeta); - return buildInvalidOperationMsg(pMsgBuf, msg3); + code = buildInvalidOperationMsg(pMsgBuf, msg3); + goto _error; } } else if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP) { if (pItem->pVar.nType == TSDB_DATA_TYPE_BINARY) { @@ -492,19 +495,19 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa code = taosVariantDump(&(pItem->pVar), tagVal, pSchema->type, true); // check again after the convert since it may be converted from binary to nchar. - if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) { + if (IS_VAR_DATA_TYPE(pSchema->type)) { int16_t len = varDataTLen(tagVal); if (len > pSchema->bytes) { tdDestroyKVRowBuilder(&kvRowBuilder); - tfree(pSuperTableMeta); - return buildInvalidOperationMsg(pMsgBuf, msg3); + code = buildInvalidOperationMsg(pMsgBuf, msg3); + goto _error; } } if (code != TSDB_CODE_SUCCESS) { tdDestroyKVRowBuilder(&kvRowBuilder); - tfree(pSuperTableMeta); - return buildInvalidOperationMsg(pMsgBuf, msg4); + code = buildInvalidOperationMsg(pMsgBuf, msg4); + goto _error; } tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal); @@ -522,23 +525,22 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa } else { if (schemaSize != numOfInputTag) { tdDestroyKVRowBuilder(&kvRowBuilder); - tfree(pSuperTableMeta); - return buildInvalidOperationMsg(pMsgBuf, msg2); + code = buildInvalidOperationMsg(pMsgBuf, msg2); + goto _error; } code = doParseSerializeTagValue(pTagSchema, numOfInputTag, &kvRowBuilder, pValList, tinfo.precision, pMsgBuf); if (code != TSDB_CODE_SUCCESS) { tdDestroyKVRowBuilder(&kvRowBuilder); - tfree(pSuperTableMeta); - return code; + goto _error; } } SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder); tdDestroyKVRowBuilder(&kvRowBuilder); if (row == NULL) { - tfree(pSuperTableMeta); - return TSDB_CODE_QRY_OUT_OF_MEMORY; + code = TSDB_CODE_QRY_OUT_OF_MEMORY; + goto _error; } tdSortKVRowByColIdx(row); @@ -546,22 +548,34 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa SName tableName = {0}; code = createSName(&tableName, &pCreateTableInfo->name, pCtx, pMsgBuf); if (code != TSDB_CODE_SUCCESS) { - tfree(pSuperTableMeta); - return code; + goto _error; } // Find a appropriate vgroup to accommodate this table , according to the table name SVgroupInfo info = {0}; - catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info); + code = catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } addCreateTbReqIntoVgroup(pVgroupHashmap, &tableName, row, pSuperTableMeta->uid, &info); tfree(pSuperTableMeta); } *pBufArray = doSerializeVgroupCreateTableInfo(pVgroupHashmap); + if (*pBufArray == NULL) { + code = terrno; + goto _error; + } taosHashCleanup(pVgroupHashmap); return TSDB_CODE_SUCCESS; + + _error: + taosHashCleanup(pVgroupHashmap); + tfree(pSuperTableMeta); + terrno = code; + return code; } static int32_t serializeVgroupTablesBatchImpl(SVgroupTablesBatch* pTbBatch, SArray* pBufArray) { diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 7828eb0f5d..a7ec39bfde 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -67,8 +67,8 @@ typedef struct SSchTask { int32_t msgLen; // msg length int8_t status; // task status SQueryNodeAddr execAddr; // task actual executed node address - int8_t condidateIdx; // current try condidation index - SArray *condidateAddrs; // condidate node addresses, element is SQueryNodeAddr + int8_t candidateIdx; // current try condidation index + SArray *candidateAddrs; // condidate node addresses, element is SQueryNodeAddr SQueryProfileSummary summary; // task execution summary int32_t childReady; // child task ready number SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* @@ -115,9 +115,8 @@ typedef struct SSchJob { #define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN) #define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY) -#define SCH_JOB_ELOG(param, ...) qError("QID:% "PRIx64 param, job->queryId, __VA_ARGS__) -#define SCH_TASK_ELOG(param, ...) qError("QID:%"PRIx64",TID:% "PRIx64 param, job->queryId, task->taskId, __VA_ARGS__) -#define SCH_TASK_DLOG(param, ...) qDebug("QID:%"PRIx64",TID:% "PRIx64 param, job->queryId, task->taskId, __VA_ARGS__) +#define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__) +#define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__) #define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 832084154c..38a823de2c 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -109,7 +109,7 @@ static SSchTask initTask(SSchJob* pJob, SSubplan* plan, SSchLevel *pLevel) { } static void cleanupTask(SSchTask* pTask) { - taosArrayDestroy(pTask->condidateAddrs); + taosArrayDestroy(pTask->candidateAddrs); } int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *pJob) { @@ -226,20 +226,20 @@ _return: SCH_RET(code); } -int32_t schSetTaskCondidateAddrs(SSchJob *job, SSchTask *task) { - if (task->condidateAddrs) { +int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) { + if (task->candidateAddrs) { return TSDB_CODE_SUCCESS; } - task->condidateIdx = 0; - task->condidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); - if (NULL == task->condidateAddrs) { + task->candidateIdx = 0; + task->candidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); + if (NULL == task->candidateAddrs) { qError("taosArrayInit failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } if (task->plan->execNode.numOfEps > 0) { - if (NULL == taosArrayPush(task->condidateAddrs, &task->plan->execNode)) { + if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) { qError("taosArrayPush failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -253,7 +253,7 @@ int32_t schSetTaskCondidateAddrs(SSchJob *job, SSchTask *task) { for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { SQueryNodeAddr *naddr = taosArrayGet(job->nodeList, i); - if (NULL == taosArrayPush(task->condidateAddrs, &task->plan->execNode)) { + if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) { qError("taosArrayPush failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -280,8 +280,6 @@ int32_t schPushTaskToExecList(SSchJob *job, SSchTask *task) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SCH_TASK_DLOG("push to %s list", "execTasks"); - return TSDB_CODE_SUCCESS; } @@ -296,8 +294,6 @@ int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SCH_TASK_DLOG("push to %s list", "succTasks"); - *moved = true; return TSDB_CODE_SUCCESS; @@ -313,8 +309,6 @@ int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SCH_TASK_DLOG("push to %s list", "failTasks"); - *moved = true; return TSDB_CODE_SUCCESS; @@ -389,7 +383,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved)); if (!moved) { - SCH_TASK_ELOG("task may already moved, status:%d", task->status); + SCH_TASK_ERR_LOG(" task may already moved, status:%d", task->status); return TSDB_CODE_SUCCESS; } @@ -463,11 +457,11 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) { SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry)); if (!needRetry) { - SCH_TASK_ELOG("task failed[%x], no more retry", errCode); + SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode); SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved)); if (!moved) { - SCH_TASK_ELOG("task may already moved, status:%d", task->status); + SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status); } if (SCH_TASK_NEED_WAIT_ALL(task)) { @@ -506,6 +500,7 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms goto _task_error; } } + break; } case TDMT_VND_SUBMIT_RSP: { @@ -582,25 +577,19 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in int32_t code = 0; SSchCallbackParam *pParam = (SSchCallbackParam *)param; - SSchJob **pjob = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId)); - if (NULL == pjob || NULL == (*pjob)) { + SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId)); + if (NULL == job || NULL == (*job)) { qError("taosHashGet queryId:%"PRIx64" not exist", pParam->queryId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } - SSchJob *job = *pjob; - - SSchTask **ptask = taosHashGet(job->execTasks, &pParam->taskId, sizeof(pParam->taskId)); - if (NULL == ptask || NULL == (*ptask)) { + SSchTask **task = taosHashGet((*job)->execTasks, &pParam->taskId, sizeof(pParam->taskId)); + if (NULL == task || NULL == (*task)) { qError("taosHashGet taskId:%"PRIx64" not exist", pParam->taskId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } - - SSchTask *task = *ptask; - - SCH_TASK_DLOG("Got msg:%d, rspCode:%d", msgType, rspCode); - schProcessRspMsg(job, task, msgType, pMsg->pData, pMsg->len, rspCode); + schProcessRspMsg(*job, *task, msgType, pMsg->pData, pMsg->len, rspCode); _return: tfree(param); @@ -809,7 +798,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { } SEpSet epSet; - SQueryNodeAddr *addr = taosArrayGet(task->condidateAddrs, task->condidateIdx); + SQueryNodeAddr *addr = taosArrayGet(task->candidateAddrs, task->candidateIdx); schConvertAddrToEpSet(addr, &epSet); @@ -827,10 +816,10 @@ _return: int32_t schLaunchTask(SSchJob *job, SSchTask *task) { SSubplan *plan = task->plan; SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen)); - SCH_ERR_RET(schSetTaskCondidateAddrs(job, task)); + SCH_ERR_RET(schSetTaskCandidateAddrs(job, task)); - if (NULL == task->condidateAddrs || taosArrayGetSize(task->condidateAddrs) <= 0) { - SCH_TASK_ELOG("no valid condidate node for task:%"PRIx64, task->taskId); + if (NULL == task->candidateAddrs || taosArrayGetSize(task->candidateAddrs) <= 0) { + SCH_TASK_ERR_LOG("no valid condidate node for task:%"PRIx64, task->taskId); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); }