fix query tsma child table

This commit is contained in:
wangjiaming0909 2024-02-28 10:06:14 +08:00
parent dff0449474
commit a1054234e2
12 changed files with 373 additions and 108 deletions

View File

@ -110,6 +110,8 @@ typedef struct SScanLogicNode {
int8_t igCheckUpdate;
SArray* pSmaIndexes;
SArray* pTsmas;
SArray* pTsmaTargetCTbVgInfo;
SArray* pTsmaTargetCTbInfo;
SNodeList* pGroupTags;
bool groupSort;
SNodeList* pTags; // for create stream

View File

@ -190,6 +190,11 @@ typedef struct STableNode {
struct STableMeta;
typedef struct STsmaTargetCTbInfo {
char ctableName[TSDB_TABLE_NAME_LEN];
uint64_t uid;
} STsmaTargetCTbInfo;
typedef struct SRealTableNode {
STableNode table; // QUERY_NODE_REAL_TABLE
struct STableMeta* pMeta;
@ -199,6 +204,8 @@ typedef struct SRealTableNode {
SArray* pSmaIndexes;
int8_t cacheLastMode;
SArray* pTsmas;
SArray* tsmaTargetCTbVgInfo; // SArray<SVgroupsInfo*>
SArray* tsmaTargetCTbInfo; // SArray<STsmaTargetCTbInfo>
} SRealTableNode;
typedef struct STempTableNode {

View File

@ -263,9 +263,9 @@ typedef struct SCtgViewsCtx {
} SCtgViewsCtx;
typedef enum {
FETCH_TB_META,
FETCH_TSMA_FOR_TB,
FETCH_PROGRESS_FOR_TSMA,
FETCH_TSMA_SOURCE_TB_META,
FETCH_TB_TSMA,
FETCH_TSMA_STREAM_PROGRESS,
} CTG_TSMA_FETCH_TYPE;
typedef struct SCtgTSMAFetch {
@ -274,9 +274,18 @@ typedef struct SCtgTSMAFetch {
int32_t tbIdx;
int32_t fetchIdx;
int32_t resIdx;
int32_t subFetchNum;
int32_t finishedSubFetchNum;
int32_t vgNum;
// tb meta
int32_t flag;
int32_t vgId;
// stream progress
int32_t subFetchNum;
int32_t finishedSubFetchNum;
int32_t vgNum;
// tb tsma
SName tsmaSourceTbName;
} SCtgTSMAFetch;
typedef struct SCtgTbTSMACtx {
@ -1150,8 +1159,8 @@ bool isCtgTSMACacheOutOfDate(STSMACache* pTsmaCache);
int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTbName,
SVgroupInfo* vgroupInfo, SStreamProgressRsp* out, SCtgTaskReq* tReq,
void* bInput);
int32_t ctgAddTSMAFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t* fetchIdx, int32_t resIdx,
int32_t flag);
int32_t ctgAddTSMAFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t* fetchIdx, int32_t resIdx, int32_t flag,
CTG_TSMA_FETCH_TYPE fetchType, const SName* sourceTbName);
extern SCatalogMgmt gCtgMgmt;
extern SCtgDebug gCTGDebug;

View File

@ -2682,7 +2682,19 @@ int32_t ctgLaunchGetTbTSMATask(SCtgTask* pTask) {
SCtgTaskReq tReq;
tReq.pTask = pTask;
tReq.msgIdx = pFetch->fetchIdx;
CTG_ERR_RET(ctgGetTbTSMAFromMnode(pCtg, pConn, pName, NULL, &tReq, TDMT_MND_GET_TABLE_TSMA));
switch (pFetch->fetchType) {
case FETCH_TSMA_SOURCE_TB_META: {
CTG_ERR_RET(ctgAsyncRefreshTbMeta(&tReq, pFetch->flag, pName, &pFetch->vgId));
} break;
case FETCH_TB_TSMA: {
CTG_ERR_RET(
ctgGetTbTSMAFromMnode(pCtg, pConn, &pFetch->tsmaSourceTbName, NULL, &tReq, TDMT_MND_GET_TABLE_TSMA));
} break;
default:
ASSERT(0);
break;
}
}
return TSDB_CODE_SUCCESS;
@ -2784,6 +2796,38 @@ _return:
CTG_RET(code);
}
static int32_t ctgTsmaFetchStreamProgress(SCtgTaskReq* tReq, SHashObj* pVgHash, const STableTSMAInfoRsp* pTsmas) {
int32_t code = 0;
SCtgTask* pTask = tReq->pTask;
SCatalog* pCtg = pTask->pJob->pCtg;
int32_t subFetchIdx = 0;
SCtgTbTSMACtx* pCtx = pTask->taskCtx;
SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx);
SRequestConnInfo* pConn = &pTask->pJob->conn;
STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
const SName* pTbName = taosArrayGet(pTbReq->pTables, pFetch->fetchIdx);
SVgroupInfo* pVgInfo = NULL;
pFetch->vgNum = taosHashGetSize(pVgHash);
for (int32_t i = 0; i < taosArrayGetSize(pTsmas->pTsmas); ++i) {
STableTSMAInfo* pTsmaInfo = taosArrayGetP(pTsmas->pTsmas, i);
pVgInfo = taosHashIterate(pVgHash, NULL);
pTsmaInfo->reqTs = taosGetTimestampMs();
while (pVgInfo) {
// make StreamProgressReq, send it
SStreamProgressReq req = {.fetchIdx = pFetch->fetchIdx,
.streamId = pTsmaInfo->streamUid,
.subFetchIdx = subFetchIdx++,
.vgId = pVgInfo->vgId};
CTG_ERR_JRET(ctgGetStreamProgressFromVnode(pCtg, pConn, pTbName, pVgInfo, NULL, tReq, &req));
pFetch->subFetchNum++;
pVgInfo = taosHashIterate(pVgHash, pVgInfo);
}
}
_return:
CTG_RET(code);
}
int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
bool taskDone = false;
int32_t code = 0;
@ -2798,19 +2842,20 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
SCtgDBCache* pDbCache = NULL;
STableTSMAInfo* pTsma = NULL;
SRequestConnInfo* pConn = &pTask->pJob->conn;
STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
SName* pTbName = taosArrayGet(pTbReq->pTables, pFetch->fetchIdx);
CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target));
switch (reqType) {
case TDMT_MND_GET_TABLE_TSMA: {
STableTSMAInfoRsp* pOut = pMsgCtx->out;
pFetch->fetchType = FETCH_TSMA_STREAM_PROGRESS;
pRes->pRes = pOut;
pMsgCtx->out = NULL;
if (pOut->pTsmas && taosArrayGetSize(pOut->pTsmas) > 0) {
// fetch progress
STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
const SName* pTbName = taosArrayGet(pTbReq->pTables, pFetch->fetchIdx);
ctgAcquireVgInfoFromCache(pCtg, pTbReq->dbFName, &pDbCache);
if (!pDbCache) {
// do not know which vnodes to fetch, fetch vnode list first
@ -2820,23 +2865,7 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
CTG_ERR_JRET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, tReq));
} else {
// fetch progress from every vnode
int32_t subFetchIdx = 0;
pFetch->vgNum = taosHashGetSize(pDbCache->vgCache.vgInfo->vgHash);
for (int32_t i = 0; i < taosArrayGetSize(pOut->pTsmas); ++i) {
STableTSMAInfo* pTsmaInfo = taosArrayGetP(pOut->pTsmas, i);
pTsmaInfo->reqTs = taosGetTimestampMs();
SVgroupInfo* pVgInfo = taosHashIterate(pDbCache->vgCache.vgInfo->vgHash, NULL);
while (pVgInfo) {
// make StreamProgressReq, send it
SStreamProgressReq req = {.fetchIdx = pFetch->fetchIdx,
.streamId = pTsmaInfo->streamUid,
.subFetchIdx = subFetchIdx++,
.vgId = pVgInfo->vgId};
CTG_ERR_JRET(ctgGetStreamProgressFromVnode(pCtg, pConn, pTbName, pVgInfo, NULL, tReq, &req));
pFetch->subFetchNum++;
pVgInfo = taosHashIterate(pDbCache->vgCache.vgInfo->vgHash, pVgInfo);
}
}
CTG_ERR_JRET(ctgTsmaFetchStreamProgress(tReq, pDbCache->vgCache.vgInfo->vgHash, pOut));
ctgReleaseVgInfoToCache(pCtg, pDbCache);
pDbCache = NULL;
}
@ -2878,28 +2907,43 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
}
} break;
case TDMT_MND_USE_DB: {
STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
SName* pTbName = taosArrayGet(pTbReq->pTables, pFetch->fetchIdx);
SUseDbOutput* pOut = (SUseDbOutput*)pMsgCtx->out;
STableTSMAInfoRsp* pTsmas = pRes->pRes;
int32_t subFetchIdx = 0;
pFetch->vgNum = taosHashGetSize(pOut->dbVgroup->vgHash);
TSWAP(pOut->dbVgroup->vgHash, pVgHash);
for (int32_t i = 0; i < taosArrayGetSize(pTsmas->pTsmas); ++i) {
STableTSMAInfo* pTsmaInfo = taosArrayGetP(pTsmas->pTsmas, i);
SVgroupInfo* pVgInfo = taosHashIterate(pVgHash, NULL);
while (pVgInfo) {
// make StreamProgressReq, send it
SStreamProgressReq req = {.fetchIdx = pFetch->fetchIdx,
.streamId = pTsmaInfo->streamUid,
.subFetchIdx = subFetchIdx++,
.vgId = pVgInfo->vgId};
CTG_ERR_JRET(ctgGetStreamProgressFromVnode(pCtg, pConn, pTbName, pVgInfo, NULL, tReq, &req));
pFetch->subFetchNum++;
pVgInfo = taosHashIterate(pVgHash, pVgInfo);
}
SUseDbOutput* pOut = (SUseDbOutput*)pMsgCtx->out;
switch (pFetch->fetchType) {
case FETCH_TSMA_SOURCE_TB_META: {
SVgroupInfo vgInfo = {0};
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, &pConn->mgmtEps, pOut->dbVgroup, pTbName, &vgInfo));
pFetch->vgId = vgInfo.vgId;
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pTbName, &vgInfo, NULL, tReq));
} break;
case FETCH_TSMA_STREAM_PROGRESS: {
STableTSMAInfoRsp* pTsmas = pRes->pRes;
TSWAP(pOut->dbVgroup->vgHash, pVgHash);
CTG_ERR_JRET(ctgTsmaFetchStreamProgress(tReq, pVgHash, pTsmas));
} break;
default:
ASSERT(0);
}
} break;
case TDMT_VND_TABLE_META: {
// handle source tb meta
ASSERT(pFetch->fetchType == FETCH_TSMA_SOURCE_TB_META);
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
pFetch->fetchType = FETCH_TB_TSMA;
pFetch->tsmaSourceTbName = *pTbName;
if (CTG_IS_META_NULL(pOut->metaType)) {
ctgTaskError("no tbmeta found when fetching tsma source tb meta: %s.%s", pTbName->dbname, pTbName->tname);
ctgRemoveTbMetaFromCache(pCtg, pTbName, false);
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
}
if (META_TYPE_BOTH_TABLE == pOut->metaType) {
// rewrite tsma fetch table with it's super table name
snprintf(pFetch->tsmaSourceTbName.tname, TMIN(TSDB_TABLE_NAME_LEN, strlen(pOut->tbName) + 1), "%s", pOut->tbName);
}
CTG_ERR_JRET(ctgGetTbTSMAFromMnode(pCtg, pConn, &pFetch->tsmaSourceTbName, NULL, tReq, TDMT_MND_GET_TABLE_TSMA));
} break;
default:
ASSERT(0);
}
@ -2921,9 +2965,7 @@ _return:
if (TSDB_CODE_MND_SMA_NOT_EXIST == code) {
code = TSDB_CODE_SUCCESS;
} else {
STablesReq* pReq = (STablesReq*)taosArrayGet(pCtx->pNames, pFetch->dbIdx);
SName* pName = taosArrayGet(pReq->pTables, pFetch->tbIdx);
ctgTaskError("Get tsma for %d.%s.%s faield with err: %s", pName->acctId, pName->dbname, pName->tname,
ctgTaskError("Get tsma for %d.%s.%s faield with err: %s", pTbName->acctId, pTbName->dbname, pTbName->tname,
tstrerror(code));
}
bool allSubFetchFinished = false;

View File

@ -3232,9 +3232,10 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
char dbFName[TSDB_DB_FNAME_LEN] = {0};
int32_t flag = CTG_FLAG_UNKNOWN_STB;
uint64_t lastSuid = 0;
STableMeta * lastTableMeta = NULL;
STableMeta * pTableMeta = NULL;
SName * pName = taosArrayGet(pList, 0);
int32_t tbNum = taosArrayGetSize(pList);
SCtgTbCache * pTbCache = NULL;
// TODO test sys db
if (IS_SYS_DBNAME(pName->dbname)) {
@ -3242,23 +3243,53 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
}
tNameGetFullDbName(pName, dbFName);
// get db cache
CTG_ERR_RET(ctgAcquireDBCache(pCtg, dbFName, &dbCache));
if (!dbCache) {
ctgDebug("DB %s not in cache", dbFName);
// TODO test no db cache, select from another db
for (int32_t i = 0; i < tbNum; ++i) {
ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag);
ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag, FETCH_TSMA_SOURCE_TB_META, NULL);
taosArrayPush(pCtx->pResList, &(SMetaData){0});
}
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < tbNum; ++i) {
// get tb cache
pName = taosArrayGet(pList, i);
pCache = taosHashAcquire(dbCache->tsmaCache, pName->tname, strlen(pName->tname));
pTbCache = taosHashAcquire(dbCache->tbCache, pName->tname, strlen(pName->tname));
if (!pTbCache) {
ctgDebug("tb: %s.%s not in cache", dbFName, pName->tname);
ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag, FETCH_TSMA_SOURCE_TB_META, NULL);
taosArrayPush(pCtx->pResList, &(SMetaRes){0});
continue;
}
uint64_t suid = pTbCache->pMeta->suid;
int8_t tbType = pTbCache->pMeta->tableType;
taosHashRelease(dbCache->tbCache, pTbCache);
SName tsmaSourceTbName = *pName;
// if child table, get stable name
if (tbType == TSDB_CHILD_TABLE) {
char* stbName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(uint64_t));
if (stbName) {
snprintf(tsmaSourceTbName.tname, TMIN(TSDB_TABLE_NAME_LEN, strlen(stbName) + 1), "%s", stbName);
taosHashRelease(dbCache->stbCache, stbName);
} else {
ctgDebug("stb in db: %s, uid: %" PRId64 " not in cache", dbFName, suid);
// TODO remove flag
ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag, FETCH_TSMA_SOURCE_TB_META, NULL);
taosArrayPush(pCtx->pResList, &(SMetaRes){0});
continue;
}
}
// get tsma cache
pCache = taosHashAcquire(dbCache->tsmaCache, tsmaSourceTbName.tname, strlen(tsmaSourceTbName.tname));
if (!pCache) {
ctgDebug("tsma for tb: %s.%s not in cache", dbFName, pName->tname);
ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag);
ctgDebug("tsma for tb: %s.%s not in cache", dbFName, tsmaSourceTbName.tname);
ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag, FETCH_TB_TSMA, &tsmaSourceTbName);
taosArrayPush(pCtx->pResList, &(SMetaRes){0});
CTG_CACHE_NHIT_INC(CTG_CI_TBL_SMA, 1);
continue;
@ -3268,8 +3299,8 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
if (!pCache->pTsmas || pCache->pTsmas->size == 0 || hasOutOfDateTSMACache(pCache->pTsmas)) {
CTG_UNLOCK(CTG_READ, &pCache->tsmaLock);
taosHashRelease(dbCache->tsmaCache, pCache);
ctgDebug("tsma for tb: %s.%s not in cache", pName->tname, dbFName);
ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag);
ctgDebug("tsma for tb: %s.%s not in cache", tsmaSourceTbName.tname, dbFName);
ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag, FETCH_TB_TSMA, &tsmaSourceTbName);
taosArrayPush(pCtx->pResList, &(SMetaRes){0});
CTG_CACHE_NHIT_INC(CTG_CI_TBL_TSMA, 1);
continue;

