Merge pull request #18815 from taosdata/enh/insertOptimize
enh: insert multi tables optimize
This commit is contained in:
commit
4efa7c2b47
|
@ -211,6 +211,8 @@ int32_t catalogGetCachedSTableMeta(SCatalog* pCtg, const SName* pTableName, STab
|
|||
|
||||
int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists);
|
||||
|
||||
int32_t catalogGetCachedTableVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta);
|
||||
|
||||
/**
|
||||
* Force refresh DB's local cached vgroup info.
|
||||
* @param pCtg (input, got with catalogGetHandle)
|
||||
|
|
|
@ -802,7 +802,7 @@ static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t c
|
|||
tstrerror(code));
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pWrapper->pCatalogReq->forceUpdate = false;
|
||||
//pWrapper->pCatalogReq->forceUpdate = false;
|
||||
code = qContinueParseSql(pWrapper->pParseCtx, pWrapper->pCatalogReq, pResultMeta, pQuery);
|
||||
}
|
||||
|
||||
|
@ -883,6 +883,11 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
|||
|
||||
if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) {
|
||||
code = pRequest->prevCode;
|
||||
terrno = code;
|
||||
pRequest->code = code;
|
||||
tscDebug("call sync query cb with code: %s", tstrerror(code));
|
||||
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
|
||||
return;
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
@ -933,6 +938,17 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
|||
tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code),
|
||||
pRequest->requestId);
|
||||
destorySqlCallbackWrapper(pWrapper);
|
||||
qDestroyQuery(pRequest->pQuery);
|
||||
pRequest->pQuery = NULL;
|
||||
|
||||
if (NEED_CLIENT_HANDLE_ERROR(code)) {
|
||||
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
|
||||
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
|
||||
pRequest->prevCode = code;
|
||||
doAsyncQuery(pRequest, true);
|
||||
return;
|
||||
}
|
||||
|
||||
terrno = code;
|
||||
pRequest->code = code;
|
||||
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
|
||||
|
|
|
@ -798,6 +798,9 @@ SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch);
|
|||
int32_t ctgdGetOneHandle(SCatalog **pHandle);
|
||||
int ctgVgInfoComp(const void* lp, const void* rp);
|
||||
int32_t ctgMakeVgArray(SDBVgInfo* dbInfo);
|
||||
int32_t ctgAcquireVgMetaFromCache(SCatalog *pCtg, const char *dbFName, const char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb);
|
||||
int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache **pDb, SCtgTbCache **pTb, STableMeta **pTableMeta, char* dbFName);
|
||||
void ctgReleaseVgMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache);
|
||||
|
||||
extern SCatalogMgmt gCtgMgmt;
|
||||
extern SCtgDebug gCTGDebug;
|
||||
|
|
|
@ -551,6 +551,35 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t ctgGetCachedTbVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta) {
|
||||
int32_t code = 0;
|
||||
char db[TSDB_DB_FNAME_LEN] = {0};
|
||||
tNameGetFullDbName(pTableName, db);
|
||||
SCtgDBCache *dbCache = NULL;
|
||||
SCtgTbCache *tbCache = NULL;
|
||||
|
||||
CTG_ERR_RET(ctgAcquireVgMetaFromCache(pCtg, db, pTableName->tname, &dbCache, &tbCache));
|
||||
|
||||
if (NULL == dbCache || NULL == tbCache) {
|
||||
*pTableMeta = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pTableName, pVgroup));
|
||||
|
||||
SCtgTbMetaCtx ctx = {0};
|
||||
ctx.pName = (SName*)pTableName;
|
||||
ctx.flag = CTG_FLAG_UNKNOWN_STB;
|
||||
CTG_ERR_JRET(ctgCopyTbMeta(pCtg, &ctx, &dbCache, &tbCache, pTableMeta, db));
|
||||
|
||||
_return:
|
||||
|
||||
ctgReleaseVgMetaToCache(pCtg, dbCache, tbCache);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName) {
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -1118,6 +1147,13 @@ int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableName,
|
|||
CTG_API_LEAVE(ctgGetTbHashVgroup(pCtg, NULL, pTableName, pVgroup, exists));
|
||||
}
|
||||
|
||||
int32_t catalogGetCachedTableVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta) {
|
||||
CTG_API_ENTER();
|
||||
|
||||
CTG_API_LEAVE(ctgGetCachedTbVgMeta(pCtg, pTableName, pVgroup, pTableMeta));
|
||||
}
|
||||
|
||||
|
||||
#if 0
|
||||
int32_t catalogGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SCatalogReq* pReq, SMetaData* pRsp) {
|
||||
CTG_API_ENTER();
|
||||
|
|
|
@ -130,7 +130,7 @@ void ctgReleaseVgInfoToCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
|
|||
}
|
||||
|
||||
void ctgReleaseTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
|
||||
if (pCache) {
|
||||
if (pCache && dbCache) {
|
||||
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
|
||||
taosHashRelease(dbCache->tbCache, pCache);
|
||||
}
|
||||
|
@ -151,6 +151,18 @@ void ctgReleaseTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache
|
|||
}
|
||||
}
|
||||
|
||||
void ctgReleaseVgMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
|
||||
if (pCache && dbCache) {
|
||||
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
|
||||
taosHashRelease(dbCache->tbCache, pCache);
|
||||
}
|
||||
|
||||
if (dbCache) {
|
||||
ctgRUnlockVgInfo(dbCache);
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t ctgAcquireVgInfoFromCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
|
||||
SCtgDBCache *dbCache = NULL;
|
||||
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
|
||||
|
@ -226,6 +238,75 @@ _return:
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgAcquireVgMetaFromCache(SCatalog *pCtg, const char *dbFName, const char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
|
||||
SCtgDBCache *dbCache = NULL;
|
||||
SCtgTbCache *tbCache = NULL;
|
||||
bool vgInCache = false;
|
||||
|
||||
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
|
||||
if (NULL == dbCache) {
|
||||
ctgDebug("db %s not in cache", dbFName);
|
||||
CTG_CACHE_STAT_INC(numOfVgMiss, 1);
|
||||
goto _return;
|
||||
}
|
||||
|
||||
ctgRLockVgInfo(pCtg, dbCache, &vgInCache);
|
||||
if (!vgInCache) {
|
||||
ctgDebug("vgInfo of db %s not in cache", dbFName);
|
||||
CTG_CACHE_STAT_INC(numOfVgMiss, 1);
|
||||
goto _return;
|
||||
}
|
||||
|
||||
*pDb = dbCache;
|
||||
|
||||
CTG_CACHE_STAT_INC(numOfVgHit, 1);
|
||||
|
||||
ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName);
|
||||
|
||||
tbCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName));
|
||||
if (NULL == tbCache) {
|
||||
ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
|
||||
CTG_CACHE_STAT_INC(numOfMetaMiss, 1);
|
||||
goto _return;
|
||||
}
|
||||
|
||||
CTG_LOCK(CTG_READ, &tbCache->metaLock);
|
||||
if (NULL == tbCache->pMeta) {
|
||||
ctgDebug("tb %s meta not in cache, dbFName:%s", tbName, dbFName);
|
||||
CTG_CACHE_STAT_INC(numOfMetaMiss, 1);
|
||||
goto _return;
|
||||
}
|
||||
|
||||
*pTb = tbCache;
|
||||
|
||||
ctgDebug("tb %s meta got in cache, dbFName:%s", tbName, dbFName);
|
||||
|
||||
CTG_CACHE_STAT_INC(numOfMetaHit, 1);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
if (tbCache) {
|
||||
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
|
||||
taosHashRelease(dbCache->tbCache, tbCache);
|
||||
}
|
||||
|
||||
if (vgInCache) {
|
||||
ctgRUnlockVgInfo(dbCache);
|
||||
}
|
||||
|
||||
if (dbCache) {
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
}
|
||||
|
||||
*pDb = NULL;
|
||||
*pTb = NULL;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
int32_t ctgAcquireStbMetaFromCache(SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgDBCache **pDb, SCtgTbCache **pTb) {
|
||||
SCtgDBCache *dbCache = NULL;
|
||||
|
@ -378,6 +459,78 @@ int32_t ctgTbMetaExistInCache(SCatalog *pCtg, char *dbFName, char *tbName, int32
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache **pDb, SCtgTbCache **pTb, STableMeta **pTableMeta, char* dbFName) {
|
||||
SCtgDBCache *dbCache = *pDb;
|
||||
SCtgTbCache *tbCache = *pTb;
|
||||
STableMeta *tbMeta = tbCache->pMeta;
|
||||
ctx->tbInfo.inCache = true;
|
||||
ctx->tbInfo.dbId = dbCache->dbId;
|
||||
ctx->tbInfo.suid = tbMeta->suid;
|
||||
ctx->tbInfo.tbType = tbMeta->tableType;
|
||||
|
||||
if (tbMeta->tableType != TSDB_CHILD_TABLE) {
|
||||
int32_t metaSize = CTG_META_SIZE(tbMeta);
|
||||
*pTableMeta = taosMemoryCalloc(1, metaSize);
|
||||
if (NULL == *pTableMeta) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
memcpy(*pTableMeta, tbMeta, metaSize);
|
||||
|
||||
ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", ctx->pName->tname, tbMeta->tableType, dbFName);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// PROCESS FOR CHILD TABLE
|
||||
|
||||
int32_t metaSize = sizeof(SCTableMeta);
|
||||
*pTableMeta = taosMemoryCalloc(1, metaSize);
|
||||
if (NULL == *pTableMeta) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
memcpy(*pTableMeta, tbMeta, metaSize);
|
||||
|
||||
//ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
||||
|
||||
if (tbCache) {
|
||||
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
|
||||
taosHashRelease(dbCache->tbCache, tbCache);
|
||||
*pTb = NULL;
|
||||
}
|
||||
|
||||
ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname,
|
||||
ctx->tbInfo.tbType, dbFName);
|
||||
|
||||
ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, ctx->tbInfo.suid, &tbCache);
|
||||
if (NULL == tbCache) {
|
||||
taosMemoryFreeClear(*pTableMeta);
|
||||
*pDb = NULL;
|
||||
ctgDebug("stb 0x%" PRIx64 " meta not in cache", ctx->tbInfo.suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
*pTb = tbCache;
|
||||
|
||||
STableMeta *stbMeta = tbCache->pMeta;
|
||||
if (stbMeta->suid != ctx->tbInfo.suid) {
|
||||
ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid, ctx->tbInfo.suid);
|
||||
taosMemoryFreeClear(*pTableMeta);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
metaSize = CTG_META_SIZE(stbMeta);
|
||||
*pTableMeta = taosMemoryRealloc(*pTableMeta, metaSize);
|
||||
if (NULL == *pTableMeta) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
memcpy(&(*pTableMeta)->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) {
|
||||
int32_t code = 0;
|
||||
SCtgDBCache *dbCache = NULL;
|
||||
|
@ -397,70 +550,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STableMeta *tbMeta = tbCache->pMeta;
|
||||
ctx->tbInfo.inCache = true;
|
||||
ctx->tbInfo.dbId = dbCache->dbId;
|
||||
ctx->tbInfo.suid = tbMeta->suid;
|
||||
ctx->tbInfo.tbType = tbMeta->tableType;
|
||||
|
||||
if (tbMeta->tableType != TSDB_CHILD_TABLE) {
|
||||
int32_t metaSize = CTG_META_SIZE(tbMeta);
|
||||
*pTableMeta = taosMemoryCalloc(1, metaSize);
|
||||
if (NULL == *pTableMeta) {
|
||||
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
||||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
memcpy(*pTableMeta, tbMeta, metaSize);
|
||||
|
||||
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
||||
ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", ctx->pName->tname, tbMeta->tableType, dbFName);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// PROCESS FOR CHILD TABLE
|
||||
|
||||
int32_t metaSize = sizeof(SCTableMeta);
|
||||
*pTableMeta = taosMemoryCalloc(1, metaSize);
|
||||
if (NULL == *pTableMeta) {
|
||||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
memcpy(*pTableMeta, tbMeta, metaSize);
|
||||
|
||||
//ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
||||
|
||||
if (tbCache) {
|
||||
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
|
||||
taosHashRelease(dbCache->tbCache, tbCache);
|
||||
}
|
||||
|
||||
ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname,
|
||||
ctx->tbInfo.tbType, dbFName);
|
||||
|
||||
ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, ctx->tbInfo.suid, &tbCache);
|
||||
if (NULL == tbCache) {
|
||||
//ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
||||
taosMemoryFreeClear(*pTableMeta);
|
||||
ctgDebug("stb 0x%" PRIx64 " meta not in cache", ctx->tbInfo.suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STableMeta *stbMeta = tbCache->pMeta;
|
||||
if (stbMeta->suid != ctx->tbInfo.suid) {
|
||||
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
||||
ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid, ctx->tbInfo.suid);
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
metaSize = CTG_META_SIZE(stbMeta);
|
||||
*pTableMeta = taosMemoryRealloc(*pTableMeta, metaSize);
|
||||
if (NULL == *pTableMeta) {
|
||||
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
||||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
memcpy(&(*pTableMeta)->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta));
|
||||
CTG_ERR_JRET(ctgCopyTbMeta(pCtg, ctx, &dbCache, &tbCache, pTableMeta, dbFName));
|
||||
|
||||
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
||||
|
||||
|
|
|
@ -563,7 +563,8 @@ static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt
|
|||
|
||||
static void buildCreateTbReq(SVnodeModifOpStmt* pStmt, STag* pTag, SArray* pTagName) {
|
||||
insBuildCreateTbReq(&pStmt->createTblReq, pStmt->targetTableName.tname, pTag, pStmt->pTableMeta->suid,
|
||||
pStmt->usingTableName.tname, pTagName, pStmt->pTableMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL);
|
||||
pStmt->usingTableName.tname, pTagName, pStmt->pTableMeta->tableInfo.numOfTags,
|
||||
TSDB_DEFAULT_TABLE_TTL);
|
||||
}
|
||||
|
||||
static int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
|
||||
|
@ -829,6 +830,44 @@ static int32_t getTableVgroup(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, boo
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t getTableMetaAndVgroupImpl(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, bool* pMissCache) {
|
||||
SVgroupInfo vg;
|
||||
int32_t code = catalogGetCachedTableVgMeta(pCxt->pCatalog, &pStmt->targetTableName, &vg, &pStmt->pTableMeta);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (NULL != pStmt->pTableMeta) {
|
||||
code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
|
||||
}
|
||||
*pMissCache = (NULL == pStmt->pTableMeta);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t getTableMetaAndVgroup(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, bool* pMissCache) {
|
||||
SParseContext* pComCxt = pCxt->pComCxt;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (pComCxt->async) {
|
||||
code = getTableMetaAndVgroupImpl(pComCxt, pStmt, pMissCache);
|
||||
} else {
|
||||
code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, pMissCache);
|
||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||
code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache);
|
||||
}
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t collectUseTable(const SName* pName, SHashObj* pTable) {
|
||||
char fullName[TSDB_TABLE_FNAME_LEN];
|
||||
tNameExtractFullName(pName, fullName);
|
||||
return taosHashPut(pTable, fullName, strlen(fullName), pName, sizeof(SName));
|
||||
}
|
||||
|
||||
static int32_t collectUseDatabase(const SName* pName, SHashObj* pDbs) {
|
||||
char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
||||
tNameGetFullDbName(pName, dbFName);
|
||||
return taosHashPut(pDbs, dbFName, strlen(dbFName), dbFName, sizeof(dbFName));
|
||||
}
|
||||
|
||||
static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
|
||||
if (pCxt->forceUpdate) {
|
||||
pCxt->missCache = true;
|
||||
|
@ -836,12 +875,24 @@ static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt
|
|||
}
|
||||
|
||||
int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache);
|
||||
#if 0
|
||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||
code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, &pCxt->missCache);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||
code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache);
|
||||
}
|
||||
#else
|
||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||
code = getTableMetaAndVgroup(pCxt, pStmt, &pCxt->missCache);
|
||||
}
|
||||
#endif
|
||||
if (TSDB_CODE_SUCCESS == code && !pCxt->pComCxt->async) {
|
||||
code = collectUseDatabase(&pStmt->targetTableName, pStmt->pDbFNameHashObj);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = collectUseTable(&pStmt->targetTableName, pStmt->pTableNameHashObj);
|
||||
}
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -862,6 +913,12 @@ static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt*
|
|||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||
code = getTableVgroup(pCxt->pComCxt, pStmt, true, &pCxt->missCache);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && !pCxt->pComCxt->async) {
|
||||
code = collectUseDatabase(&pStmt->usingTableName, pStmt->pDbFNameHashObj);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = collectUseTable(&pStmt->usingTableName, pStmt->pTableNameHashObj);
|
||||
}
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -933,11 +990,10 @@ static int32_t getTableDataBlocks(SInsertParseContext* pCxt, SVnodeModifOpStmt*
|
|||
if (pStmt->usingTableProcessing) {
|
||||
pStmt->pTableMeta->uid = 0;
|
||||
}
|
||||
|
||||
return insGetDataBlockFromList(pStmt->pTableBlockHashObj, &uid, sizeof(pStmt->pTableMeta->uid),
|
||||
TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk),
|
||||
getTableInfo(pStmt->pTableMeta).rowSize, pStmt->pTableMeta, pDataBuf, NULL,
|
||||
&pStmt->createTblReq);
|
||||
|
||||
return insGetDataBlockFromList(
|
||||
pStmt->pTableBlockHashObj, &uid, sizeof(pStmt->pTableMeta->uid), TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk),
|
||||
getTableInfo(pStmt->pTableMeta).rowSize, pStmt->pTableMeta, pDataBuf, NULL, &pStmt->createTblReq);
|
||||
}
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
tNameExtractFullName(&pStmt->targetTableName, tbFName);
|
||||
|
@ -1540,8 +1596,9 @@ static int32_t setStmtInfo(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt)
|
|||
memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
|
||||
|
||||
SStmtCallback* pStmtCb = pCxt->pComCxt->pStmtCb;
|
||||
int32_t code = (*pStmtCb->setInfoFn)(pStmtCb->pStmt, pStmt->pTableMeta, tags, &pStmt->targetTableName, pStmt->usingTableProcessing,
|
||||
pStmt->pVgroupsHashObj, pStmt->pTableBlockHashObj, pStmt->usingTableName.tname);
|
||||
int32_t code = (*pStmtCb->setInfoFn)(pStmtCb->pStmt, pStmt->pTableMeta, tags, &pStmt->targetTableName,
|
||||
pStmt->usingTableProcessing, pStmt->pVgroupsHashObj, pStmt->pTableBlockHashObj,
|
||||
pStmt->usingTableName.tname);
|
||||
|
||||
memset(&pCxt->tags, 0, sizeof(pCxt->tags));
|
||||
pStmt->pVgroupsHashObj = NULL;
|
||||
|
@ -1776,16 +1833,25 @@ static int32_t initInsertQuery(SInsertParseContext* pCxt, SCatalogReq* pCatalogR
|
|||
|
||||
static int32_t setRefreshMate(SQuery* pQuery) {
|
||||
SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)pQuery->pRoot;
|
||||
SName* pTable = taosHashIterate(pStmt->pTableNameHashObj, NULL);
|
||||
while (NULL != pTable) {
|
||||
taosArrayPush(pQuery->pTableList, pTable);
|
||||
pTable = taosHashIterate(pStmt->pTableNameHashObj, pTable);
|
||||
|
||||
if (taosHashGetSize(pStmt->pTableNameHashObj) > 0) {
|
||||
taosArrayDestroy(pQuery->pTableList);
|
||||
pQuery->pTableList = taosArrayInit(taosHashGetSize(pStmt->pTableNameHashObj), sizeof(SName));
|
||||
SName* pTable = taosHashIterate(pStmt->pTableNameHashObj, NULL);
|
||||
while (NULL != pTable) {
|
||||
taosArrayPush(pQuery->pTableList, pTable);
|
||||
pTable = taosHashIterate(pStmt->pTableNameHashObj, pTable);
|
||||
}
|
||||
}
|
||||
|
||||
char* pDb = taosHashIterate(pStmt->pDbFNameHashObj, NULL);
|
||||
while (NULL != pDb) {
|
||||
taosArrayPush(pQuery->pDbList, pDb);
|
||||
pDb = taosHashIterate(pStmt->pDbFNameHashObj, pDb);
|
||||
if (taosHashGetSize(pStmt->pDbFNameHashObj) > 0) {
|
||||
taosArrayDestroy(pQuery->pDbList);
|
||||
pQuery->pDbList = taosArrayInit(taosHashGetSize(pStmt->pDbFNameHashObj), TSDB_DB_FNAME_LEN);
|
||||
char* pDb = taosHashIterate(pStmt->pDbFNameHashObj, NULL);
|
||||
while (NULL != pDb) {
|
||||
taosArrayPush(pQuery->pDbList, pDb);
|
||||
pDb = taosHashIterate(pStmt->pDbFNameHashObj, pDb);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1899,29 +1965,28 @@ static int32_t buildInsertCatalogReq(SInsertParseContext* pCxt, SVnodeModifOpStm
|
|||
}
|
||||
|
||||
static int32_t setNextStageInfo(SInsertParseContext* pCxt, SQuery* pQuery, SCatalogReq* pCatalogReq) {
|
||||
SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)pQuery->pRoot;
|
||||
if (pCxt->missCache) {
|
||||
parserDebug("0x%" PRIx64 " %d rows have been inserted before cache miss", pCxt->pComCxt->requestId,
|
||||
((SVnodeModifOpStmt*)pQuery->pRoot)->totalRowsNum);
|
||||
parserDebug("0x%" PRIx64 " %d rows of %d tables have been inserted before cache miss", pCxt->pComCxt->requestId,
|
||||
pStmt->totalRowsNum, pStmt->totalTbNum);
|
||||
|
||||
pQuery->execStage = QUERY_EXEC_STAGE_PARSE;
|
||||
return buildInsertCatalogReq(pCxt, (SVnodeModifOpStmt*)pQuery->pRoot, pCatalogReq);
|
||||
return buildInsertCatalogReq(pCxt, pStmt, pCatalogReq);
|
||||
}
|
||||
|
||||
parserDebug("0x%" PRIx64 " %d rows have been inserted", pCxt->pComCxt->requestId,
|
||||
((SVnodeModifOpStmt*)pQuery->pRoot)->totalRowsNum);
|
||||
parserDebug("0x%" PRIx64 " %d rows of %d tables have been inserted", pCxt->pComCxt->requestId, pStmt->totalRowsNum,
|
||||
pStmt->totalTbNum);
|
||||
|
||||
pQuery->execStage = QUERY_EXEC_STAGE_SCHEDULE;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatalogReq, const SMetaData* pMetaData) {
|
||||
SInsertParseContext context = {
|
||||
.pComCxt = pCxt,
|
||||
.msg = {.buf = pCxt->pMsg, .len = pCxt->msgLen},
|
||||
.missCache = false,
|
||||
.usingDuplicateTable = false,
|
||||
.forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false)
|
||||
};
|
||||
SInsertParseContext context = {.pComCxt = pCxt,
|
||||
.msg = {.buf = pCxt->pMsg, .len = pCxt->msgLen},
|
||||
.missCache = false,
|
||||
.usingDuplicateTable = false,
|
||||
.forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false)};
|
||||
|
||||
int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
|
|
@ -248,6 +248,13 @@ int32_t __catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableNam
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t __catalogGetCachedTableVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta) {
|
||||
int32_t code = g_mockCatalogService->catalogGetTableMeta(pTableName, pTableMeta, true);
|
||||
if (code) return code;
|
||||
code = g_mockCatalogService->catalogGetTableHashVgroup(pTableName, pVgroup, true);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t __catalogGetTableDistVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName,
|
||||
SArray** pVgList) {
|
||||
return g_mockCatalogService->catalogGetTableDistVgInfo(pTableName, pVgList);
|
||||
|
@ -316,6 +323,7 @@ void initMetaDataEnv() {
|
|||
stub.set(catalogGetCachedSTableMeta, __catalogGetCachedTableMeta);
|
||||
stub.set(catalogGetTableHashVgroup, __catalogGetTableHashVgroup);
|
||||
stub.set(catalogGetCachedTableHashVgroup, __catalogGetCachedTableHashVgroup);
|
||||
stub.set(catalogGetCachedTableVgMeta, __catalogGetCachedTableVgMeta);
|
||||
stub.set(catalogGetTableDistVgInfo, __catalogGetTableDistVgInfo);
|
||||
stub.set(catalogGetDBVgVersion, __catalogGetDBVgVersion);
|
||||
stub.set(catalogGetDBVgList, __catalogGetDBVgList);
|
||||
|
|
|
@ -394,8 +394,8 @@ char *taosDirEntryBaseName(char *name) {
|
|||
char *pPoint = strrchr(name, '/');
|
||||
if (pPoint != NULL) {
|
||||
if (*(pPoint + 1) == '\0') {
|
||||
*pPoint = '\0';
|
||||
return taosDirEntryBaseName(name);
|
||||
*pPoint = '\0';
|
||||
return taosDirEntryBaseName(name);
|
||||
}
|
||||
return pPoint + 1;
|
||||
}
|
||||
|
@ -500,8 +500,9 @@ int32_t taosCloseDir(TdDirPtr *ppDir) {
|
|||
|
||||
void taosGetCwd(char *buf, int32_t len) {
|
||||
#if !defined(WINDOWS)
|
||||
(void)getcwd(buf, len - 1);
|
||||
char *unused __attribute__((unused));
|
||||
unused = getcwd(buf, len - 1);
|
||||
#else
|
||||
strncpy(buf, "not implemented on windows", len -1);
|
||||
strncpy(buf, "not implemented on windows", len - 1);
|
||||
#endif
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue