From bc7698366a5b712b93a71208dae8e3f4d8ced3c5 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 4 Aug 2022 11:11:26 +0800 Subject: [PATCH] enh: launch tables in same db --- source/libs/catalog/src/ctgCache.c | 273 +++++++++++++++++++++++++++- source/libs/catalog/src/ctgRemote.c | 2 +- source/libs/qcom/src/queryUtil.c | 2 + 3 files changed, 274 insertions(+), 3 deletions(-) diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 1b5968d426..6cec690bad 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -242,7 +242,6 @@ int32_t ctgAcquireTbMetaFromCache(SCatalog* pCtg, char *dbFName, char* tbName, S goto _return; } - int32_t sz = 0; pCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName)); if (NULL == pCache) { ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName); @@ -282,7 +281,6 @@ int32_t ctgAcquireStbMetaFromCache(SCatalog* pCtg, char *dbFName, uint64_t suid, goto _return; } - int32_t sz = 0; char* stName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid)); if (NULL == stName) { ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName); @@ -2152,6 +2150,7 @@ int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMet return TSDB_CODE_SUCCESS; } +#if 0 int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray** pTbMetas) { int32_t tbNum = taosArrayGetSize(ctx->pNames); SName* fName = taosArrayGet(ctx->pNames, 0); @@ -2206,6 +2205,276 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe return TSDB_CODE_SUCCESS; } +#endif + +int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray** pTbMetas) { + int32_t tbNum = taosArrayGetSize(ctx->pNames); + int32_t fIdx = 0; + SName* pName = taosArrayGet(ctx->pNames, 0); + char dbFName[TSDB_DB_FNAME_LEN] = {0}; + int32_t flag = CTG_FLAG_UNKNOWN_STB; + uint64_t lastSuid = 0; + STableMeta* lastTableMeta = NULL; + + if (IS_SYS_DBNAME(pName->dbname)) { + CTG_FLAG_SET_SYS_DB(flag); + strcpy(dbFName, pName->dbname); + } else { + tNameGetFullDbName(pName, dbFName); + } + + SCtgDBCache *dbCache = NULL; + SCtgTbCache* pCache = NULL; + ctgAcquireDBCache(pCtg, dbFName, &dbCache); + + if (NULL == dbCache) { + ctgDebug("db %s not in cache", dbFName); + for (int32_t i = 0; i < tbNum; ++i) { + SMetaRes res = {0}; + if (NULL == ctx->pFetchs) { + ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch)); + } + + SCtgFetch fetch = {0}; + fetch.reqIdx = i; + fetch.fetchIdx = fIdx++; + fetch.flag = flag; + + taosArrayPush(ctx->pFetchs, &fetch); + taosArrayPush(ctx->pTbMetas, &res); + } + + return TSDB_CODE_SUCCESS; + } + + for (int32_t i = 0; i < tbNum; ++i) { + SName* pName = taosArrayGet(ctx->pNames, i); + SMetaRes res = {0}; + + pCache = taosHashAcquire(dbCache->tbCache, pName->tname, strlen(pName->tname)); + if (NULL == pCache) { + ctgDebug("tb %s not in cache, dbFName:%s", pName->tname, dbFName); + if (NULL == ctx->pFetchs) { + ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch)); + } + + SCtgFetch fetch = {0}; + fetch.reqIdx = i; + fetch.fetchIdx = fIdx++; + fetch.flag = flag; + + taosArrayPush(ctx->pFetchs, &fetch); + taosArrayPush(ctx->pTbMetas, &res); + + continue; + } + + CTG_LOCK(CTG_READ, &pCache->metaLock); + if (NULL == pCache->pMeta) { + ctgDebug("tb %s meta not in cache, dbFName:%s", pName->tname, dbFName); + if (NULL == ctx->pFetchs) { + ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch)); + } + + SCtgFetch fetch = {0}; + fetch.reqIdx = i; + fetch.fetchIdx = fIdx++; + fetch.flag = flag; + + taosArrayPush(ctx->pFetchs, &fetch); + taosArrayPush(ctx->pTbMetas, &res); + + continue; + } + + STableMeta* tbMeta = pCache->pMeta; + + SCtgTbMetaCtx nctx = {0}; + nctx.flag = flag; + nctx.tbInfo.inCache = true; + nctx.tbInfo.dbId = dbCache->dbId; + nctx.tbInfo.suid = tbMeta->suid; + nctx.tbInfo.tbType = tbMeta->tableType; + + STableMeta* pTableMeta = NULL; + if (tbMeta->tableType != TSDB_CHILD_TABLE) { + int32_t metaSize = CTG_META_SIZE(tbMeta); + pTableMeta = taosMemoryCalloc(1, metaSize); + if (NULL == pTableMeta) { + //ctgReleaseTbMetaToCache(pCtg, dbCache, pCache); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + memcpy(pTableMeta, tbMeta, metaSize); + + if (pCache) { + CTG_UNLOCK(CTG_READ, &pCache->metaLock); + taosHashRelease(dbCache->tbCache, pCache); + } + + ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, tbMeta->tableType, dbFName); + + res.pRes = pTableMeta; + taosArrayPush(ctx->pTbMetas, &res); + + continue; + } + + // PROCESS FOR CHILD TABLE + + if (lastSuid && tbMeta->suid == lastSuid && lastTableMeta) { + cloneTableMeta(lastTableMeta, &pTableMeta); + + if (pCache) { + CTG_UNLOCK(CTG_READ, &pCache->metaLock); + taosHashRelease(dbCache->tbCache, pCache); + } + + ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, tbMeta->tableType, dbFName); + + res.pRes = pTableMeta; + taosArrayPush(ctx->pTbMetas, &res); + + continue; + } + + int32_t metaSize = sizeof(SCTableMeta); + pTableMeta = taosMemoryCalloc(1, metaSize); + if (NULL == pTableMeta) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + memcpy(pTableMeta, tbMeta, metaSize); + + if (pCache) { + CTG_UNLOCK(CTG_READ, &pCache->metaLock); + taosHashRelease(dbCache->tbCache, pCache); + } + + ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", + pName->tname, nctx.tbInfo.tbType, dbFName); + + char* stName = taosHashAcquire(dbCache->stbCache, &pTableMeta->suid, sizeof(pTableMeta->suid)); + if (NULL == stName) { + ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", pTableMeta->suid, dbFName); + if (NULL == ctx->pFetchs) { + ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch)); + } + + SCtgFetch fetch = {0}; + fetch.reqIdx = i; + fetch.fetchIdx = fIdx++; + fetch.flag = flag; + + taosArrayPush(ctx->pFetchs, &fetch); + taosArrayPush(ctx->pTbMetas, &res); + + taosMemoryFreeClear(pTableMeta); + continue; + } + + pCache = taosHashAcquire(dbCache->tbCache, stName, strlen(stName)); + if (NULL == pCache) { + ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", pTableMeta->suid, stName, dbFName); + taosHashRelease(dbCache->stbCache, stName); + + if (NULL == ctx->pFetchs) { + ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch)); + } + + SCtgFetch fetch = {0}; + fetch.reqIdx = i; + fetch.fetchIdx = fIdx++; + fetch.flag = flag; + + taosArrayPush(ctx->pFetchs, &fetch); + taosArrayPush(ctx->pTbMetas, &res); + + taosMemoryFreeClear(pTableMeta); + continue; + } + + taosHashRelease(dbCache->stbCache, stName); + + CTG_LOCK(CTG_READ, &pCache->metaLock); + if (NULL == pCache->pMeta) { + ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", pTableMeta->suid, dbFName); + if (pCache) { + CTG_UNLOCK(CTG_READ, &pCache->metaLock); + taosHashRelease(dbCache->tbCache, pCache); + } + + if (NULL == ctx->pFetchs) { + ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch)); + } + + SCtgFetch fetch = {0}; + fetch.reqIdx = i; + fetch.fetchIdx = fIdx++; + fetch.flag = flag; + + taosArrayPush(ctx->pFetchs, &fetch); + taosArrayPush(ctx->pTbMetas, &res); + + taosMemoryFreeClear(pTableMeta); + + continue; + } + + STableMeta* stbMeta = pCache->pMeta; + if (stbMeta->suid != nctx.tbInfo.suid) { + if (pCache) { + CTG_UNLOCK(CTG_READ, &pCache->metaLock); + taosHashRelease(dbCache->tbCache, pCache); + } + + ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%"PRIx64 , stbMeta->suid, nctx.tbInfo.suid); + + if (NULL == ctx->pFetchs) { + ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch)); + } + + SCtgFetch fetch = {0}; + fetch.reqIdx = i; + fetch.fetchIdx = fIdx++; + fetch.flag = flag; + + taosArrayPush(ctx->pFetchs, &fetch); + taosArrayPush(ctx->pTbMetas, &res); + + taosMemoryFreeClear(pTableMeta); + + continue; + } + + metaSize = CTG_META_SIZE(stbMeta); + pTableMeta = taosMemoryRealloc(pTableMeta, metaSize); + if (NULL == pTableMeta) { + //ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + memcpy(&pTableMeta->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta)); + + if (pCache) { + CTG_UNLOCK(CTG_READ, &pCache->metaLock); + taosHashRelease(dbCache->tbCache, pCache); + } + + res.pRes = pTableMeta; + taosArrayPush(ctx->pTbMetas, &res); + + lastSuid = pTableMeta->suid; + lastTableMeta = pTableMeta; + } + + if (NULL == ctx->pFetchs) { + TSWAP(*pTbMetas, ctx->pTbMetas); + } + + return TSDB_CODE_SUCCESS; +} + int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq) { int32_t code = 0; diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index d489496e37..a8d5a7ae95 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -69,7 +69,7 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu pTask->pBatchs = pBatchs; pTask->msgIdx = rsp.msgIdx; - ctgDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(taskMsg.msgType + 1)); + ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s", pJob->queryId, pTask->taskId, pTask->msgIdx, TMSG_INFO(taskMsg.msgType + 1)); (*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode)); } diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index d8fda57791..4cad6a078b 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -462,3 +462,5 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) { return TSDB_CODE_SUCCESS; } + +