From 1138d8abb24368f618d20dcee8ce701b91f710d6 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 8 Dec 2022 10:33:26 +0800 Subject: [PATCH 01/22] enh: add table cached meta and vg new api --- include/libs/catalog/catalog.h | 2 ++ source/libs/catalog/src/catalog.c | 7 +++++++ 2 files changed, 9 insertions(+) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 6154882756..a95db86d3f 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -211,6 +211,8 @@ int32_t catalogGetCachedSTableMeta(SCatalog* pCtg, const SName* pTableName, STab int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists); +int32_t catalogGetCachedTableVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta); + /** * Force refresh DB's local cached vgroup info. * @param pCtg (input, got with catalogGetHandle) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 1b9bf7f99c..7e1269f0dc 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1118,6 +1118,13 @@ int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableName, CTG_API_LEAVE(ctgGetTbHashVgroup(pCtg, NULL, pTableName, pVgroup, exists)); } +int32_t catalogGetCachedTableVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta) { + CTG_API_ENTER(); + + CTG_API_LEAVE(ctgGetTbHashVgroup(pCtg, NULL, pTableName, pVgroup, exists)); +} + + #if 0 int32_t catalogGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SCatalogReq* pReq, SMetaData* pRsp) { CTG_API_ENTER(); From d5e3d750d1c4ce09294d31a42da1fd0107c2eb8b Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Thu, 8 Dec 2022 11:14:06 +0800 Subject: [PATCH 02/22] enh: merge getmetafromcache and getvgroupfromcache interface --- source/libs/parser/src/parInsertSql.c | 67 +++++++++++++++++++-------- 1 file changed, 48 insertions(+), 19 deletions(-) diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 59545d8fbc..13d040cb0c 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -563,7 +563,8 @@ static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt static void buildCreateTbReq(SVnodeModifOpStmt* pStmt, STag* pTag, SArray* pTagName) { insBuildCreateTbReq(&pStmt->createTblReq, pStmt->targetTableName.tname, pTag, pStmt->pTableMeta->suid, - pStmt->usingTableName.tname, pTagName, pStmt->pTableMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL); + pStmt->usingTableName.tname, pTagName, pStmt->pTableMeta->tableInfo.numOfTags, + TSDB_DEFAULT_TABLE_TTL); } static int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMsgBuf) { @@ -829,6 +830,33 @@ static int32_t getTableVgroup(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, boo return code; } +static int32_t getTableMetaAndVgroupImpl(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, bool* pMissCache) { + SVgroupInfo vg; + bool exists = true; + int32_t code = catalogGetCachedTableHashVgroup(pCxt->pCatalog, &pStmt->targetTableName, &vg, &exists); + if (TSDB_CODE_SUCCESS == code) { + if (exists) { + code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)); + } + *pMissCache = !exists; + } + return code; +} + +static int32_t getTableMetaAndVgroup(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, bool* pMissCache) { + SParseContext* pComCxt = pCxt->pComCxt; + int32_t code = TSDB_CODE_SUCCESS; + if (pComCxt->async) { + code = getTableMetaAndVgroupImpl(pComCxt->pCatalog, pStmt, pMissCache); + } else { + code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, pMissCache); + if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { + code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache); + } + } + return code; +} + static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { if (pCxt->forceUpdate) { pCxt->missCache = true; @@ -836,11 +864,14 @@ static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt } int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache); + // if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { + // code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, &pCxt->missCache); + // } + // if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { + // code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache); + // } if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { - code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, &pCxt->missCache); - } - if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { - code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache); + code = getTableMetaAndVgroup(pCxt, pStmt, &pCxt->missCache); } return code; } @@ -933,11 +964,10 @@ static int32_t getTableDataBlocks(SInsertParseContext* pCxt, SVnodeModifOpStmt* if (pStmt->usingTableProcessing) { pStmt->pTableMeta->uid = 0; } - - return insGetDataBlockFromList(pStmt->pTableBlockHashObj, &uid, sizeof(pStmt->pTableMeta->uid), - TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), - getTableInfo(pStmt->pTableMeta).rowSize, pStmt->pTableMeta, pDataBuf, NULL, - &pStmt->createTblReq); + + return insGetDataBlockFromList( + pStmt->pTableBlockHashObj, &uid, sizeof(pStmt->pTableMeta->uid), TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), + getTableInfo(pStmt->pTableMeta).rowSize, pStmt->pTableMeta, pDataBuf, NULL, &pStmt->createTblReq); } char tbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(&pStmt->targetTableName, tbFName); @@ -1540,8 +1570,9 @@ static int32_t setStmtInfo(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) memcpy(tags, &pCxt->tags, sizeof(pCxt->tags)); SStmtCallback* pStmtCb = pCxt->pComCxt->pStmtCb; - int32_t code = (*pStmtCb->setInfoFn)(pStmtCb->pStmt, pStmt->pTableMeta, tags, &pStmt->targetTableName, pStmt->usingTableProcessing, - pStmt->pVgroupsHashObj, pStmt->pTableBlockHashObj, pStmt->usingTableName.tname); + int32_t code = (*pStmtCb->setInfoFn)(pStmtCb->pStmt, pStmt->pTableMeta, tags, &pStmt->targetTableName, + pStmt->usingTableProcessing, pStmt->pVgroupsHashObj, pStmt->pTableBlockHashObj, + pStmt->usingTableName.tname); memset(&pCxt->tags, 0, sizeof(pCxt->tags)); pStmt->pVgroupsHashObj = NULL; @@ -1915,13 +1946,11 @@ static int32_t setNextStageInfo(SInsertParseContext* pCxt, SQuery* pQuery, SCata } int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatalogReq, const SMetaData* pMetaData) { - SInsertParseContext context = { - .pComCxt = pCxt, - .msg = {.buf = pCxt->pMsg, .len = pCxt->msgLen}, - .missCache = false, - .usingDuplicateTable = false, - .forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false) - }; + SInsertParseContext context = {.pComCxt = pCxt, + .msg = {.buf = pCxt->pMsg, .len = pCxt->msgLen}, + .missCache = false, + .usingDuplicateTable = false, + .forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false)}; int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery); if (TSDB_CODE_SUCCESS == code) { From 89a6180a7cbab787696f435c8b6166498889384a Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 8 Dec 2022 13:18:11 +0800 Subject: [PATCH 03/22] enh: add retrieve cached table vgroup and meta api --- source/libs/catalog/inc/catalogInt.h | 3 + source/libs/catalog/src/catalog.c | 31 +++- source/libs/catalog/src/ctgCache.c | 214 +++++++++++++++++++-------- 3 files changed, 183 insertions(+), 65 deletions(-) diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index c435c46d46..0a43324a56 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -798,6 +798,9 @@ SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch); int32_t ctgdGetOneHandle(SCatalog **pHandle); int ctgVgInfoComp(const void* lp, const void* rp); int32_t ctgMakeVgArray(SDBVgInfo* dbInfo); +int32_t ctgAcquireVgMetaFromCache(SCatalog *pCtg, const char *dbFName, const char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb); +int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache *dbCache, SCtgTbCache **pTb, STableMeta **pTableMeta, char* dbFName); +void ctgReleaseVgMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache); extern SCatalogMgmt gCtgMgmt; extern SCtgDebug gCTGDebug; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 7e1269f0dc..23c39a7e3a 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -551,6 +551,35 @@ _return: CTG_RET(code); } +int32_t ctgGetCachedTbVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta) { + int32_t code = 0; + char db[TSDB_DB_FNAME_LEN] = {0}; + tNameGetFullDbName(pTableName, db); + SCtgDBCache *dbCache = NULL; + SCtgTbCache *tbCache = NULL; + + CTG_ERR_RET(ctgAcquireVgMetaFromCache(pCtg, db, pTableName->tname, &dbCache, &tbCache)); + + if (NULL == dbCache || NULL == tbCache) { + *pTableMeta = NULL; + return TSDB_CODE_SUCCESS; + } + + CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pTableName, pVgroup)); + + SCtgTbMetaCtx ctx = {0}; + ctx.pName = (SName*)pTableName; + ctx.flag = CTG_FLAG_UNKNOWN_STB; + CTG_ERR_JRET(ctgCopyTbMeta(pCtg, &ctx, dbCache, &tbCache, pTableMeta, db)); + +_return: + + ctgReleaseVgMetaToCache(pCtg, dbCache, tbCache); + + CTG_RET(code); +} + + int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName) { int32_t code = 0; @@ -1121,7 +1150,7 @@ int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableName, int32_t catalogGetCachedTableVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta) { CTG_API_ENTER(); - CTG_API_LEAVE(ctgGetTbHashVgroup(pCtg, NULL, pTableName, pVgroup, exists)); + CTG_API_LEAVE(ctgGetCachedTbVgMeta(pCtg, pTableName, pVgroup, pTableMeta)); } diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index f41058584f..4124202b03 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -151,6 +151,16 @@ void ctgReleaseTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache } } +void ctgReleaseVgMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) { + if (pCache) { + CTG_UNLOCK(CTG_READ, &pCache->metaLock); + taosHashRelease(dbCache->tbCache, pCache); + } + + ctgRUnlockVgInfo(dbCache); + ctgReleaseDBCache(pCtg, dbCache); +} + int32_t ctgAcquireVgInfoFromCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) { SCtgDBCache *dbCache = NULL; ctgAcquireDBCache(pCtg, dbFName, &dbCache); @@ -226,6 +236,75 @@ _return: return TSDB_CODE_SUCCESS; } +int32_t ctgAcquireVgMetaFromCache(SCatalog *pCtg, const char *dbFName, const char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) { + SCtgDBCache *dbCache = NULL; + SCtgTbCache *tbCache = NULL; + + ctgAcquireDBCache(pCtg, dbFName, &dbCache); + if (NULL == dbCache) { + ctgDebug("db %s not in cache", dbFName); + CTG_CACHE_STAT_INC(numOfVgMiss, 1); + goto _return; + } + + bool vgInCache = false; + ctgRLockVgInfo(pCtg, dbCache, &vgInCache); + if (!vgInCache) { + ctgDebug("vgInfo of db %s not in cache", dbFName); + CTG_CACHE_STAT_INC(numOfVgMiss, 1); + goto _return; + } + + *pDb = dbCache; + + CTG_CACHE_STAT_INC(numOfVgHit, 1); + + ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName); + + tbCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName)); + if (NULL == tbCache) { + ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName); + CTG_CACHE_STAT_INC(numOfMetaMiss, 1); + goto _return; + } + + CTG_LOCK(CTG_READ, &tbCache->metaLock); + if (NULL == tbCache->pMeta) { + ctgDebug("tb %s meta not in cache, dbFName:%s", tbName, dbFName); + CTG_CACHE_STAT_INC(numOfMetaMiss, 1); + goto _return; + } + + *pTb = tbCache; + + ctgDebug("tb %s meta got in cache, dbFName:%s", tbName, dbFName); + + CTG_CACHE_STAT_INC(numOfMetaHit, 1); + + return TSDB_CODE_SUCCESS; + +_return: + + if (tbCache) { + CTG_UNLOCK(CTG_READ, &tbCache->metaLock); + taosHashRelease(dbCache->tbCache, tbCache); + } + + if (vgInCache) { + ctgRUnlockVgInfo(dbCache); + } + + if (dbCache) { + ctgReleaseDBCache(pCtg, dbCache); + } + + *pDb = NULL; + *pTb = NULL; + + return TSDB_CODE_SUCCESS; +} + + /* int32_t ctgAcquireStbMetaFromCache(SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgDBCache **pDb, SCtgTbCache **pTb) { SCtgDBCache *dbCache = NULL; @@ -378,6 +457,76 @@ int32_t ctgTbMetaExistInCache(SCatalog *pCtg, char *dbFName, char *tbName, int32 return TSDB_CODE_SUCCESS; } +int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache *dbCache, SCtgTbCache **pTb, STableMeta **pTableMeta, char* dbFName) { + SCtgTbCache *tbCache = *pTb; + STableMeta *tbMeta = tbCache->pMeta; + ctx->tbInfo.inCache = true; + ctx->tbInfo.dbId = dbCache->dbId; + ctx->tbInfo.suid = tbMeta->suid; + ctx->tbInfo.tbType = tbMeta->tableType; + + if (tbMeta->tableType != TSDB_CHILD_TABLE) { + int32_t metaSize = CTG_META_SIZE(tbMeta); + *pTableMeta = taosMemoryCalloc(1, metaSize); + if (NULL == *pTableMeta) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + memcpy(*pTableMeta, tbMeta, metaSize); + + ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", ctx->pName->tname, tbMeta->tableType, dbFName); + return TSDB_CODE_SUCCESS; + } + + // PROCESS FOR CHILD TABLE + + int32_t metaSize = sizeof(SCTableMeta); + *pTableMeta = taosMemoryCalloc(1, metaSize); + if (NULL == *pTableMeta) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + memcpy(*pTableMeta, tbMeta, metaSize); + + //ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); + + if (tbCache) { + CTG_UNLOCK(CTG_READ, &tbCache->metaLock); + taosHashRelease(dbCache->tbCache, tbCache); + *pTb = NULL; + } + + ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname, + ctx->tbInfo.tbType, dbFName); + + ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, ctx->tbInfo.suid, &tbCache); + if (NULL == tbCache) { + taosMemoryFreeClear(*pTableMeta); + ctgDebug("stb 0x%" PRIx64 " meta not in cache", ctx->tbInfo.suid); + return TSDB_CODE_SUCCESS; + } + + *pTb = tbCache; + + STableMeta *stbMeta = tbCache->pMeta; + if (stbMeta->suid != ctx->tbInfo.suid) { + ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid, ctx->tbInfo.suid); + taosMemoryFreeClear(*pTableMeta); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + metaSize = CTG_META_SIZE(stbMeta); + *pTableMeta = taosMemoryRealloc(*pTableMeta, metaSize); + if (NULL == *pTableMeta) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + memcpy(&(*pTableMeta)->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta)); + + return TSDB_CODE_SUCCESS; +} + + int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) { int32_t code = 0; SCtgDBCache *dbCache = NULL; @@ -397,70 +546,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta ** return TSDB_CODE_SUCCESS; } - STableMeta *tbMeta = tbCache->pMeta; - ctx->tbInfo.inCache = true; - ctx->tbInfo.dbId = dbCache->dbId; - ctx->tbInfo.suid = tbMeta->suid; - ctx->tbInfo.tbType = tbMeta->tableType; - - if (tbMeta->tableType != TSDB_CHILD_TABLE) { - int32_t metaSize = CTG_META_SIZE(tbMeta); - *pTableMeta = taosMemoryCalloc(1, metaSize); - if (NULL == *pTableMeta) { - ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); - CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); - } - - memcpy(*pTableMeta, tbMeta, metaSize); - - ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); - ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", ctx->pName->tname, tbMeta->tableType, dbFName); - return TSDB_CODE_SUCCESS; - } - - // PROCESS FOR CHILD TABLE - - int32_t metaSize = sizeof(SCTableMeta); - *pTableMeta = taosMemoryCalloc(1, metaSize); - if (NULL == *pTableMeta) { - CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); - } - - memcpy(*pTableMeta, tbMeta, metaSize); - - //ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); - - if (tbCache) { - CTG_UNLOCK(CTG_READ, &tbCache->metaLock); - taosHashRelease(dbCache->tbCache, tbCache); - } - - ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname, - ctx->tbInfo.tbType, dbFName); - - ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, ctx->tbInfo.suid, &tbCache); - if (NULL == tbCache) { - //ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); - taosMemoryFreeClear(*pTableMeta); - ctgDebug("stb 0x%" PRIx64 " meta not in cache", ctx->tbInfo.suid); - return TSDB_CODE_SUCCESS; - } - - STableMeta *stbMeta = tbCache->pMeta; - if (stbMeta->suid != ctx->tbInfo.suid) { - ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); - ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid, ctx->tbInfo.suid); - CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); - } - - metaSize = CTG_META_SIZE(stbMeta); - *pTableMeta = taosMemoryRealloc(*pTableMeta, metaSize); - if (NULL == *pTableMeta) { - ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); - CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); - } - - memcpy(&(*pTableMeta)->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta)); + CTG_ERR_JRET(ctgCopyTbMeta(pCtg, ctx, dbCache, &tbCache, pTableMeta, dbFName)); ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); From 39d35538a3f9456d1a53a2151dc4c0d6791fe882 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 8 Dec 2022 13:53:16 +0800 Subject: [PATCH 04/22] fix: fix compile error --- source/libs/parser/src/parInsertSql.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 13d040cb0c..6f6c28e8b4 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -847,7 +847,7 @@ static int32_t getTableMetaAndVgroup(SInsertParseContext* pCxt, SVnodeModifOpStm SParseContext* pComCxt = pCxt->pComCxt; int32_t code = TSDB_CODE_SUCCESS; if (pComCxt->async) { - code = getTableMetaAndVgroupImpl(pComCxt->pCatalog, pStmt, pMissCache); + code = getTableMetaAndVgroupImpl(pComCxt, pStmt, pMissCache); } else { code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, pMissCache); if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { From 44b2cd1b7391dddde3fc9cfade17ff395d4cd237 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Thu, 8 Dec 2022 15:24:34 +0800 Subject: [PATCH 05/22] enh: merge getmetafromcache and getvgroupfromcache interface --- source/libs/parser/src/parInsertSql.c | 76 +++++++++++++++++++-------- 1 file changed, 53 insertions(+), 23 deletions(-) diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 6f6c28e8b4..60c7e0a8e9 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -832,13 +832,12 @@ static int32_t getTableVgroup(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, boo static int32_t getTableMetaAndVgroupImpl(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, bool* pMissCache) { SVgroupInfo vg; - bool exists = true; - int32_t code = catalogGetCachedTableHashVgroup(pCxt->pCatalog, &pStmt->targetTableName, &vg, &exists); + int32_t code = catalogGetCachedTableVgMeta(pCxt->pCatalog, &pStmt->targetTableName, &vg, &pStmt->pTableMeta); if (TSDB_CODE_SUCCESS == code) { - if (exists) { + if (NULL != pStmt->pTableMeta) { code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)); } - *pMissCache = !exists; + *pMissCache = (NULL == pStmt->pTableMeta); } return code; } @@ -857,6 +856,18 @@ static int32_t getTableMetaAndVgroup(SInsertParseContext* pCxt, SVnodeModifOpStm return code; } +static int32_t collectUseTable(const SName* pName, SHashObj* pTable) { + char fullName[TSDB_TABLE_FNAME_LEN]; + tNameExtractFullName(pName, fullName); + return taosHashPut(pTable, fullName, strlen(fullName), pName, sizeof(SName)); +} + +static int32_t collectUseDatabase(const SName* pName, SHashObj* pDbs) { + char dbFName[TSDB_DB_FNAME_LEN] = {0}; + tNameGetFullDbName(pName, dbFName); + return taosHashPut(pDbs, dbFName, strlen(dbFName), dbFName, sizeof(dbFName)); +} + static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { if (pCxt->forceUpdate) { pCxt->missCache = true; @@ -864,15 +875,24 @@ static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt } int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache); - // if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { - // code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, &pCxt->missCache); - // } - // if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { - // code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache); - // } +#if 0 + if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { + code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, &pCxt->missCache); + } + if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { + code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache); + } +#else if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { code = getTableMetaAndVgroup(pCxt, pStmt, &pCxt->missCache); } +#endif + if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { + code = collectUseDatabase(&pStmt->targetTableName, pStmt->pDbFNameHashObj); + } + if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { + code = collectUseTable(&pStmt->targetTableName, pStmt->pTableNameHashObj); + } return code; } @@ -1807,16 +1827,25 @@ static int32_t initInsertQuery(SInsertParseContext* pCxt, SCatalogReq* pCatalogR static int32_t setRefreshMate(SQuery* pQuery) { SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)pQuery->pRoot; - SName* pTable = taosHashIterate(pStmt->pTableNameHashObj, NULL); - while (NULL != pTable) { - taosArrayPush(pQuery->pTableList, pTable); - pTable = taosHashIterate(pStmt->pTableNameHashObj, pTable); + + if (taosHashGetSize(pStmt->pTableNameHashObj) > 0) { + taosArrayDestroy(pQuery->pTableList); + pQuery->pTableList = taosArrayInit(taosHashGetSize(pStmt->pTableNameHashObj), sizeof(SName)); + SName* pTable = taosHashIterate(pStmt->pTableNameHashObj, NULL); + while (NULL != pTable) { + taosArrayPush(pQuery->pTableList, pTable); + pTable = taosHashIterate(pStmt->pTableNameHashObj, pTable); + } } - char* pDb = taosHashIterate(pStmt->pDbFNameHashObj, NULL); - while (NULL != pDb) { - taosArrayPush(pQuery->pDbList, pDb); - pDb = taosHashIterate(pStmt->pDbFNameHashObj, pDb); + if (taosHashGetSize(pStmt->pDbFNameHashObj) > 0) { + taosArrayDestroy(pQuery->pDbList); + pQuery->pDbList = taosArrayInit(taosHashGetSize(pStmt->pDbFNameHashObj), TSDB_DB_FNAME_LEN); + char* pDb = taosHashIterate(pStmt->pDbFNameHashObj, NULL); + while (NULL != pDb) { + taosArrayPush(pQuery->pDbList, pDb); + pDb = taosHashIterate(pStmt->pDbFNameHashObj, pDb); + } } return TSDB_CODE_SUCCESS; @@ -1930,16 +1959,17 @@ static int32_t buildInsertCatalogReq(SInsertParseContext* pCxt, SVnodeModifOpStm } static int32_t setNextStageInfo(SInsertParseContext* pCxt, SQuery* pQuery, SCatalogReq* pCatalogReq) { + SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)pQuery->pRoot; if (pCxt->missCache) { - parserDebug("0x%" PRIx64 " %d rows have been inserted before cache miss", pCxt->pComCxt->requestId, - ((SVnodeModifOpStmt*)pQuery->pRoot)->totalRowsNum); + parserDebug("0x%" PRIx64 " %d rows of %d tables have been inserted before cache miss", pCxt->pComCxt->requestId, + pStmt->totalRowsNum, pStmt->totalTbNum); pQuery->execStage = QUERY_EXEC_STAGE_PARSE; - return buildInsertCatalogReq(pCxt, (SVnodeModifOpStmt*)pQuery->pRoot, pCatalogReq); + return buildInsertCatalogReq(pCxt, pStmt, pCatalogReq); } - parserDebug("0x%" PRIx64 " %d rows have been inserted", pCxt->pComCxt->requestId, - ((SVnodeModifOpStmt*)pQuery->pRoot)->totalRowsNum); + parserDebug("0x%" PRIx64 " %d rows of %d tables have been inserted", pCxt->pComCxt->requestId, pStmt->totalRowsNum, + pStmt->totalTbNum); pQuery->execStage = QUERY_EXEC_STAGE_SCHEDULE; return TSDB_CODE_SUCCESS; From 23a3a69ddd26a092b30cef30e5bc0b9bb085e64d Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Thu, 8 Dec 2022 16:19:48 +0800 Subject: [PATCH 06/22] enh: merge getmetafromcache and getvgroupfromcache interface --- source/client/src/clientMain.c | 14 ++++++++++++-- source/libs/parser/src/parInsertSql.c | 14 ++++++++++---- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index f2dec7217f..5a0ab533d8 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -21,12 +21,12 @@ #include "os.h" #include "query.h" #include "scheduler.h" +#include "tdatablock.h" #include "tglobal.h" #include "tmsg.h" #include "tref.h" #include "trpc.h" #include "version.h" -#include "tdatablock.h" #define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_RELEASED 0 @@ -796,7 +796,8 @@ static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t c SQuery *pQuery = pRequest->pQuery; pRequest->metric.ctgEnd = taosGetTimestampUs(); - qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64 ", code:%s", pRequest->self, pRequest->requestId, tstrerror(code)); + qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64 ", code:%s", pRequest->self, pRequest->requestId, + tstrerror(code)); if (code == TSDB_CODE_SUCCESS) { pWrapper->pCatalogReq->forceUpdate = false; @@ -930,6 +931,15 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->requestId); destorySqlCallbackWrapper(pWrapper); + + if (NEED_CLIENT_HANDLE_ERROR(code)) { + tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64, + pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId); + pRequest->prevCode = code; + doAsyncQuery(pRequest, true); + return; + } + terrno = code; pRequest->code = code; pRequest->body.queryFp(pRequest->body.param, pRequest, code); diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 60c7e0a8e9..36420599b3 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -887,11 +887,11 @@ static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt code = getTableMetaAndVgroup(pCxt, pStmt, &pCxt->missCache); } #endif - if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { + if (TSDB_CODE_SUCCESS == code && !pCxt->pComCxt->async) { code = collectUseDatabase(&pStmt->targetTableName, pStmt->pDbFNameHashObj); - } - if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { - code = collectUseTable(&pStmt->targetTableName, pStmt->pTableNameHashObj); + if (TSDB_CODE_SUCCESS == code) { + code = collectUseTable(&pStmt->targetTableName, pStmt->pTableNameHashObj); + } } return code; } @@ -913,6 +913,12 @@ static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { code = getTableVgroup(pCxt->pComCxt, pStmt, true, &pCxt->missCache); } + if (TSDB_CODE_SUCCESS == code && !pCxt->pComCxt->async) { + code = collectUseDatabase(&pStmt->usingTableName, pStmt->pDbFNameHashObj); + if (TSDB_CODE_SUCCESS == code) { + code = collectUseTable(&pStmt->usingTableName, pStmt->pTableNameHashObj); + } + } return code; } From 328b3e03e9c10cd352e5992eb5b7448b2eda0c9b Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 8 Dec 2022 17:15:10 +0800 Subject: [PATCH 07/22] fix: force update retry issue --- source/client/src/clientMain.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 5a0ab533d8..7fdb735a07 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -800,7 +800,7 @@ static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t c tstrerror(code)); if (code == TSDB_CODE_SUCCESS) { - pWrapper->pCatalogReq->forceUpdate = false; + //pWrapper->pCatalogReq->forceUpdate = false; code = qContinueParseSql(pWrapper->pParseCtx, pWrapper->pCatalogReq, pResultMeta, pQuery); } @@ -881,6 +881,9 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) { code = pRequest->prevCode; + terrno = code; + pRequest->body.queryFp(pRequest->body.param, pRequest, code); + return; } if (TSDB_CODE_SUCCESS == code) { From a202b298534f61f4b29d2a9c93db17d9cb280a00 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Thu, 8 Dec 2022 17:37:35 +0800 Subject: [PATCH 08/22] fix: compile warn --- source/os/src/osDir.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/source/os/src/osDir.c b/source/os/src/osDir.c index 9175067568..331d241745 100644 --- a/source/os/src/osDir.c +++ b/source/os/src/osDir.c @@ -394,8 +394,8 @@ char *taosDirEntryBaseName(char *name) { char *pPoint = strrchr(name, '/'); if (pPoint != NULL) { if (*(pPoint + 1) == '\0') { - *pPoint = '\0'; - return taosDirEntryBaseName(name); + *pPoint = '\0'; + return taosDirEntryBaseName(name); } return pPoint + 1; } @@ -500,8 +500,9 @@ int32_t taosCloseDir(TdDirPtr *ppDir) { void taosGetCwd(char *buf, int32_t len) { #if !defined(WINDOWS) - (void)getcwd(buf, len - 1); + char *unused __attribute__((unused)); + unused = getcwd(buf, len - 1); #else - strncpy(buf, "not implemented on windows", len -1); + strncpy(buf, "not implemented on windows", len - 1); #endif } From 617cd40541e07ad0305b385a6374562fdff6c7e7 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 8 Dec 2022 22:22:04 +0800 Subject: [PATCH 09/22] fix: fix ut case issue and crash issue --- source/libs/catalog/src/ctgCache.c | 2 +- source/libs/parser/test/mockCatalog.cpp | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 4124202b03..e8365a7633 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -239,6 +239,7 @@ _return: int32_t ctgAcquireVgMetaFromCache(SCatalog *pCtg, const char *dbFName, const char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) { SCtgDBCache *dbCache = NULL; SCtgTbCache *tbCache = NULL; + bool vgInCache = false; ctgAcquireDBCache(pCtg, dbFName, &dbCache); if (NULL == dbCache) { @@ -247,7 +248,6 @@ int32_t ctgAcquireVgMetaFromCache(SCatalog *pCtg, const char *dbFName, const cha goto _return; } - bool vgInCache = false; ctgRLockVgInfo(pCtg, dbCache, &vgInCache); if (!vgInCache) { ctgDebug("vgInfo of db %s not in cache", dbFName); diff --git a/source/libs/parser/test/mockCatalog.cpp b/source/libs/parser/test/mockCatalog.cpp index cc51beb842..ae702ec02f 100644 --- a/source/libs/parser/test/mockCatalog.cpp +++ b/source/libs/parser/test/mockCatalog.cpp @@ -248,6 +248,13 @@ int32_t __catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableNam return code; } +int32_t __catalogGetCachedTableVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta) { + int32_t code = g_mockCatalogService->catalogGetTableMeta(pTableName, pTableMeta, true); + if (code) return code; + code = g_mockCatalogService->catalogGetTableHashVgroup(pTableName, pVgroup, true); + return code; +} + int32_t __catalogGetTableDistVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SArray** pVgList) { return g_mockCatalogService->catalogGetTableDistVgInfo(pTableName, pVgList); @@ -316,6 +323,7 @@ void initMetaDataEnv() { stub.set(catalogGetCachedSTableMeta, __catalogGetCachedTableMeta); stub.set(catalogGetTableHashVgroup, __catalogGetTableHashVgroup); stub.set(catalogGetCachedTableHashVgroup, __catalogGetCachedTableHashVgroup); + stub.set(catalogGetCachedTableVgMeta, __catalogGetCachedTableVgMeta); stub.set(catalogGetTableDistVgInfo, __catalogGetTableDistVgInfo); stub.set(catalogGetDBVgVersion, __catalogGetDBVgVersion); stub.set(catalogGetDBVgList, __catalogGetDBVgList); From 43a349bc1ee48cf92c05227b3ef8a1d1706a5715 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 9 Dec 2022 07:12:57 +0800 Subject: [PATCH 10/22] fix: fix crash caused by null pointer --- source/libs/parser/src/parInsertSql.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 36420599b3..ff0621c09e 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -1782,6 +1782,10 @@ static int32_t setVnodeModifOpStmt(SInsertParseContext* pCxt, SCatalogReq* pCata SVnodeModifOpStmt* pStmt) { clearCatalogReq(pCatalogReq); + if (NULL == pMetaData) { + return TSDB_CODE_SUCCESS; + } + if (pStmt->usingTableProcessing) { return getTableSchemaFromMetaData(pCxt, pMetaData, pStmt, true); } From 61ad2588cc4c514f7d3bd144a7d1120c8fb0b3d5 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 9 Dec 2022 08:56:01 +0800 Subject: [PATCH 11/22] fix: db release issue --- source/libs/catalog/inc/catalogInt.h | 2 +- source/libs/catalog/src/catalog.c | 2 +- source/libs/catalog/src/ctgCache.c | 16 ++++++++++------ 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 0a43324a56..6e072a9630 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -799,7 +799,7 @@ int32_t ctgdGetOneHandle(SCatalog **pHandle); int ctgVgInfoComp(const void* lp, const void* rp); int32_t ctgMakeVgArray(SDBVgInfo* dbInfo); int32_t ctgAcquireVgMetaFromCache(SCatalog *pCtg, const char *dbFName, const char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb); -int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache *dbCache, SCtgTbCache **pTb, STableMeta **pTableMeta, char* dbFName); +int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache **pDb, SCtgTbCache **pTb, STableMeta **pTableMeta, char* dbFName); void ctgReleaseVgMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache); extern SCatalogMgmt gCtgMgmt; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 23c39a7e3a..e9e0bae8d7 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -570,7 +570,7 @@ int32_t ctgGetCachedTbVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInf SCtgTbMetaCtx ctx = {0}; ctx.pName = (SName*)pTableName; ctx.flag = CTG_FLAG_UNKNOWN_STB; - CTG_ERR_JRET(ctgCopyTbMeta(pCtg, &ctx, dbCache, &tbCache, pTableMeta, db)); + CTG_ERR_JRET(ctgCopyTbMeta(pCtg, &ctx, &dbCache, &tbCache, pTableMeta, db)); _return: diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index e8365a7633..47cb0ef559 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -130,7 +130,7 @@ void ctgReleaseVgInfoToCache(SCatalog *pCtg, SCtgDBCache *dbCache) { } void ctgReleaseTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) { - if (pCache) { + if (pCache && dbCache) { CTG_UNLOCK(CTG_READ, &pCache->metaLock); taosHashRelease(dbCache->tbCache, pCache); } @@ -152,13 +152,15 @@ void ctgReleaseTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache } void ctgReleaseVgMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) { - if (pCache) { + if (pCache && dbCache) { CTG_UNLOCK(CTG_READ, &pCache->metaLock); taosHashRelease(dbCache->tbCache, pCache); } - ctgRUnlockVgInfo(dbCache); - ctgReleaseDBCache(pCtg, dbCache); + if (dbCache) { + ctgRUnlockVgInfo(dbCache); + ctgReleaseDBCache(pCtg, dbCache); + } } int32_t ctgAcquireVgInfoFromCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) { @@ -457,7 +459,8 @@ int32_t ctgTbMetaExistInCache(SCatalog *pCtg, char *dbFName, char *tbName, int32 return TSDB_CODE_SUCCESS; } -int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache *dbCache, SCtgTbCache **pTb, STableMeta **pTableMeta, char* dbFName) { +int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache **pDb, SCtgTbCache **pTb, STableMeta **pTableMeta, char* dbFName) { + SCtgDBCache *dbCache = *pDb; SCtgTbCache *tbCache = *pTb; STableMeta *tbMeta = tbCache->pMeta; ctx->tbInfo.inCache = true; @@ -502,6 +505,7 @@ int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache *dbCache, ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, ctx->tbInfo.suid, &tbCache); if (NULL == tbCache) { taosMemoryFreeClear(*pTableMeta); + *pDb = NULL; ctgDebug("stb 0x%" PRIx64 " meta not in cache", ctx->tbInfo.suid); return TSDB_CODE_SUCCESS; } @@ -546,7 +550,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta ** return TSDB_CODE_SUCCESS; } - CTG_ERR_JRET(ctgCopyTbMeta(pCtg, ctx, dbCache, &tbCache, pTableMeta, dbFName)); + CTG_ERR_JRET(ctgCopyTbMeta(pCtg, ctx, &dbCache, &tbCache, pTableMeta, dbFName)); ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); From 13194bb1fe766e4d4466cf0a033b2d004cb20dff Mon Sep 17 00:00:00 2001 From: sunpeng Date: Fri, 9 Dec 2022 09:27:09 +0800 Subject: [PATCH 12/22] build: upgrade taosadapter version --- cmake/taosadapter_CMakeLists.txt.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/taosadapter_CMakeLists.txt.in b/cmake/taosadapter_CMakeLists.txt.in index d702b9967e..75679a8ff5 100644 --- a/cmake/taosadapter_CMakeLists.txt.in +++ b/cmake/taosadapter_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taosadapter ExternalProject_Add(taosadapter GIT_REPOSITORY https://github.com/taosdata/taosadapter.git - GIT_TAG 0dfad5b + GIT_TAG 566540d SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter" BINARY_DIR "" #BUILD_IN_SOURCE TRUE From ecbc09663f565e649689f3ea5951afa0e7a9f335 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 9 Dec 2022 09:35:53 +0800 Subject: [PATCH 13/22] enh: tolerate exec error --- source/libs/stream/src/streamExec.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 6a83a9a4da..d20c2902f5 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -54,6 +54,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* /*ASSERT(false);*/ qError("unexpected stream execution, stream %" PRId64 " task: %d, since %s", pTask->streamId, pTask->taskId, terrstr()); + continue; } if (output == NULL) { if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) { From 1e51c1b49cc7bce6025f09460f4a6630eae9cf6d Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 9 Dec 2022 09:41:06 +0800 Subject: [PATCH 14/22] add debug log --- source/client/src/clientMain.c | 7 +++++-- source/client/src/clientTmq.c | 4 ++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index f2dec7217f..8f029e6061 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -21,12 +21,12 @@ #include "os.h" #include "query.h" #include "scheduler.h" +#include "tdatablock.h" #include "tglobal.h" #include "tmsg.h" #include "tref.h" #include "trpc.h" #include "version.h" -#include "tdatablock.h" #define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_RELEASED 0 @@ -178,6 +178,8 @@ void taos_free_result(TAOS_RES *res) { return; } + tscDebug("taos free res %p", res); + if (TD_RES_QUERY(res)) { SRequestObj *pRequest = (SRequestObj *)res; tscDebug("0x%" PRIx64 " taos_free_result start to free query", pRequest->requestId); @@ -796,7 +798,8 @@ static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t c SQuery *pQuery = pRequest->pQuery; pRequest->metric.ctgEnd = taosGetTimestampUs(); - qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64 ", code:%s", pRequest->self, pRequest->requestId, tstrerror(code)); + qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64 ", code:%s", pRequest->self, pRequest->requestId, + tstrerror(code)); if (code == TSDB_CODE_SUCCESS) { pWrapper->pCatalogReq->forceUpdate = false; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 766a94caf1..db717a4e4e 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1215,6 +1215,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); + tscDebug("consumer:%" PRId64 ", put poll res into mqueue %p", tmq->consumerId, pRspWrapper); + taosWriteQitem(tmq->mqueue, pRspWrapper); tsem_post(&tmq->rspSem); @@ -1664,6 +1666,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } } + tscDebug("consumer:%" PRId64 " handle rsp %p", tmq->consumerId, rspWrapper); + if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) { taosFreeQitem(rspWrapper); terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; From 2e964f229ea13581109fdd641f9837ffc6cdacc3 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 9 Dec 2022 09:47:17 +0800 Subject: [PATCH 15/22] fix: fix error code passing issue --- source/client/src/clientMain.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 7fdb735a07..29887115b9 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -882,6 +882,8 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) { code = pRequest->prevCode; terrno = code; + pRequest->code = code; + tscDebug("call sync query cb with code: %s", tstrerror(code)); pRequest->body.queryFp(pRequest->body.param, pRequest, code); return; } From bb0c337e43e03fa75facba948bc48e13102d2ab0 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 9 Dec 2022 10:06:07 +0800 Subject: [PATCH 16/22] fix: client retry issue --- source/client/src/clientMain.c | 2 ++ source/libs/parser/src/parInsertSql.c | 4 ---- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 29887115b9..329d715aaf 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -936,6 +936,8 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->requestId); destorySqlCallbackWrapper(pWrapper); + qDestroyQuery(pRequest->pQuery); + pRequest->pQuery = NULL; if (NEED_CLIENT_HANDLE_ERROR(code)) { tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64, diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index ff0621c09e..36420599b3 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -1782,10 +1782,6 @@ static int32_t setVnodeModifOpStmt(SInsertParseContext* pCxt, SCatalogReq* pCata SVnodeModifOpStmt* pStmt) { clearCatalogReq(pCatalogReq); - if (NULL == pMetaData) { - return TSDB_CODE_SUCCESS; - } - if (pStmt->usingTableProcessing) { return getTableSchemaFromMetaData(pCxt, pMetaData, pStmt, true); } From c22fa53bc4fe6c0e6269eb153c48ddccf4891656 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 9 Dec 2022 10:10:31 +0800 Subject: [PATCH 17/22] fix: release lock and memory in exceptional cases --- source/dnode/vnode/src/tsdb/tsdbCache.c | 2 ++ source/libs/tdb/src/db/tdbPager.c | 6 ++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 37fae4f709..5b09ce5eb6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -953,6 +953,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs code = tsdbReadDelIdx(pDelFReader, pDelIdxArray); if (code) { + taosArrayDestroy(pDelIdxArray); tsdbDelFReaderClose(&pDelFReader); goto _err; } @@ -961,6 +962,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs code = getTableDelSkyline(pMem, pIMem, pDelFReader, delIdx, pIter->pSkyline); if (code) { + taosArrayDestroy(pDelIdxArray); tsdbDelFReaderClose(&pDelFReader); goto _err; } diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index e3d886b046..648e99d6a5 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -720,14 +720,16 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage pgno = TDB_PAGE_PGNO(pPage); - tdbTrace("tdbttl init pager:%p, pgno:%d, loadPage:%d, size:%d", pPager, pgno, loadPage, pPager->dbOrigSize); + tdbTrace("tdb/pager:%p, pgno:%d, loadPage:%d, size:%d", pPager, pgno, loadPage, pPager->dbOrigSize); if (loadPage && pgno <= pPager->dbOrigSize) { init = 1; nRead = tdbOsPRead(pPager->fd, pPage->pData, pPage->pageSize, ((i64)pPage->pageSize) * (pgno - 1)); - tdbTrace("tdbttl pager:%p, pgno:%d, nRead:%" PRId64, pPager, pgno, nRead); + tdbTrace("tdb/pager:%p, pgno:%d, nRead:%" PRId64, pPager, pgno, nRead); if (nRead < pPage->pageSize) { ASSERT(0); + tdbError("tdb/pager:%p, pgno:%d, nRead:%" PRId64 "pgSize:%" PRId32, pPager, pgno, nRead, pPage->pageSize); + TDB_UNLOCK_PAGE(pPage); return -1; } } else { From 75f9728ab96f6145e23f5bc3360bee9046bac4a2 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 9 Dec 2022 10:43:12 +0800 Subject: [PATCH 18/22] fix: supportVnodes config is not in effect --- source/dnode/mnode/impl/src/mndVgroup.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index a6e52caf1f..4a6f0d14da 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -507,7 +507,7 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup for (int32_t v = 0; v < pVgroup->replica; ++v) { SVnodeGid *pVgid = &pVgroup->vnodeGid[v]; SDnodeObj *pDnode = taosArrayGet(pArray, v); - if (pDnode == NULL || pDnode->numOfVnodes > pDnode->numOfSupportVnodes) { + if (pDnode == NULL || pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) { terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; return -1; } @@ -891,7 +891,7 @@ static int32_t mndAddVnodeToVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgro } if (used) continue; - if (pDnode == NULL || pDnode->numOfVnodes > pDnode->numOfSupportVnodes) { + if (pDnode == NULL || pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) { terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; return -1; } From 06b0fefc385c94c0fba6ac578c796ede1e402747 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 9 Dec 2022 11:15:22 +0800 Subject: [PATCH 19/22] fix: check topic privilege only --- source/dnode/mnode/impl/src/mndConsumer.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 373a653b51..8e2b17a963 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -554,10 +554,13 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { goto SUBSCRIBE_OVER; } + // check topic only +#if 0 if (mndCheckDbPrivilegeByName(pMnode, pMsg->info.conn.user, MND_OPER_READ_DB, pTopic->db) != 0) { mndReleaseTopic(pMnode, pTopic); goto SUBSCRIBE_OVER; } +#endif if (mndCheckTopicPrivilege(pMnode, pMsg->info.conn.user, MND_OPER_SUBSCRIBE, pTopic) != 0) { mndReleaseTopic(pMnode, pTopic); From 59c48d03962168aa58c12e0e68e7c16d67c2c265 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 9 Dec 2022 11:29:42 +0800 Subject: [PATCH 20/22] test: addjust cases --- tests/script/tsim/dnode/balance2.sim | 10 +++++----- tests/script/tsim/vnode/stable_balance_replica1.sim | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/script/tsim/dnode/balance2.sim b/tests/script/tsim/dnode/balance2.sim index f677d3925c..3f5a42d4d3 100644 --- a/tests/script/tsim/dnode/balance2.sim +++ b/tests/script/tsim/dnode/balance2.sim @@ -4,11 +4,11 @@ system sh/deploy.sh -n dnode2 -i 2 system sh/deploy.sh -n dnode3 -i 3 system sh/deploy.sh -n dnode4 -i 4 system sh/deploy.sh -n dnode5 -i 5 -system sh/cfg.sh -n dnode1 -c supportVnodes -v 4 -system sh/cfg.sh -n dnode2 -c supportVnodes -v 4 -system sh/cfg.sh -n dnode3 -c supportVnodes -v 4 -system sh/cfg.sh -n dnode4 -c supportVnodes -v 4 -system sh/cfg.sh -n dnode5 -c supportVnodes -v 4 +system sh/cfg.sh -n dnode1 -c supportVnodes -v 5 +system sh/cfg.sh -n dnode2 -c supportVnodes -v 5 +system sh/cfg.sh -n dnode3 -c supportVnodes -v 5 +system sh/cfg.sh -n dnode4 -c supportVnodes -v 5 +system sh/cfg.sh -n dnode5 -c supportVnodes -v 5 print ========== step1 system sh/exec.sh -n dnode1 -s start diff --git a/tests/script/tsim/vnode/stable_balance_replica1.sim b/tests/script/tsim/vnode/stable_balance_replica1.sim index f124979397..84190a3be0 100644 --- a/tests/script/tsim/vnode/stable_balance_replica1.sim +++ b/tests/script/tsim/vnode/stable_balance_replica1.sim @@ -3,10 +3,10 @@ system sh/deploy.sh -n dnode1 -i 1 system sh/deploy.sh -n dnode2 -i 2 system sh/deploy.sh -n dnode3 -i 3 system sh/deploy.sh -n dnode4 -i 4 -system sh/cfg.sh -n dnode1 -c supportVnodes -v 4 -system sh/cfg.sh -n dnode2 -c supportVnodes -v 4 -system sh/cfg.sh -n dnode3 -c supportVnodes -v 4 -system sh/cfg.sh -n dnode4 -c supportVnodes -v 4 +system sh/cfg.sh -n dnode1 -c supportVnodes -v 5 +system sh/cfg.sh -n dnode2 -c supportVnodes -v 5 +system sh/cfg.sh -n dnode3 -c supportVnodes -v 5 +system sh/cfg.sh -n dnode4 -c supportVnodes -v 5 $dbPrefix = br1_db $tbPrefix = br1_tb From 997241438245554f923902315a3ef1b62a575312 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Fri, 9 Dec 2022 12:25:24 +0800 Subject: [PATCH 21/22] test: temporarily disable go.sh test (#18845) --- tests/parallel_test/cases.task | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 28ba7ea60f..154eebb4de 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1037,4 +1037,4 @@ ,,n,docs-examples-test,bash node.sh ,,n,docs-examples-test,bash csharp.sh ,,n,docs-examples-test,bash jdbc.sh -,,n,docs-examples-test,bash go.sh +#,,n,docs-examples-test,bash go.sh From dc0fc02da81825ef45c0029cdbdded375fd4a3ea Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Fri, 9 Dec 2022 12:26:02 +0800 Subject: [PATCH 22/22] Revert "fix(build):centos7 x64 makeinstall cannt write taod service to systemctl" (#18836) --- packaging/tools/make_install.sh | 38 ++++++++++++++++----------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/packaging/tools/make_install.sh b/packaging/tools/make_install.sh index 6a82322a12..9034fd85f5 100755 --- a/packaging/tools/make_install.sh +++ b/packaging/tools/make_install.sh @@ -497,27 +497,27 @@ function install_service_on_systemd() { taosd_service_config="${service_config_dir}/${serverName}.service" - ${csudo}bash -c "echo [Unit] >> ${taosd_service_config}" - ${csudo}bash -c "echo Description=${productName} server service >> ${taosd_service_config}" - ${csudo}bash -c "echo After=network-online.target >> ${taosd_service_config}" - ${csudo}bash -c "echo Wants=network-online.target >> ${taosd_service_config}" + ${csudo}bash -c "echo '[Unit]' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'Description=${productName} server service' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'After=network-online.target' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'Wants=network-online.target' >> ${taosd_service_config}" ${csudo}bash -c "echo >> ${taosd_service_config}" - ${csudo}bash -c "echo [Service] >> ${taosd_service_config}" - ${csudo}bash -c "echo Type=simple >> ${taosd_service_config}" - ${csudo}bash -c "echo ExecStart=/usr/bin/${serverName} >> ${taosd_service_config}" - ${csudo}bash -c "echo ExecStartPre=${installDir}/bin/startPre.sh >> ${taosd_service_config}" - ${csudo}bash -c "echo TimeoutStopSec=1000000s >> ${taosd_service_config}" - ${csudo}bash -c "echo LimitNOFILE=infinity >> ${taosd_service_config}" - ${csudo}bash -c "echo LimitNPROC=infinity >> ${taosd_service_config}" - ${csudo}bash -c "echo LimitCORE=infinity >> ${taosd_service_config}" - ${csudo}bash -c "echo TimeoutStartSec=0 >> ${taosd_service_config}" - ${csudo}bash -c "echo StandardOutput=null >> ${taosd_service_config}" - ${csudo}bash -c "echo Restart=always >> ${taosd_service_config}" - ${csudo}bash -c "echo StartLimitBurst=3 >> ${taosd_service_config}" - ${csudo}bash -c "echo StartLimitInterval=60s >> ${taosd_service_config}" + ${csudo}bash -c "echo '[Service]' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'Type=simple' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'ExecStart=/usr/bin/${serverName}' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'ExecStartPre=${installDir}/bin/startPre.sh' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'TimeoutStopSec=1000000s' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'LimitNOFILE=infinity' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'LimitNPROC=infinity' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'LimitCORE=infinity' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'TimeoutStartSec=0' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'StandardOutput=null' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'Restart=always' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'StartLimitBurst=3' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'StartLimitInterval=60s' >> ${taosd_service_config}" ${csudo}bash -c "echo >> ${taosd_service_config}" - ${csudo}bash -c "echo [Install] >> ${taosd_service_config}" - ${csudo}bash -c "echo WantedBy=multi-user.target >> ${taosd_service_config}" + ${csudo}bash -c "echo '[Install]' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'WantedBy=multi-user.target' >> ${taosd_service_config}" ${csudo}systemctl enable ${serverName} }