View File

@ -557,6 +557,11 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
pName = ctgGetFetchName(ctx->pNames, fetch);
} else if (CTG_TASK_GET_TB_TSMA == pTask->type){
SCtgTbTSMACtx* pCtx = pTask->taskCtx;
SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx);
STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
pName = taosArrayGet(pTbReq->pTables, pFetch->tbIdx);
} else {
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
pName = ctx->pName;
@ -612,6 +617,11 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
pName = ctgGetFetchName(ctx->pNames, fetch);
} else if (CTG_TASK_GET_TB_TSMA == pTask->type){
SCtgTbTSMACtx* pCtx = pTask->taskCtx;
SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx);
STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
pName = taosArrayGet(pTbReq->pTables, pFetch->tbIdx);
} else {
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
pName = ctx->pName;
@ -1556,6 +1566,7 @@ int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa
return TSDB_CODE_SUCCESS;
}
// TODO test errors
int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTbName,
SVgroupInfo* vgroupInfo, SStreamProgressRsp* out, SCtgTaskReq* tReq,
void* bInput) {

View File

@ -2420,6 +2420,7 @@ bool isCtgTSMACacheOutOfDate(STSMACache* pTsmaCache) {
pTsmaCache->dbFName, pTsmaCache->name, pTsmaCache->fillHistoryFinished,
30 * 1000 - pTsmaCache->delayDuration, now - pTsmaCache->reqTs);
} else {
// TODO remove log
qDebug("tsma %s.%s in cache has been out of date, history finished: %d, remain valid after: %" PRId64
" passed: %" PRId64,
pTsmaCache->dbFName, pTsmaCache->name, pTsmaCache->fillHistoryFinished,
@ -2428,8 +2429,8 @@ bool isCtgTSMACacheOutOfDate(STSMACache* pTsmaCache) {
return ret;
}
int32_t ctgAddTSMAFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t* fetchIdx, int32_t resIdx,
int32_t flag) {
int32_t ctgAddTSMAFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t* fetchIdx, int32_t resIdx, int32_t flag,
CTG_TSMA_FETCH_TYPE fetchType, const SName* sourceTbName) {
if (NULL == (*pFetchs)) {
*pFetchs = taosArrayInit(CTG_DEFAULT_FETCH_NUM, sizeof(SCtgTSMAFetch));
}
@ -2440,6 +2441,10 @@ int32_t ctgAddTSMAFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t*
fetch.fetchIdx = (*fetchIdx)++;
fetch.resIdx = resIdx;
fetch.flag = flag;
fetch.fetchType = fetchType;
if (sourceTbName) fetch.tsmaSourceTbName = *sourceTbName;
taosArrayPush(*pFetchs, &fetch);
return TSDB_CODE_SUCCESS;

View File

@ -816,6 +816,8 @@ void nodesDestroyNode(SNode* pNode) {
taosMemoryFreeClear(pReal->pMeta);
taosMemoryFreeClear(pReal->pVgroupList);
taosArrayDestroyEx(pReal->pSmaIndexes, destroySmaIndex);
taosArrayDestroyP(pReal->tsmaTargetCTbVgInfo, taosMemoryFree);
taosArrayDestroy(pReal->tsmaTargetCTbInfo);
break;
}
case QUERY_NODE_TEMP_TABLE:
@ -1308,6 +1310,8 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyList(pLogicNode->pTags);
nodesDestroyNode(pLogicNode->pSubtable);
taosArrayDestroyEx(pLogicNode->pFuncTypes, destroyFuncParam);
taosArrayDestroyP(pLogicNode->pTsmaTargetCTbVgInfo, taosMemoryFree);
taosArrayDestroy(pLogicNode->pTsmaTargetCTbInfo);
break;
}
case QUERY_NODE_LOGIC_PLAN_JOIN: {

View File

@ -3656,13 +3656,76 @@ static int32_t setTableIndex(STranslateContext* pCxt, SName* pName, SRealTableNo
}
static int32_t setTableTsmas(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
int32_t code = 0;
if (pCxt->createStream || QUERY_SMA_OPTIMIZE_DISABLE == tsQuerySmaOptimize) {
return TSDB_CODE_SUCCESS;
}
if (isSelectStmt(pCxt->pCurrStmt)) {
return getTableTsmas(pCxt, pName, &pRealTable->pTsmas);
if (isSelectStmt(pCxt->pCurrStmt) && pRealTable->pMeta->tableType != TSDB_SYSTEM_TABLE) {
code = getTableTsmas(pCxt, pName, &pRealTable->pTsmas);
// if select from a child table, fetch it's corresponding tsma target child table infos
if (TSDB_CODE_SUCCESS == code && pRealTable->pTsmas && pRealTable->pMeta->tableType == TSDB_CHILD_TABLE) {
if (pRealTable->tsmaTargetCTbVgInfo) {
taosArrayDestroyP(pRealTable->tsmaTargetCTbVgInfo, taosMemoryFree);
pRealTable->tsmaTargetCTbVgInfo = NULL;
}
for (int32_t i = 0; i < pRealTable->pTsmas->size; ++i) {
STableTSMAInfo* pTsma = taosArrayGetP(pRealTable->pTsmas, i);
SName tsmaTargetCTbName = {0};
toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, "", &tsmaTargetCTbName);
int32_t len = snprintf(tsmaTargetCTbName.tname, TSDB_TABLE_NAME_LEN, "%s.%s", pTsma->dbFName, pTsma->name);
len = taosCreateMD5Hash(tsmaTargetCTbName.tname, len);
sprintf(tsmaTargetCTbName.tname + len, "_%s", pRealTable->table.tableName);
SVgroupInfo vgInfo = {0};
bool exists = false;
code = catalogGetCachedTableHashVgroup(pCxt->pParseCxt->pCatalog, &tsmaTargetCTbName, &vgInfo, &exists);
if (TSDB_CODE_SUCCESS == code) {
ASSERT(exists);
if (!pRealTable->tsmaTargetCTbVgInfo) {
pRealTable->tsmaTargetCTbVgInfo = taosArrayInit(pRealTable->pTsmas->size, POINTER_BYTES);
if (!pRealTable->tsmaTargetCTbVgInfo) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
}
SVgroupsInfo* pVgpsInfo = taosMemoryCalloc(1, sizeof(int32_t) + sizeof(SVgroupInfo));
if (!pVgpsInfo) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
pVgpsInfo->numOfVgroups = 1;
pVgpsInfo->vgroups[0] = vgInfo;
taosArrayPush(pRealTable->tsmaTargetCTbVgInfo, &pVgpsInfo);
} else {
break;
}
STableMeta* pTableMeta = NULL;
if (code == TSDB_CODE_SUCCESS) {
SRequestConnInfo conn = {.pTrans = pCxt->pParseCxt->pTransporter,
.requestId = pCxt->pParseCxt->requestId,
.requestObjRefId = pCxt->pParseCxt->requestRid,
.mgmtEps = pCxt->pParseCxt->mgmtEpSet};
code = catalogGetTableMeta(pCxt->pParseCxt->pCatalog, &conn, &tsmaTargetCTbName, &pTableMeta);
}
if (code == TSDB_CODE_SUCCESS) {
STsmaTargetCTbInfo ctbInfo = {0};
sprintf(ctbInfo.ctableName, "%s", tsmaTargetCTbName.tname);
ctbInfo.uid = pTableMeta->uid;
taosMemoryFree(pTableMeta);
if (!pRealTable->tsmaTargetCTbInfo) {
pRealTable->tsmaTargetCTbInfo = taosArrayInit(pRealTable->pTsmas->size, sizeof(STsmaTargetCTbInfo));
if (!pRealTable->tsmaTargetCTbInfo) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
}
taosArrayPush(pRealTable->tsmaTargetCTbInfo, &ctbInfo);
}
}
}
}
return TSDB_CODE_SUCCESS;
return code;
}
static int32_t setTableCacheLastMode(STranslateContext* pCxt, SSelectStmt* pSelect) {
@ -5562,20 +5625,19 @@ static int32_t findEqualCondTbname(STranslateContext* pCxt, SNode* pWhere, SArra
return TSDB_CODE_SUCCESS;
}
static int32_t findVgroupsFromEqualTbname(STranslateContext* pCxt, SEqCondTbNameTableInfo* pInfo,
SVgroupsInfo* vgsInfo) {
static int32_t findVgroupsFromEqualTbname(STranslateContext* pCxt, SArray* aTbnames, const char* dbName,
int32_t numOfVgroups, SVgroupsInfo* vgsInfo) {
int32_t nVgroups = 0;
int32_t nTbls = taosArrayGetSize(pInfo->aTbnames);
int32_t nTbls = taosArrayGetSize(aTbnames);
if (nTbls >= pInfo->pRealTable->pVgroupList->numOfVgroups) {
if (nTbls >= numOfVgroups) {
vgsInfo->numOfVgroups = 0;
return TSDB_CODE_SUCCESS;
}
for (int j = 0; j < nTbls; ++j) {
char* dbName = pInfo->pRealTable->table.dbName;
SName snameTb;
char* tbName = taosArrayGetP(pInfo->aTbnames, j);
char* tbName = taosArrayGetP(aTbnames, j);
toName(pCxt->pParseCxt->acctId, dbName, tbName, &snameTb);
SVgroupInfo vgInfo = {0};
bool bExists;
@ -5605,16 +5667,55 @@ static int32_t setEqualTbnameTableVgroups(STranslateContext* pCxt, SSelectStmt*
int32_t code = TSDB_CODE_SUCCESS;
for (int i = 0; i < taosArrayGetSize(aTables); ++i) {
SEqCondTbNameTableInfo* pInfo = taosArrayGet(aTables, i);
int32_t nTbls = taosArrayGetSize(pInfo->aTbnames);
int32_t nTbls = taosArrayGetSize(pInfo->aTbnames);
int32_t numOfVgs = pInfo->pRealTable->pVgroupList->numOfVgroups;
SVgroupsInfo* vgsInfo = taosMemoryMalloc(sizeof(SVgroupsInfo) + nTbls * sizeof(SVgroupInfo));
int32_t nVgroups = 0;
findVgroupsFromEqualTbname(pCxt, pInfo, vgsInfo);
findVgroupsFromEqualTbname(pCxt, pInfo->aTbnames, pInfo->pRealTable->table.dbName, numOfVgs, vgsInfo);
if (vgsInfo->numOfVgroups != 0) {
taosMemoryFree(pInfo->pRealTable->pVgroupList);
pInfo->pRealTable->pVgroupList = vgsInfo;
} else {
taosMemoryFree(vgsInfo);
}
vgsInfo = NULL;
if (pInfo->pRealTable->pTsmas) {
pInfo->pRealTable->tsmaTargetCTbVgInfo = taosArrayInit(pInfo->pRealTable->pTsmas->size, POINTER_BYTES);
if (!pInfo->pRealTable->tsmaTargetCTbVgInfo) return TSDB_CODE_OUT_OF_MEMORY;
for (int32_t i = 0; i < pInfo->pRealTable->pTsmas->size; ++i) {
STableTSMAInfo* pTsma = taosArrayGetP(pInfo->pRealTable->pTsmas, i);
SArray *pTbNames = taosArrayInit(pInfo->aTbnames->size, POINTER_BYTES);
if (!pTbNames) return TSDB_CODE_OUT_OF_MEMORY;
for (int32_t k = 0; k < pInfo->aTbnames->size; ++k) {
const char* pTbName = taosArrayGetP(pInfo->aTbnames, k);
char* pNewTbName = taosMemoryCalloc(1, 34 + strlen(pTbName) + 1);
if (!pNewTbName) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
taosArrayPush(pTbNames, &pNewTbName);
sprintf(pNewTbName, "%s.%s", pTsma->dbFName, pTsma->name);
int32_t len = taosCreateMD5Hash(pNewTbName, strlen(pNewTbName));
sprintf(pNewTbName + len, "_%s", pTbName);
}
if (TSDB_CODE_SUCCESS == code) {
vgsInfo = taosMemoryMalloc(sizeof(SVgroupsInfo) + nTbls * sizeof(SVgroupInfo));
if (!vgsInfo) code = TSDB_CODE_OUT_OF_MEMORY;
}
if (TSDB_CODE_SUCCESS == code) {
findVgroupsFromEqualTbname(pCxt, pTbNames, pInfo->pRealTable->table.dbName, numOfVgs, vgsInfo);
if (vgsInfo->numOfVgroups != 0) {
taosArrayPush(pInfo->pRealTable->tsmaTargetCTbVgInfo, &vgsInfo);
} else {
taosMemoryFree(vgsInfo);
}
}
taosArrayDestroyP(pTbNames, taosMemoryFree);
if (code) break;
}
}
}
return TSDB_CODE_SUCCESS;

View File

@ -365,6 +365,8 @@ static int32_t makeScanLogicNode(SLogicPlanContext* pCxt, SRealTableNode* pRealT
TSWAP(pScan->pVgroupList, pRealTable->pVgroupList);
TSWAP(pScan->pSmaIndexes, pRealTable->pSmaIndexes);
TSWAP(pScan->pTsmas, pRealTable->pTsmas);
TSWAP(pScan->pTsmaTargetCTbVgInfo, pRealTable->tsmaTargetCTbVgInfo);
TSWAP(pScan->pTsmaTargetCTbInfo, pRealTable->tsmaTargetCTbInfo);
pScan->tableId = pRealTable->pMeta->uid;
pScan->stableId = pRealTable->pMeta->suid;
pScan->tableType = pRealTable->pMeta->tableType;

View File

@ -5809,6 +5809,8 @@ typedef struct STSMAOptUsefulTsma {
const STableTSMAInfo* pTsma; // NULL if no tsma available, which will use original data for calculation
STimeWindow scanRange; // scan time range for this tsma
SArray* pTsmaScanCols; // SArray<int32_t> index of tsmaFuncs array
char targetTbName[TSDB_TABLE_NAME_LEN]; // the scanning table name, used only when pTsma is not NULL
uint64_t targetTbUid; // the scanning table uid, used only when pTsma is not NULL
} STSMAOptUsefulTsma;
typedef struct STSMAOptCtx {
@ -6098,9 +6100,9 @@ SNodeList* tsmaOptCreateTsmaScanCols(const STSMAOptUsefulTsma* pTsma, const SNod
// TODO why 2?
pCol->colId = *idx + 2;
pCol->tableType = TSDB_SUPER_TABLE;
pCol->tableId = pTsma->pTsma->destTbUid;
pCol->tableId = pTsma->targetTbUid;
pCol->colType = COLUMN_TYPE_COLUMN;
strcpy(pCol->tableName, pTsma->pTsma->targetTb);
strcpy(pCol->tableName, pTsma->targetTbName);
strcpy(pCol->dbName, pTsma->pTsma->targetDbFName);
strcpy(pCol->colName, pFunc->node.aliasName);
strcpy(pCol->node.aliasName, pFunc->node.aliasName);
@ -6127,9 +6129,9 @@ static int32_t tsmaOptRewriteTag(const STSMAOptCtx* pTsmaOptCtx, const STSMAOptU
for (int32_t i = 0; i < pTsma->pTsma->pTags->size; ++i) {
const SSchema* pSchema = taosArrayGet(pTsma->pTsma->pTags, i);
if (strcmp(pTagCol->colName, pSchema->name) == 0) {
strcpy(pTagCol->tableName, pTsma->pTsma->targetTb);
strcpy(pTagCol->tableAlias, pTsma->pTsma->targetTb);
pTagCol->tableId = pTsma->pTsma->destTbUid;
strcpy(pTagCol->tableName, pTsma->targetTbName);
strcpy(pTagCol->tableAlias, pTsma->targetTbName);
pTagCol->tableId = pTsma->targetTbUid;
pTagCol->tableType = TSDB_SUPER_TABLE;
pTagCol->colId = pSchema->colId;
found = true;
@ -6214,7 +6216,7 @@ struct TsmaOptRewriteCtx {
int32_t code;
};
EDealRes tsmaOptRewriter(SNode** ppNode, void* ctx) {
EDealRes tsmaOptNodeRewriter(SNode** ppNode, void* ctx) {
SNode* pNode = *ppNode;
int32_t code = 0;
struct TsmaOptRewriteCtx* pCtx = ctx;
@ -6233,6 +6235,27 @@ EDealRes tsmaOptRewriter(SNode** ppNode, void* ctx) {
return DEAL_RES_CONTINUE;
}
static int32_t tsmaOptRewriteNode(SNode* pNode, STSMAOptCtx* pCtx, const STSMAOptUsefulTsma* pTsma, bool rewriteTbName, bool rewriteTag) {
struct TsmaOptRewriteCtx ctx = {
.pTsmaOptCtx = pCtx, .pTsma = pTsma, .rewriteTag = rewriteTag, .rewriteTbname = rewriteTbName, .code = 0};
nodesRewriteExpr(&pNode, tsmaOptNodeRewriter, &ctx);
return ctx.code;
}
static int32_t tsmaOptRewriteNodeList(SNodeList* pNodes, STSMAOptCtx* pCtx, const STSMAOptUsefulTsma* pTsma,
bool rewriteTbName, bool rewriteTag) {
int32_t code = 0;
SNode* pNode;
FOREACH(pNode, pNodes) {
code = tsmaOptRewriteNode(pNode, pCtx, pTsma, rewriteTbName, rewriteTag);
if (TSDB_CODE_SUCCESS == code) {
// TODO do we need to replace node??
REPLACE_NODE(pNode);
}
}
return code;
}
static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pNewScan, const STSMAOptUsefulTsma* pTsma) {
SNode* pNode;
int32_t code = 0;
@ -6261,46 +6284,49 @@ static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pNew
pNewScan->pScanCols = tsmaOptCreateTsmaScanCols(pTsma, pTsmaOptCtx->pAggFuncs);
if (!pNewScan->pScanCols) code = TSDB_CODE_OUT_OF_MEMORY;
}
if (pNewScan->tableType == TSDB_CHILD_TABLE) {
}
if (code == TSDB_CODE_SUCCESS && pPkTsCol) {
tstrncpy(pPkTsCol->tableName, pTsma->pTsma->targetTb, TSDB_TABLE_NAME_LEN);
tstrncpy(pPkTsCol->tableAlias, pTsma->pTsma->targetTb, TSDB_TABLE_NAME_LEN);
pPkTsCol->tableId = pTsma->pTsma->destTbUid;
pPkTsCol->tableType = TSDB_SUPER_TABLE;
tstrncpy(pPkTsCol->tableName, pTsma->targetTbName, TSDB_TABLE_NAME_LEN);
tstrncpy(pPkTsCol->tableAlias, pTsma->targetTbName, TSDB_TABLE_NAME_LEN);
pPkTsCol->tableId = pTsma->targetTbUid;
nodesListMakeStrictAppend(&pNewScan->pScanCols, nodesCloneNode((SNode*)pPkTsCol));
}
if (code == TSDB_CODE_SUCCESS) {
// TODO handle child tables
pNewScan->stableId = pTsma->pTsma->destTbUid;
pNewScan->tableId = pTsma->pTsma->destTbUid;
pNewScan->tableType = TSDB_SUPER_TABLE;
strcpy(pNewScan->tableName.tname, pTsma->pTsma->targetTb); // TODO set dbName
pNewScan->tableId = pTsma->targetTbUid;
strcpy(pNewScan->tableName.tname, pTsma->targetTbName);
}
if (code == TSDB_CODE_SUCCESS) {
// pseudo columns
FOREACH(pNode, pNewScan->pScanPseudoCols) {
if (nodeType(pNode) == QUERY_NODE_COLUMN) {
code = tsmaOptRewriteTag(pTsmaOptCtx, pTsma, (SColumnNode*)pNode);
}
}
code = tsmaOptRewriteNodeList(pNewScan->pScanPseudoCols, pTsmaOptCtx, pTsma, false, true);
}
if (code == TSDB_CODE_SUCCESS) {
FOREACH(pNode, pNewScan->pGroupTags) {
// TODO rewrite tag and tbname recursively
struct TsmaOptRewriteCtx ctx = {
.pTsmaOptCtx = pTsmaOptCtx, .pTsma = pTsma, .rewriteTag = true, .rewriteTbname = true, .code = 0};
nodesRewriteExpr(&pNode, tsmaOptRewriter, &ctx);
if (ctx.code) {
code = ctx.code;
} else {
REPLACE_NODE(pNode);
code = tsmaOptRewriteNode(pNewScan->pTagCond, pTsmaOptCtx, pTsma, true, true);
}
if (code == TSDB_CODE_SUCCESS) {
code = tsmaOptRewriteNodeList(pNewScan->pGroupTags, pTsmaOptCtx, pTsma, true, true);
}
if (pNewScan->pTsmaTargetCTbVgInfo) {
for (int32_t i = 0; i < taosArrayGetSize(pNewScan->pTsmas); ++i) {
STableTSMAInfo* pTsmaInfo = taosArrayGetP(pNewScan->pTsmas, i);
if (pTsmaInfo == pTsma->pTsma) {
SVgroupsInfo* pVgpsInfo = taosArrayGetP(pNewScan->pTsmaTargetCTbVgInfo, i);
taosMemoryFreeClear(pNewScan->pVgroupList);
int32_t len = sizeof(int32_t) + sizeof(SVgroupInfo) * pVgpsInfo->numOfVgroups;
pNewScan->pVgroupList = taosMemoryCalloc(1, len);
memcpy(pNewScan->pVgroupList, pVgpsInfo, len);
break;
}
}
}
} else {
// TODO rewrite tagcond?
FOREACH(pNode, pNewScan->pGroupTags) {
// rewrite tbname recursively
struct TsmaOptRewriteCtx ctx = {
.pTsmaOptCtx = pTsmaOptCtx, .pTsma = NULL, .rewriteTag = false, .rewriteTbname = true, .code = 0};
nodesRewriteExpr(&pNode, tsmaOptRewriter, &ctx);
nodesRewriteExpr(&pNode, tsmaOptNodeRewriter, &ctx);
if (ctx.code) {
code = ctx.code;
} else {
@ -6337,7 +6363,7 @@ static int32_t tsmaOptRevisePlan2(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent,
const STSMAOptUsefulTsma* pTsma) {
int32_t code = 0;
SColumnNode* pColNode;
SWindowLogicNode* pWindow;
SWindowLogicNode* pWindow = NULL;
SAggLogicNode* pAgg;
SNodeList* pAggFuncs;
SListCell* pScanListCell;
@ -6360,7 +6386,7 @@ static int32_t tsmaOptRevisePlan2(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent,
if (fmIsGroupKeyFunc(pAggFunc->funcId)) {
struct TsmaOptRewriteCtx ctx = {
.pTsmaOptCtx = pTsmaOptCtx, .pTsma = pTsma, .rewriteTag = true, .rewriteTbname = true, .code = 0};
nodesRewriteExpr(&pAggFuncNode, tsmaOptRewriter, &ctx);
nodesRewriteExpr(&pAggFuncNode, tsmaOptNodeRewriter, &ctx);
if (ctx.code) {
code = ctx.code;
} else {
@ -6437,7 +6463,7 @@ static int32_t tsmaOptRevisePlan(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent,
if (fmIsGroupKeyFunc(pAggFunc->funcId)) {
struct TsmaOptRewriteCtx ctx = {
.pTsmaOptCtx = pTsmaOptCtx, .pTsma = pTsma, .rewriteTag = true, .rewriteTbname = true, .code = 0};
nodesRewriteExpr(&pAggFuncNode, tsmaOptRewriter, &ctx);
nodesRewriteExpr(&pAggFuncNode, tsmaOptNodeRewriter, &ctx);
if (ctx.code) {
code = ctx.code;
} else {
@ -6498,6 +6524,24 @@ static int32_t tsmaOptGeneratePlan(STSMAOptCtx* pTsmaOptCtx) {
const STSMAOptUsefulTsma* pTsma = NULL;
SNodeList* pAggFuncs = NULL;
for (int32_t i = 0; i < pTsmaOptCtx->pUsedTsmas->size; ++i) {
STSMAOptUsefulTsma* pTsma = taosArrayGet(pTsmaOptCtx->pUsedTsmas, i);
if (pTsma->pTsma) {
if (pTsmaOptCtx->pScan->tableType == TSDB_CHILD_TABLE) {
for (int32_t j = 0; j < pTsmaOptCtx->pScan->pTsmas->size; ++j) {
if (taosArrayGetP(pTsmaOptCtx->pScan->pTsmas, j) == pTsma->pTsma) {
const STsmaTargetCTbInfo* pCtbInfo = taosArrayGet(pTsmaOptCtx->pScan->pTsmaTargetCTbInfo, j);
strcpy(pTsma->targetTbName, pCtbInfo->ctableName);
pTsma->targetTbUid = pCtbInfo->uid;
}
}
} else {
strcpy(pTsma->targetTbName, pTsma->pTsma->targetTb);
pTsma->targetTbUid = pTsma->pTsma->destTbUid;
}
}
}
for (int32_t i = 1; i < pTsmaOptCtx->pUsedTsmas->size && code == TSDB_CODE_SUCCESS; ++i) {
pTsma = taosArrayGet(pTsmaOptCtx->pUsedTsmas, i);
SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);

View File

@ -50,6 +50,10 @@ class UsedTsma:
def setIsTsma(self):
self.is_tsma_ = self.name.endswith(self.TSMA_RES_STB_POSTFIX)
if not self.is_tsma_:
pos = self.name.find('_')
if pos == 32:
self.is_tsma_ = True
class TSMAQueryContext:
def __init__(self) -> None:
@ -106,9 +110,12 @@ class TSMAQCBuilder:
self.qc_.used_tsmas.append(used_tsma)
return self
def should_query_with_tsma(self, tsma_name: str, ts_begin: str, ts_end: str) -> 'TSMAQCBuilder':
def should_query_with_tsma(self, tsma_name: str, ts_begin: str, ts_end: str, child_tb: bool = False) -> 'TSMAQCBuilder':
used_tsma: UsedTsma = UsedTsma()
used_tsma.name = tsma_name + UsedTsma.TSMA_RES_STB_POSTFIX
if child_tb:
used_tsma.name = tsma_name
else:
used_tsma.name = tsma_name + UsedTsma.TSMA_RES_STB_POSTFIX
used_tsma.time_range_start = self.to_timestamp(ts_begin)
used_tsma.time_range_end = self.to_timestamp(ts_end)
used_tsma.is_tsma_ = True
@ -466,7 +473,7 @@ class TDTestCase:
def test_query_sub_table(self):
sql = 'select avg(c1) from t1'
ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma('tsma2', UsedTsma.TS_MIN, UsedTsma.TS_MAX).get_qc()
ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma('e8945e7385834f8c22705546d4016539_t1', UsedTsma.TS_MIN, UsedTsma.TS_MAX, child_tb=True).get_qc()
self.tsma_tester.check_sql(sql, ctx)
@ -606,7 +613,7 @@ class TDTestCase:
self.test_create_tsma_on_norm_table()
self.test_create_tsma_on_child_table()
self.test_create_recursive_tsma()
## self.test_drop_stable()
## self.test_drop_stable() ## drop stable and recreate a stable
## self.test_drop_ctable()
self.test_drop_db()