diff --git a/cmake/taosadapter_CMakeLists.txt.in b/cmake/taosadapter_CMakeLists.txt.in index d702b9967e..75679a8ff5 100644 --- a/cmake/taosadapter_CMakeLists.txt.in +++ b/cmake/taosadapter_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taosadapter ExternalProject_Add(taosadapter GIT_REPOSITORY https://github.com/taosdata/taosadapter.git - GIT_TAG 0dfad5b + GIT_TAG 566540d SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 6154882756..a95db86d3f 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -211,6 +211,8 @@ int32_t catalogGetCachedSTableMeta(SCatalog* pCtg, const SName* pTableName, STab int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists); +int32_t catalogGetCachedTableVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta); + /** * Force refresh DB's local cached vgroup info. * @param pCtg (input, got with catalogGetHandle) diff --git a/packaging/tools/make_install.sh b/packaging/tools/make_install.sh index 6a82322a12..9034fd85f5 100755 --- a/packaging/tools/make_install.sh +++ b/packaging/tools/make_install.sh @@ -497,27 +497,27 @@ function install_service_on_systemd() { taosd_service_config="${service_config_dir}/${serverName}.service" - ${csudo}bash -c "echo [Unit] >> ${taosd_service_config}" - ${csudo}bash -c "echo Description=${productName} server service >> ${taosd_service_config}" - ${csudo}bash -c "echo After=network-online.target >> ${taosd_service_config}" - ${csudo}bash -c "echo Wants=network-online.target >> ${taosd_service_config}" + ${csudo}bash -c "echo '[Unit]' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'Description=${productName} server service' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'After=network-online.target' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'Wants=network-online.target' >> ${taosd_service_config}" ${csudo}bash -c "echo >> ${taosd_service_config}" - ${csudo}bash -c "echo [Service] >> ${taosd_service_config}" - ${csudo}bash -c "echo Type=simple >> ${taosd_service_config}" - ${csudo}bash -c "echo ExecStart=/usr/bin/${serverName} >> ${taosd_service_config}" - ${csudo}bash -c "echo ExecStartPre=${installDir}/bin/startPre.sh >> ${taosd_service_config}" - ${csudo}bash -c "echo TimeoutStopSec=1000000s >> ${taosd_service_config}" - ${csudo}bash -c "echo LimitNOFILE=infinity >> ${taosd_service_config}" - ${csudo}bash -c "echo LimitNPROC=infinity >> ${taosd_service_config}" - ${csudo}bash -c "echo LimitCORE=infinity >> ${taosd_service_config}" - ${csudo}bash -c "echo TimeoutStartSec=0 >> ${taosd_service_config}" - ${csudo}bash -c "echo StandardOutput=null >> ${taosd_service_config}" - ${csudo}bash -c "echo Restart=always >> ${taosd_service_config}" - ${csudo}bash -c "echo StartLimitBurst=3 >> ${taosd_service_config}" - ${csudo}bash -c "echo StartLimitInterval=60s >> ${taosd_service_config}" + ${csudo}bash -c "echo '[Service]' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'Type=simple' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'ExecStart=/usr/bin/${serverName}' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'ExecStartPre=${installDir}/bin/startPre.sh' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'TimeoutStopSec=1000000s' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'LimitNOFILE=infinity' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'LimitNPROC=infinity' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'LimitCORE=infinity' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'TimeoutStartSec=0' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'StandardOutput=null' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'Restart=always' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'StartLimitBurst=3' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'StartLimitInterval=60s' >> ${taosd_service_config}" ${csudo}bash -c "echo >> ${taosd_service_config}" - ${csudo}bash -c "echo [Install] >> ${taosd_service_config}" - ${csudo}bash -c "echo WantedBy=multi-user.target >> ${taosd_service_config}" + ${csudo}bash -c "echo '[Install]' >> ${taosd_service_config}" + ${csudo}bash -c "echo 'WantedBy=multi-user.target' >> ${taosd_service_config}" ${csudo}systemctl enable ${serverName} } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index f2dec7217f..581cd636e8 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -21,12 +21,12 @@ #include "os.h" #include "query.h" #include "scheduler.h" +#include "tdatablock.h" #include "tglobal.h" #include "tmsg.h" #include "tref.h" #include "trpc.h" #include "version.h" -#include "tdatablock.h" #define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_RELEASED 0 @@ -178,6 +178,8 @@ void taos_free_result(TAOS_RES *res) { return; } + tscDebug("taos free res %p", res); + if (TD_RES_QUERY(res)) { SRequestObj *pRequest = (SRequestObj *)res; tscDebug("0x%" PRIx64 " taos_free_result start to free query", pRequest->requestId); @@ -796,10 +798,11 @@ static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t c SQuery *pQuery = pRequest->pQuery; pRequest->metric.ctgEnd = taosGetTimestampUs(); - qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64 ", code:%s", pRequest->self, pRequest->requestId, tstrerror(code)); + qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64 ", code:%s", pRequest->self, pRequest->requestId, + tstrerror(code)); if (code == TSDB_CODE_SUCCESS) { - pWrapper->pCatalogReq->forceUpdate = false; + //pWrapper->pCatalogReq->forceUpdate = false; code = qContinueParseSql(pWrapper->pParseCtx, pWrapper->pCatalogReq, pResultMeta, pQuery); } @@ -880,6 +883,11 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) { code = pRequest->prevCode; + terrno = code; + pRequest->code = code; + tscDebug("call sync query cb with code: %s", tstrerror(code)); + pRequest->body.queryFp(pRequest->body.param, pRequest, code); + return; } if (TSDB_CODE_SUCCESS == code) { @@ -930,6 +938,17 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->requestId); destorySqlCallbackWrapper(pWrapper); + qDestroyQuery(pRequest->pQuery); + pRequest->pQuery = NULL; + + if (NEED_CLIENT_HANDLE_ERROR(code)) { + tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64, + pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId); + pRequest->prevCode = code; + doAsyncQuery(pRequest, true); + return; + } + terrno = code; pRequest->code = code; pRequest->body.queryFp(pRequest->body.param, pRequest, code); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 766a94caf1..db717a4e4e 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1215,6 +1215,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); + tscDebug("consumer:%" PRId64 ", put poll res into mqueue %p", tmq->consumerId, pRspWrapper); + taosWriteQitem(tmq->mqueue, pRspWrapper); tsem_post(&tmq->rspSem); @@ -1664,6 +1666,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } } + tscDebug("consumer:%" PRId64 " handle rsp %p", tmq->consumerId, rspWrapper); + if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) { taosFreeQitem(rspWrapper); terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 373a653b51..8e2b17a963 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -554,10 +554,13 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { goto SUBSCRIBE_OVER; } + // check topic only +#if 0 if (mndCheckDbPrivilegeByName(pMnode, pMsg->info.conn.user, MND_OPER_READ_DB, pTopic->db) != 0) { mndReleaseTopic(pMnode, pTopic); goto SUBSCRIBE_OVER; } +#endif if (mndCheckTopicPrivilege(pMnode, pMsg->info.conn.user, MND_OPER_SUBSCRIBE, pTopic) != 0) { mndReleaseTopic(pMnode, pTopic); diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index a6e52caf1f..4a6f0d14da 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -507,7 +507,7 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup for (int32_t v = 0; v < pVgroup->replica; ++v) { SVnodeGid *pVgid = &pVgroup->vnodeGid[v]; SDnodeObj *pDnode = taosArrayGet(pArray, v); - if (pDnode == NULL || pDnode->numOfVnodes > pDnode->numOfSupportVnodes) { + if (pDnode == NULL || pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) { terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; return -1; } @@ -891,7 +891,7 @@ static int32_t mndAddVnodeToVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgro } if (used) continue; - if (pDnode == NULL || pDnode->numOfVnodes > pDnode->numOfSupportVnodes) { + if (pDnode == NULL || pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) { terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; return -1; } diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 37fae4f709..5b09ce5eb6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -953,6 +953,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs code = tsdbReadDelIdx(pDelFReader, pDelIdxArray); if (code) { + taosArrayDestroy(pDelIdxArray); tsdbDelFReaderClose(&pDelFReader); goto _err; } @@ -961,6 +962,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs code = getTableDelSkyline(pMem, pIMem, pDelFReader, delIdx, pIter->pSkyline); if (code) { + taosArrayDestroy(pDelIdxArray); tsdbDelFReaderClose(&pDelFReader); goto _err; } diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index c435c46d46..6e072a9630 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -798,6 +798,9 @@ SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch); int32_t ctgdGetOneHandle(SCatalog **pHandle); int ctgVgInfoComp(const void* lp, const void* rp); int32_t ctgMakeVgArray(SDBVgInfo* dbInfo); +int32_t ctgAcquireVgMetaFromCache(SCatalog *pCtg, const char *dbFName, const char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb); +int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache **pDb, SCtgTbCache **pTb, STableMeta **pTableMeta, char* dbFName); +void ctgReleaseVgMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache); extern SCatalogMgmt gCtgMgmt; extern SCtgDebug gCTGDebug; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 1b9bf7f99c..e9e0bae8d7 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -551,6 +551,35 @@ _return: CTG_RET(code); } +int32_t ctgGetCachedTbVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta) { + int32_t code = 0; + char db[TSDB_DB_FNAME_LEN] = {0}; + tNameGetFullDbName(pTableName, db); + SCtgDBCache *dbCache = NULL; + SCtgTbCache *tbCache = NULL; + + CTG_ERR_RET(ctgAcquireVgMetaFromCache(pCtg, db, pTableName->tname, &dbCache, &tbCache)); + + if (NULL == dbCache || NULL == tbCache) { + *pTableMeta = NULL; + return TSDB_CODE_SUCCESS; + } + + CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pTableName, pVgroup)); + + SCtgTbMetaCtx ctx = {0}; + ctx.pName = (SName*)pTableName; + ctx.flag = CTG_FLAG_UNKNOWN_STB; + CTG_ERR_JRET(ctgCopyTbMeta(pCtg, &ctx, &dbCache, &tbCache, pTableMeta, db)); + +_return: + + ctgReleaseVgMetaToCache(pCtg, dbCache, tbCache); + + CTG_RET(code); +} + + int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName) { int32_t code = 0; @@ -1118,6 +1147,13 @@ int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableName, CTG_API_LEAVE(ctgGetTbHashVgroup(pCtg, NULL, pTableName, pVgroup, exists)); } +int32_t catalogGetCachedTableVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta) { + CTG_API_ENTER(); + + CTG_API_LEAVE(ctgGetCachedTbVgMeta(pCtg, pTableName, pVgroup, pTableMeta)); +} + + #if 0 int32_t catalogGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SCatalogReq* pReq, SMetaData* pRsp) { CTG_API_ENTER(); diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index f41058584f..47cb0ef559 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -130,7 +130,7 @@ void ctgReleaseVgInfoToCache(SCatalog *pCtg, SCtgDBCache *dbCache) { } void ctgReleaseTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) { - if (pCache) { + if (pCache && dbCache) { CTG_UNLOCK(CTG_READ, &pCache->metaLock); taosHashRelease(dbCache->tbCache, pCache); } @@ -151,6 +151,18 @@ void ctgReleaseTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache } } +void ctgReleaseVgMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) { + if (pCache && dbCache) { + CTG_UNLOCK(CTG_READ, &pCache->metaLock); + taosHashRelease(dbCache->tbCache, pCache); + } + + if (dbCache) { + ctgRUnlockVgInfo(dbCache); + ctgReleaseDBCache(pCtg, dbCache); + } +} + int32_t ctgAcquireVgInfoFromCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) { SCtgDBCache *dbCache = NULL; ctgAcquireDBCache(pCtg, dbFName, &dbCache); @@ -226,6 +238,75 @@ _return: return TSDB_CODE_SUCCESS; } +int32_t ctgAcquireVgMetaFromCache(SCatalog *pCtg, const char *dbFName, const char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) { + SCtgDBCache *dbCache = NULL; + SCtgTbCache *tbCache = NULL; + bool vgInCache = false; + + ctgAcquireDBCache(pCtg, dbFName, &dbCache); + if (NULL == dbCache) { + ctgDebug("db %s not in cache", dbFName); + CTG_CACHE_STAT_INC(numOfVgMiss, 1); + goto _return; + } + + ctgRLockVgInfo(pCtg, dbCache, &vgInCache); + if (!vgInCache) { + ctgDebug("vgInfo of db %s not in cache", dbFName); + CTG_CACHE_STAT_INC(numOfVgMiss, 1); + goto _return; + } + + *pDb = dbCache; + + CTG_CACHE_STAT_INC(numOfVgHit, 1); + + ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName); + + tbCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName)); + if (NULL == tbCache) { + ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName); + CTG_CACHE_STAT_INC(numOfMetaMiss, 1); + goto _return; + } + + CTG_LOCK(CTG_READ, &tbCache->metaLock); + if (NULL == tbCache->pMeta) { + ctgDebug("tb %s meta not in cache, dbFName:%s", tbName, dbFName); + CTG_CACHE_STAT_INC(numOfMetaMiss, 1); + goto _return; + } + + *pTb = tbCache; + + ctgDebug("tb %s meta got in cache, dbFName:%s", tbName, dbFName); + + CTG_CACHE_STAT_INC(numOfMetaHit, 1); + + return TSDB_CODE_SUCCESS; + +_return: + + if (tbCache) { + CTG_UNLOCK(CTG_READ, &tbCache->metaLock); + taosHashRelease(dbCache->tbCache, tbCache); + } + + if (vgInCache) { + ctgRUnlockVgInfo(dbCache); + } + + if (dbCache) { + ctgReleaseDBCache(pCtg, dbCache); + } + + *pDb = NULL; + *pTb = NULL; + + return TSDB_CODE_SUCCESS; +} + + /* int32_t ctgAcquireStbMetaFromCache(SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgDBCache **pDb, SCtgTbCache **pTb) { SCtgDBCache *dbCache = NULL; @@ -378,6 +459,78 @@ int32_t ctgTbMetaExistInCache(SCatalog *pCtg, char *dbFName, char *tbName, int32 return TSDB_CODE_SUCCESS; } +int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache **pDb, SCtgTbCache **pTb, STableMeta **pTableMeta, char* dbFName) { + SCtgDBCache *dbCache = *pDb; + SCtgTbCache *tbCache = *pTb; + STableMeta *tbMeta = tbCache->pMeta; + ctx->tbInfo.inCache = true; + ctx->tbInfo.dbId = dbCache->dbId; + ctx->tbInfo.suid = tbMeta->suid; + ctx->tbInfo.tbType = tbMeta->tableType; + + if (tbMeta->tableType != TSDB_CHILD_TABLE) { + int32_t metaSize = CTG_META_SIZE(tbMeta); + *pTableMeta = taosMemoryCalloc(1, metaSize); + if (NULL == *pTableMeta) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + memcpy(*pTableMeta, tbMeta, metaSize); + + ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", ctx->pName->tname, tbMeta->tableType, dbFName); + return TSDB_CODE_SUCCESS; + } + + // PROCESS FOR CHILD TABLE + + int32_t metaSize = sizeof(SCTableMeta); + *pTableMeta = taosMemoryCalloc(1, metaSize); + if (NULL == *pTableMeta) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + memcpy(*pTableMeta, tbMeta, metaSize); + + //ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); + + if (tbCache) { + CTG_UNLOCK(CTG_READ, &tbCache->metaLock); + taosHashRelease(dbCache->tbCache, tbCache); + *pTb = NULL; + } + + ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname, + ctx->tbInfo.tbType, dbFName); + + ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, ctx->tbInfo.suid, &tbCache); + if (NULL == tbCache) { + taosMemoryFreeClear(*pTableMeta); + *pDb = NULL; + ctgDebug("stb 0x%" PRIx64 " meta not in cache", ctx->tbInfo.suid); + return TSDB_CODE_SUCCESS; + } + + *pTb = tbCache; + + STableMeta *stbMeta = tbCache->pMeta; + if (stbMeta->suid != ctx->tbInfo.suid) { + ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid, ctx->tbInfo.suid); + taosMemoryFreeClear(*pTableMeta); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + metaSize = CTG_META_SIZE(stbMeta); + *pTableMeta = taosMemoryRealloc(*pTableMeta, metaSize); + if (NULL == *pTableMeta) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + memcpy(&(*pTableMeta)->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta)); + + return TSDB_CODE_SUCCESS; +} + + int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) { int32_t code = 0; SCtgDBCache *dbCache = NULL; @@ -397,70 +550,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta ** return TSDB_CODE_SUCCESS; } - STableMeta *tbMeta = tbCache->pMeta; - ctx->tbInfo.inCache = true; - ctx->tbInfo.dbId = dbCache->dbId; - ctx->tbInfo.suid = tbMeta->suid; - ctx->tbInfo.tbType = tbMeta->tableType; - - if (tbMeta->tableType != TSDB_CHILD_TABLE) { - int32_t metaSize = CTG_META_SIZE(tbMeta); - *pTableMeta = taosMemoryCalloc(1, metaSize); - if (NULL == *pTableMeta) { - ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); - CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); - } - - memcpy(*pTableMeta, tbMeta, metaSize); - - ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); - ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", ctx->pName->tname, tbMeta->tableType, dbFName); - return TSDB_CODE_SUCCESS; - } - - // PROCESS FOR CHILD TABLE - - int32_t metaSize = sizeof(SCTableMeta); - *pTableMeta = taosMemoryCalloc(1, metaSize); - if (NULL == *pTableMeta) { - CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); - } - - memcpy(*pTableMeta, tbMeta, metaSize); - - //ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); - - if (tbCache) { - CTG_UNLOCK(CTG_READ, &tbCache->metaLock); - taosHashRelease(dbCache->tbCache, tbCache); - } - - ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname, - ctx->tbInfo.tbType, dbFName); - - ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, ctx->tbInfo.suid, &tbCache); - if (NULL == tbCache) { - //ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); - taosMemoryFreeClear(*pTableMeta); - ctgDebug("stb 0x%" PRIx64 " meta not in cache", ctx->tbInfo.suid); - return TSDB_CODE_SUCCESS; - } - - STableMeta *stbMeta = tbCache->pMeta; - if (stbMeta->suid != ctx->tbInfo.suid) { - ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); - ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid, ctx->tbInfo.suid); - CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); - } - - metaSize = CTG_META_SIZE(stbMeta); - *pTableMeta = taosMemoryRealloc(*pTableMeta, metaSize); - if (NULL == *pTableMeta) { - ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); - CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); - } - - memcpy(&(*pTableMeta)->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta)); + CTG_ERR_JRET(ctgCopyTbMeta(pCtg, ctx, &dbCache, &tbCache, pTableMeta, dbFName)); ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 59545d8fbc..36420599b3 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -563,7 +563,8 @@ static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt static void buildCreateTbReq(SVnodeModifOpStmt* pStmt, STag* pTag, SArray* pTagName) { insBuildCreateTbReq(&pStmt->createTblReq, pStmt->targetTableName.tname, pTag, pStmt->pTableMeta->suid, - pStmt->usingTableName.tname, pTagName, pStmt->pTableMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL); + pStmt->usingTableName.tname, pTagName, pStmt->pTableMeta->tableInfo.numOfTags, + TSDB_DEFAULT_TABLE_TTL); } static int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMsgBuf) { @@ -829,6 +830,44 @@ static int32_t getTableVgroup(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, boo return code; } +static int32_t getTableMetaAndVgroupImpl(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, bool* pMissCache) { + SVgroupInfo vg; + int32_t code = catalogGetCachedTableVgMeta(pCxt->pCatalog, &pStmt->targetTableName, &vg, &pStmt->pTableMeta); + if (TSDB_CODE_SUCCESS == code) { + if (NULL != pStmt->pTableMeta) { + code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)); + } + *pMissCache = (NULL == pStmt->pTableMeta); + } + return code; +} + +static int32_t getTableMetaAndVgroup(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, bool* pMissCache) { + SParseContext* pComCxt = pCxt->pComCxt; + int32_t code = TSDB_CODE_SUCCESS; + if (pComCxt->async) { + code = getTableMetaAndVgroupImpl(pComCxt, pStmt, pMissCache); + } else { + code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, pMissCache); + if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { + code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache); + } + } + return code; +} + +static int32_t collectUseTable(const SName* pName, SHashObj* pTable) { + char fullName[TSDB_TABLE_FNAME_LEN]; + tNameExtractFullName(pName, fullName); + return taosHashPut(pTable, fullName, strlen(fullName), pName, sizeof(SName)); +} + +static int32_t collectUseDatabase(const SName* pName, SHashObj* pDbs) { + char dbFName[TSDB_DB_FNAME_LEN] = {0}; + tNameGetFullDbName(pName, dbFName); + return taosHashPut(pDbs, dbFName, strlen(dbFName), dbFName, sizeof(dbFName)); +} + static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { if (pCxt->forceUpdate) { pCxt->missCache = true; @@ -836,12 +875,24 @@ static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt } int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache); +#if 0 if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, &pCxt->missCache); } if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache); } +#else + if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { + code = getTableMetaAndVgroup(pCxt, pStmt, &pCxt->missCache); + } +#endif + if (TSDB_CODE_SUCCESS == code && !pCxt->pComCxt->async) { + code = collectUseDatabase(&pStmt->targetTableName, pStmt->pDbFNameHashObj); + if (TSDB_CODE_SUCCESS == code) { + code = collectUseTable(&pStmt->targetTableName, pStmt->pTableNameHashObj); + } + } return code; } @@ -862,6 +913,12 @@ static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { code = getTableVgroup(pCxt->pComCxt, pStmt, true, &pCxt->missCache); } + if (TSDB_CODE_SUCCESS == code && !pCxt->pComCxt->async) { + code = collectUseDatabase(&pStmt->usingTableName, pStmt->pDbFNameHashObj); + if (TSDB_CODE_SUCCESS == code) { + code = collectUseTable(&pStmt->usingTableName, pStmt->pTableNameHashObj); + } + } return code; } @@ -933,11 +990,10 @@ static int32_t getTableDataBlocks(SInsertParseContext* pCxt, SVnodeModifOpStmt* if (pStmt->usingTableProcessing) { pStmt->pTableMeta->uid = 0; } - - return insGetDataBlockFromList(pStmt->pTableBlockHashObj, &uid, sizeof(pStmt->pTableMeta->uid), - TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), - getTableInfo(pStmt->pTableMeta).rowSize, pStmt->pTableMeta, pDataBuf, NULL, - &pStmt->createTblReq); + + return insGetDataBlockFromList( + pStmt->pTableBlockHashObj, &uid, sizeof(pStmt->pTableMeta->uid), TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), + getTableInfo(pStmt->pTableMeta).rowSize, pStmt->pTableMeta, pDataBuf, NULL, &pStmt->createTblReq); } char tbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(&pStmt->targetTableName, tbFName); @@ -1540,8 +1596,9 @@ static int32_t setStmtInfo(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) memcpy(tags, &pCxt->tags, sizeof(pCxt->tags)); SStmtCallback* pStmtCb = pCxt->pComCxt->pStmtCb; - int32_t code = (*pStmtCb->setInfoFn)(pStmtCb->pStmt, pStmt->pTableMeta, tags, &pStmt->targetTableName, pStmt->usingTableProcessing, - pStmt->pVgroupsHashObj, pStmt->pTableBlockHashObj, pStmt->usingTableName.tname); + int32_t code = (*pStmtCb->setInfoFn)(pStmtCb->pStmt, pStmt->pTableMeta, tags, &pStmt->targetTableName, + pStmt->usingTableProcessing, pStmt->pVgroupsHashObj, pStmt->pTableBlockHashObj, + pStmt->usingTableName.tname); memset(&pCxt->tags, 0, sizeof(pCxt->tags)); pStmt->pVgroupsHashObj = NULL; @@ -1776,16 +1833,25 @@ static int32_t initInsertQuery(SInsertParseContext* pCxt, SCatalogReq* pCatalogR static int32_t setRefreshMate(SQuery* pQuery) { SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)pQuery->pRoot; - SName* pTable = taosHashIterate(pStmt->pTableNameHashObj, NULL); - while (NULL != pTable) { - taosArrayPush(pQuery->pTableList, pTable); - pTable = taosHashIterate(pStmt->pTableNameHashObj, pTable); + + if (taosHashGetSize(pStmt->pTableNameHashObj) > 0) { + taosArrayDestroy(pQuery->pTableList); + pQuery->pTableList = taosArrayInit(taosHashGetSize(pStmt->pTableNameHashObj), sizeof(SName)); + SName* pTable = taosHashIterate(pStmt->pTableNameHashObj, NULL); + while (NULL != pTable) { + taosArrayPush(pQuery->pTableList, pTable); + pTable = taosHashIterate(pStmt->pTableNameHashObj, pTable); + } } - char* pDb = taosHashIterate(pStmt->pDbFNameHashObj, NULL); - while (NULL != pDb) { - taosArrayPush(pQuery->pDbList, pDb); - pDb = taosHashIterate(pStmt->pDbFNameHashObj, pDb); + if (taosHashGetSize(pStmt->pDbFNameHashObj) > 0) { + taosArrayDestroy(pQuery->pDbList); + pQuery->pDbList = taosArrayInit(taosHashGetSize(pStmt->pDbFNameHashObj), TSDB_DB_FNAME_LEN); + char* pDb = taosHashIterate(pStmt->pDbFNameHashObj, NULL); + while (NULL != pDb) { + taosArrayPush(pQuery->pDbList, pDb); + pDb = taosHashIterate(pStmt->pDbFNameHashObj, pDb); + } } return TSDB_CODE_SUCCESS; @@ -1899,29 +1965,28 @@ static int32_t buildInsertCatalogReq(SInsertParseContext* pCxt, SVnodeModifOpStm } static int32_t setNextStageInfo(SInsertParseContext* pCxt, SQuery* pQuery, SCatalogReq* pCatalogReq) { + SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)pQuery->pRoot; if (pCxt->missCache) { - parserDebug("0x%" PRIx64 " %d rows have been inserted before cache miss", pCxt->pComCxt->requestId, - ((SVnodeModifOpStmt*)pQuery->pRoot)->totalRowsNum); + parserDebug("0x%" PRIx64 " %d rows of %d tables have been inserted before cache miss", pCxt->pComCxt->requestId, + pStmt->totalRowsNum, pStmt->totalTbNum); pQuery->execStage = QUERY_EXEC_STAGE_PARSE; - return buildInsertCatalogReq(pCxt, (SVnodeModifOpStmt*)pQuery->pRoot, pCatalogReq); + return buildInsertCatalogReq(pCxt, pStmt, pCatalogReq); } - parserDebug("0x%" PRIx64 " %d rows have been inserted", pCxt->pComCxt->requestId, - ((SVnodeModifOpStmt*)pQuery->pRoot)->totalRowsNum); + parserDebug("0x%" PRIx64 " %d rows of %d tables have been inserted", pCxt->pComCxt->requestId, pStmt->totalRowsNum, + pStmt->totalTbNum); pQuery->execStage = QUERY_EXEC_STAGE_SCHEDULE; return TSDB_CODE_SUCCESS; } int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatalogReq, const SMetaData* pMetaData) { - SInsertParseContext context = { - .pComCxt = pCxt, - .msg = {.buf = pCxt->pMsg, .len = pCxt->msgLen}, - .missCache = false, - .usingDuplicateTable = false, - .forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false) - }; + SInsertParseContext context = {.pComCxt = pCxt, + .msg = {.buf = pCxt->pMsg, .len = pCxt->msgLen}, + .missCache = false, + .usingDuplicateTable = false, + .forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false)}; int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery); if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/parser/test/mockCatalog.cpp b/source/libs/parser/test/mockCatalog.cpp index cc51beb842..ae702ec02f 100644 --- a/source/libs/parser/test/mockCatalog.cpp +++ b/source/libs/parser/test/mockCatalog.cpp @@ -248,6 +248,13 @@ int32_t __catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableNam return code; } +int32_t __catalogGetCachedTableVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta) { + int32_t code = g_mockCatalogService->catalogGetTableMeta(pTableName, pTableMeta, true); + if (code) return code; + code = g_mockCatalogService->catalogGetTableHashVgroup(pTableName, pVgroup, true); + return code; +} + int32_t __catalogGetTableDistVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SArray** pVgList) { return g_mockCatalogService->catalogGetTableDistVgInfo(pTableName, pVgList); @@ -316,6 +323,7 @@ void initMetaDataEnv() { stub.set(catalogGetCachedSTableMeta, __catalogGetCachedTableMeta); stub.set(catalogGetTableHashVgroup, __catalogGetTableHashVgroup); stub.set(catalogGetCachedTableHashVgroup, __catalogGetCachedTableHashVgroup); + stub.set(catalogGetCachedTableVgMeta, __catalogGetCachedTableVgMeta); stub.set(catalogGetTableDistVgInfo, __catalogGetTableDistVgInfo); stub.set(catalogGetDBVgVersion, __catalogGetDBVgVersion); stub.set(catalogGetDBVgList, __catalogGetDBVgList); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 6a83a9a4da..d20c2902f5 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -54,6 +54,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* /*ASSERT(false);*/ qError("unexpected stream execution, stream %" PRId64 " task: %d, since %s", pTask->streamId, pTask->taskId, terrstr()); + continue; } if (output == NULL) { if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) { diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index e3d886b046..648e99d6a5 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -720,14 +720,16 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage pgno = TDB_PAGE_PGNO(pPage); - tdbTrace("tdbttl init pager:%p, pgno:%d, loadPage:%d, size:%d", pPager, pgno, loadPage, pPager->dbOrigSize); + tdbTrace("tdb/pager:%p, pgno:%d, loadPage:%d, size:%d", pPager, pgno, loadPage, pPager->dbOrigSize); if (loadPage && pgno <= pPager->dbOrigSize) { init = 1; nRead = tdbOsPRead(pPager->fd, pPage->pData, pPage->pageSize, ((i64)pPage->pageSize) * (pgno - 1)); - tdbTrace("tdbttl pager:%p, pgno:%d, nRead:%" PRId64, pPager, pgno, nRead); + tdbTrace("tdb/pager:%p, pgno:%d, nRead:%" PRId64, pPager, pgno, nRead); if (nRead < pPage->pageSize) { ASSERT(0); + tdbError("tdb/pager:%p, pgno:%d, nRead:%" PRId64 "pgSize:%" PRId32, pPager, pgno, nRead, pPage->pageSize); + TDB_UNLOCK_PAGE(pPage); return -1; } } else { diff --git a/source/os/src/osDir.c b/source/os/src/osDir.c index 9175067568..331d241745 100644 --- a/source/os/src/osDir.c +++ b/source/os/src/osDir.c @@ -394,8 +394,8 @@ char *taosDirEntryBaseName(char *name) { char *pPoint = strrchr(name, '/'); if (pPoint != NULL) { if (*(pPoint + 1) == '\0') { - *pPoint = '\0'; - return taosDirEntryBaseName(name); + *pPoint = '\0'; + return taosDirEntryBaseName(name); } return pPoint + 1; } @@ -500,8 +500,9 @@ int32_t taosCloseDir(TdDirPtr *ppDir) { void taosGetCwd(char *buf, int32_t len) { #if !defined(WINDOWS) - (void)getcwd(buf, len - 1); + char *unused __attribute__((unused)); + unused = getcwd(buf, len - 1); #else - strncpy(buf, "not implemented on windows", len -1); + strncpy(buf, "not implemented on windows", len - 1); #endif } diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 28ba7ea60f..154eebb4de 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1037,4 +1037,4 @@ ,,n,docs-examples-test,bash node.sh ,,n,docs-examples-test,bash csharp.sh ,,n,docs-examples-test,bash jdbc.sh -,,n,docs-examples-test,bash go.sh +#,,n,docs-examples-test,bash go.sh diff --git a/tests/script/tsim/dnode/balance2.sim b/tests/script/tsim/dnode/balance2.sim index f677d3925c..3f5a42d4d3 100644 --- a/tests/script/tsim/dnode/balance2.sim +++ b/tests/script/tsim/dnode/balance2.sim @@ -4,11 +4,11 @@ system sh/deploy.sh -n dnode2 -i 2 system sh/deploy.sh -n dnode3 -i 3 system sh/deploy.sh -n dnode4 -i 4 system sh/deploy.sh -n dnode5 -i 5 -system sh/cfg.sh -n dnode1 -c supportVnodes -v 4 -system sh/cfg.sh -n dnode2 -c supportVnodes -v 4 -system sh/cfg.sh -n dnode3 -c supportVnodes -v 4 -system sh/cfg.sh -n dnode4 -c supportVnodes -v 4 -system sh/cfg.sh -n dnode5 -c supportVnodes -v 4 +system sh/cfg.sh -n dnode1 -c supportVnodes -v 5 +system sh/cfg.sh -n dnode2 -c supportVnodes -v 5 +system sh/cfg.sh -n dnode3 -c supportVnodes -v 5 +system sh/cfg.sh -n dnode4 -c supportVnodes -v 5 +system sh/cfg.sh -n dnode5 -c supportVnodes -v 5 print ========== step1 system sh/exec.sh -n dnode1 -s start diff --git a/tests/script/tsim/vnode/stable_balance_replica1.sim b/tests/script/tsim/vnode/stable_balance_replica1.sim index f124979397..84190a3be0 100644 --- a/tests/script/tsim/vnode/stable_balance_replica1.sim +++ b/tests/script/tsim/vnode/stable_balance_replica1.sim @@ -3,10 +3,10 @@ system sh/deploy.sh -n dnode1 -i 1 system sh/deploy.sh -n dnode2 -i 2 system sh/deploy.sh -n dnode3 -i 3 system sh/deploy.sh -n dnode4 -i 4 -system sh/cfg.sh -n dnode1 -c supportVnodes -v 4 -system sh/cfg.sh -n dnode2 -c supportVnodes -v 4 -system sh/cfg.sh -n dnode3 -c supportVnodes -v 4 -system sh/cfg.sh -n dnode4 -c supportVnodes -v 4 +system sh/cfg.sh -n dnode1 -c supportVnodes -v 5 +system sh/cfg.sh -n dnode2 -c supportVnodes -v 5 +system sh/cfg.sh -n dnode3 -c supportVnodes -v 5 +system sh/cfg.sh -n dnode4 -c supportVnodes -v 5 $dbPrefix = br1_db $tbPrefix = br1_tb