fix tsmas

This commit is contained in:
wangjiaming0909 2024-02-19 16:04:55 +08:00
parent e947a2fb0d
commit 8e56d9a359
12 changed files with 215 additions and 74 deletions

View File

@ -4226,7 +4226,7 @@ typedef struct {
int64_t streamUid; int64_t streamUid;
int64_t reqTs; int64_t reqTs;
int64_t rspTs; int64_t rspTs;
int64_t delayDuration; int64_t delayDuration; // ms
bool fillHistoryFinished; bool fillHistoryFinished;
} STableTSMAInfo; } STableTSMAInfo;

View File

@ -823,6 +823,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3103) #define TSDB_CODE_TSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3103)
#define TSDB_CODE_TSMA_INVALID_PTR TAOS_DEF_ERROR_CODE(0, 0x3104) #define TSDB_CODE_TSMA_INVALID_PTR TAOS_DEF_ERROR_CODE(0, 0x3104)
#define TSDB_CODE_TSMA_INVALID_PARA TAOS_DEF_ERROR_CODE(0, 0x3105) #define TSDB_CODE_TSMA_INVALID_PARA TAOS_DEF_ERROR_CODE(0, 0x3105)
#define TSDB_CODE_TSMA_INVALID_TB TAOS_DEF_ERROR_CODE(0, 0x3106)
#define TSDB_CODE_TSMA_INVALID_INTERVAL TAOS_DEF_ERROR_CODE(0, 0x3107)
//rsma //rsma
#define TSDB_CODE_RSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3150) #define TSDB_CODE_RSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3150)

View File

@ -1449,7 +1449,7 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
pCxt->pCreateStreamReq->fillNullCols = NULL; pCxt->pCreateStreamReq->fillNullCols = NULL;
pCxt->pCreateStreamReq->igUpdate = 0; pCxt->pCreateStreamReq->igUpdate = 0;
// TODO what's this tiemstamp? // TODO what's this tiemstamp?
pCxt->pCreateStreamReq->lastTs = 1704442278000; pCxt->pCreateStreamReq->lastTs = 1755442278000;
pCxt->pCreateStreamReq->ast = strdup(pCxt->pCreateSmaReq->ast); pCxt->pCreateStreamReq->ast = strdup(pCxt->pCreateSmaReq->ast);
pCxt->pCreateStreamReq->sql = strdup(pCxt->pCreateSmaReq->sql); pCxt->pCreateStreamReq->sql = strdup(pCxt->pCreateSmaReq->sql);
@ -2163,7 +2163,7 @@ static int32_t mkNonExistTSMAInfo(const STSMAVersion *pTsmaVer, STableTSMAInfo *
tstrncpy(pInfo->tb, pTsmaVer->tbName, TSDB_TABLE_NAME_LEN); tstrncpy(pInfo->tb, pTsmaVer->tbName, TSDB_TABLE_NAME_LEN);
tstrncpy(pInfo->name, pTsmaVer->name, TSDB_TABLE_NAME_LEN); tstrncpy(pInfo->name, pTsmaVer->name, TSDB_TABLE_NAME_LEN);
pInfo->dbId = pTsmaVer->dbId; pInfo->dbId = pTsmaVer->dbId;
pInfo->ast = "dummy";// TODO could be freed pInfo->ast = taosMemoryCalloc(1, 1);
*ppTsma = pInfo; *ppTsma = pInfo;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -531,6 +531,11 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b
*fhFinished = !HAS_RELATED_FILLHISTORY_TASK(pTask); *fhFinished = !HAS_RELATED_FILLHISTORY_TASK(pTask);
int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader); int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
if (ver == -1) {
ver = pTask->chkInfo.processedVer;
} else {
ver--;
}
SVersionRange verRange = {0}; SVersionRange verRange = {0};
walReaderValidVersionRange(pTask->exec.pWalReader, &verRange.minVer, &verRange.maxVer); walReaderValidVersionRange(pTask->exec.pWalReader, &verRange.minVer, &verRange.maxVer);
@ -549,9 +554,13 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b
cur = pReader->pHead->head.ingestTs; cur = pReader->pHead->head.ingestTs;
} }
code = walFetchHead(pReader, verRange.maxVer); if (ver == verRange.maxVer) {
if (code == TSDB_CODE_SUCCESS) { latest = cur;
latest = pReader->pHead->head.ingestTs; } else {
code = walFetchHead(pReader, verRange.maxVer);
if (code == TSDB_CODE_SUCCESS) {
latest = pReader->pHead->head.ingestTs;
}
} }
if (pDelay != NULL) { // delay in ms if (pDelay != NULL) { // delay in ms

View File

@ -783,6 +783,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
} }
if (tbTsmaNum > 0) { if (tbTsmaNum > 0) {
// TODO when create recursive tsma, avoid get tb tsma task
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_TSMA, pReq->pTableTSMAs, NULL)); CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_TSMA, pReq->pTableTSMAs, NULL));
} }
if (tsmaNum > 0) { if (tsmaNum > 0) {
@ -2705,6 +2706,10 @@ int32_t ctgLaunchGetTSMATask(SCtgTask* pTask) {
SCtgTaskReq tReq = {.pTask = pTask, .msgIdx = 0}; SCtgTaskReq tReq = {.pTask = pTask, .msgIdx = 0};
taosArrayPush(pCtx->pResList, &(SMetaRes){0}); taosArrayPush(pCtx->pResList, &(SMetaRes){0});
CTG_ERR_RET(ctgGetTbTSMAFromMnode(pCtg, pConn, pTsmaName, NULL, &tReq, TDMT_MND_GET_TSMA)); CTG_ERR_RET(ctgGetTbTSMAFromMnode(pCtg, pConn, pTsmaName, NULL, &tReq, TDMT_MND_GET_TSMA));
} else {
TSWAP(pTask->res, pCtx->pResList);
CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
return TSDB_CODE_SUCCESS;
} }
return 0; return 0;
@ -2785,6 +2790,7 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx); SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx);
SArray* pTsmas = NULL; SArray* pTsmas = NULL;
SMetaRes* pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx); SMetaRes* pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx);
SHashObj* pVgHash = NULL;
SCtgDBCache* pDbCache = NULL; SCtgDBCache* pDbCache = NULL;
STableTSMAInfo* pTsma = NULL; STableTSMAInfo* pTsma = NULL;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
@ -2849,8 +2855,9 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
STableTSMAInfo* pTsmaInfo = taosArrayGetP(pTsmas, tsmaIdx); STableTSMAInfo* pTsmaInfo = taosArrayGetP(pTsmas, tsmaIdx);
if (pTsmaInfo->rspTs == 0) pTsmaInfo->fillHistoryFinished = true; if (pTsmaInfo->rspTs == 0) pTsmaInfo->fillHistoryFinished = true;
pTsmaInfo->rspTs = taosGetTimestampMs(); pTsmaInfo->rspTs = taosGetTimestampMs();
pTsmaInfo->delayDuration = MAX(pRsp->progressDelay, pTsmaInfo->delayDuration); pTsmaInfo->delayDuration = TMAX(pRsp->progressDelay, pTsmaInfo->delayDuration);
pTsmaInfo->fillHistoryFinished = pTsmaInfo->fillHistoryFinished && pRsp->fillHisFinished; pTsmaInfo->fillHistoryFinished = pTsmaInfo->fillHistoryFinished && pRsp->fillHisFinished;
qDebug("received stream progress for tsma %s rsp history: %d vnode: %d", pTsmaInfo->name, pRsp->fillHisFinished, pRsp->subFetchIdx);
if (atomic_add_fetch_32(&pFetch->finishedSubFetchNum, 1) == pFetch->subFetchNum) { if (atomic_add_fetch_32(&pFetch->finishedSubFetchNum, 1) == pFetch->subFetchNum) {
// subfetch all finished // subfetch all finished
@ -2873,19 +2880,20 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
STableTSMAInfoRsp* pTsmas = pRes->pRes; STableTSMAInfoRsp* pTsmas = pRes->pRes;
int32_t subFetchIdx = 0; int32_t subFetchIdx = 0;
pFetch->vgNum = taosHashGetSize(pOut->dbVgroup->vgHash); pFetch->vgNum = taosHashGetSize(pOut->dbVgroup->vgHash);
TSWAP(pOut->dbVgroup->vgHash, pVgHash);
for (int32_t i = 0; i < taosArrayGetSize(pTsmas->pTsmas); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pTsmas->pTsmas); ++i) {
STableTSMAInfo* pTsmaInfo = taosArrayGetP(pTsmas->pTsmas, i); STableTSMAInfo* pTsmaInfo = taosArrayGetP(pTsmas->pTsmas, i);
SVgroupInfo* pVgInfo = taosHashIterate(pOut->dbVgroup->vgHash, NULL); SVgroupInfo* pVgInfo = taosHashIterate(pVgHash, NULL);
while (pVgInfo) { while (pVgInfo) {
// make StreamProgressReq, send it // make StreamProgressReq, send it
SStreamProgressReq req = {.fetchIdx = pFetch->fetchIdx, SStreamProgressReq req = {.fetchIdx = pFetch->fetchIdx,
.streamId = pTsma->streamUid, .streamId = pTsmaInfo->streamUid,
.subFetchIdx = subFetchIdx++, .subFetchIdx = subFetchIdx++,
.vgId = pVgInfo->vgId}; .vgId = pVgInfo->vgId};
CTG_ERR_JRET(ctgGetStreamProgressFromVnode(pCtg, pConn, pTbName, pVgInfo, NULL, tReq, &req)); CTG_ERR_JRET(ctgGetStreamProgressFromVnode(pCtg, pConn, pTbName, pVgInfo, NULL, tReq, &req));
pFetch->subFetchNum++; pFetch->subFetchNum++;
hasSubFetch = true; hasSubFetch = true;
pVgInfo = taosHashIterate(pOut->dbVgroup->vgHash, pVgInfo); pVgInfo = taosHashIterate(pVgHash, pVgInfo);
} }
} }
} break; } break;
@ -2901,6 +2909,9 @@ _return:
tFreeTableTSMAInfo(pTsma); tFreeTableTSMAInfo(pTsma);
pTsma = NULL; pTsma = NULL;
} }
if (pVgHash) {
taosHashCleanup(pVgHash);
}
if (code) { if (code) {
SMetaRes* pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx); SMetaRes* pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx);
pRes->code = code; pRes->code = code;

View File

@ -3330,7 +3330,13 @@ int32_t ctgGetTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, SName* pTsmaNam
CTG_RET(code); CTG_RET(code);
} }
void * pIter = taosHashIterate(pDbCache->tsmaCache, NULL); void *pIter = taosHashIterate(pDbCache->tsmaCache, NULL);
res.pRes = taosMemoryCalloc(1, sizeof(STableTSMAInfoRsp));
if (!res.pRes) CTG_RET(TSDB_CODE_OUT_OF_MEMORY);
STableTSMAInfoRsp* pRsp = res.pRes;
pRsp->pTsmas = taosArrayInit(1, POINTER_BYTES);
if (!pRsp->pTsmas) CTG_RET(TSDB_CODE_OUT_OF_MEMORY);
while (pIter && !found) { while (pIter && !found) {
SCtgTSMACache* pCtgCache = pIter; SCtgTSMACache* pCtgCache = pIter;
CTG_LOCK(CTG_READ, &pCtgCache->tsmaLock); CTG_LOCK(CTG_READ, &pCtgCache->tsmaLock);
@ -3348,8 +3354,8 @@ int32_t ctgGetTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, SName* pTsmaNam
pIter = taosHashIterate(pDbCache->tsmaCache, pIter); pIter = taosHashIterate(pDbCache->tsmaCache, pIter);
} }
taosHashCancelIterate(pDbCache->tsmaCache, pIter); taosHashCancelIterate(pDbCache->tsmaCache, pIter);
if (found) { if (found && code == TSDB_CODE_SUCCESS) {
res.pRes = pTsmaOut; taosArrayPush(pRsp->pTsmas, &pTsmaOut);
taosArrayPush(pCtx->pResList, &res); taosArrayPush(pCtx->pResList, &res);
} }
@ -3510,6 +3516,10 @@ int32_t ctgWriteTbTSMAToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
for (int32_t i = 0; i < pCache->pTsmas->size; ++i) { for (int32_t i = 0; i < pCache->pTsmas->size; ++i) {
STableTSMAInfo* pInfo = taosArrayGetP(pCache->pTsmas, i); STableTSMAInfo* pInfo = taosArrayGetP(pCache->pTsmas, i);
if (pInfo->tsmaId == pTsmaCache->tsmaId) { if (pInfo->tsmaId == pTsmaCache->tsmaId) {
ctgDebug("tsma: %s removed from cache, history from %d to %d, reqTs from %" PRId64 " to %" PRId64
"rspTs from %" PRId64 " to %" PRId64 " delay from %" PRId64 " to %" PRId64,
pInfo->name, pInfo->fillHistoryFinished, pTsmaCache->fillHistoryFinished, pInfo->reqTs,
pTsmaCache->reqTs, pInfo->rspTs, pTsmaCache->rspTs, pInfo->delayDuration, pTsmaCache->delayDuration);
cacheSize = ctgGetTbTSMACacheSize(pInfo); cacheSize = ctgGetTbTSMACacheSize(pInfo);
taosArrayRemove(pCache->pTsmas, i); taosArrayRemove(pCache->pTsmas, i);
atomic_sub_fetch_64(&dbCache->dbCacheSize, cacheSize); atomic_sub_fetch_64(&dbCache->dbCacheSize, cacheSize);
@ -3564,8 +3574,7 @@ int32_t ctgOpDropTbTSMA(SCtgCacheOperation *operation) {
for (int32_t i = 0; i < pCtgCache->pTsmas->size; ++i) { for (int32_t i = 0; i < pCtgCache->pTsmas->size; ++i) {
pCache = taosArrayGetP(pCtgCache->pTsmas, i); pCache = taosArrayGetP(pCtgCache->pTsmas, i);
cacheSize += ctgGetTbTSMACacheSize(pCache); cacheSize += ctgGetTbTSMACacheSize(pCache);
CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->tsmaRent, pCache->tsmaId, ctgTSMAVersionSearchCompare, ctgMetaRentRemove(&msg->pCtg->tsmaRent, pCache->tsmaId, ctgTSMAVersionSearchCompare, ctgTSMAVersionSearchCompare);
ctgTSMAVersionSearchCompare));
CTG_DB_NUM_DEC(CTG_CI_TBL_TSMA); CTG_DB_NUM_DEC(CTG_CI_TBL_TSMA);
} }
taosArrayDestroyP(pCtgCache->pTsmas, tFreeAndClearTableTSMAInfo); taosArrayDestroyP(pCtgCache->pTsmas, tFreeAndClearTableTSMAInfo);
@ -3586,8 +3595,7 @@ int32_t ctgOpDropTbTSMA(SCtgCacheOperation *operation) {
continue; continue;
} }
cacheSize = ctgGetTbTSMACacheSize(pCache); cacheSize = ctgGetTbTSMACacheSize(pCache);
CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->tsmaRent, pCache->tsmaId, ctgTSMAVersionSearchCompare, ctgMetaRentRemove(&msg->pCtg->tsmaRent, pCache->tsmaId, ctgTSMAVersionSearchCompare, ctgTSMAVersionSearchCompare);
ctgTSMAVersionSearchCompare));
taosArrayRemove(pCtgCache->pTsmas, i); taosArrayRemove(pCtgCache->pTsmas, i);
tFreeAndClearTableTSMAInfo(pCache); tFreeAndClearTableTSMAInfo(pCache);
CTG_DB_NUM_DEC(CTG_CI_TBL_TSMA); CTG_DB_NUM_DEC(CTG_CI_TBL_TSMA);

View File

@ -636,6 +636,12 @@ void ctgFreeMsgCtx(SCtgMsgCtx* pCtx) {
} }
break; break;
} }
case TDMT_VND_GET_STREAM_PROGRESS: {
if (pCtx->out) {
taosMemoryFreeClear(pCtx->out);
}
break;
}
default: default:
qError("invalid reqType %d", pCtx->reqType); qError("invalid reqType %d", pCtx->reqType);
break; break;
@ -2406,7 +2412,19 @@ bool hasOutOfDateTSMACache(SArray* pTsmas) {
bool isCtgTSMACacheOutOfDate(STSMACache* pTsmaCache) { bool isCtgTSMACacheOutOfDate(STSMACache* pTsmaCache) {
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
return !pTsmaCache->fillHistoryFinished || (30 * 1000 - pTsmaCache->delayDuration) < (now - pTsmaCache->reqTs); bool ret = !pTsmaCache->fillHistoryFinished || (30 * 1000 - pTsmaCache->delayDuration) < (now - pTsmaCache->reqTs);
if (ret) {
qDebug("tsma %s.%s in cache has been out of date, history finished: %d, remain valid after: %" PRId64
" passed: %" PRId64,
pTsmaCache->dbFName, pTsmaCache->name, pTsmaCache->fillHistoryFinished,
30 * 1000 - pTsmaCache->delayDuration, now - pTsmaCache->reqTs);
} else {
qDebug("tsma %s.%s in cache has been out of date, history finished: %d, remain valid after: %" PRId64
" passed: %" PRId64,
pTsmaCache->dbFName, pTsmaCache->name, pTsmaCache->fillHistoryFinished,
30 * 1000 - pTsmaCache->delayDuration, now - pTsmaCache->reqTs);
}
return ret;
} }
int32_t ctgAddTSMAFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t* fetchIdx, int32_t resIdx, int32_t ctgAddTSMAFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t* fetchIdx, int32_t resIdx,

View File

@ -629,5 +629,8 @@ bool fmIsMyStateFunc(int32_t funcId, int32_t stateFuncId) {
if (!pFunc->pStateFunc) { if (!pFunc->pStateFunc) {
return false; return false;
} }
return strcmp(pFunc->pStateFunc, pStateFunc->name) == 0; if (strcmp(pFunc->pStateFunc, pStateFunc->name) == 0) return true;
int32_t stateMergeFuncId = fmGetFuncId(pFunc->pStateFunc);
const SBuiltinFuncDefinition* pStateMergeFunc = &funcMgtBuiltins[stateMergeFuncId];
return strcmp(pStateFunc->name, pStateMergeFunc->pMergeFunc) == 0;
} }

View File

@ -10597,24 +10597,39 @@ static int32_t rewriteTSMAFuncs(STranslateContext* pCxt, SCreateTSMAStmt* pStmt,
return code; return code;
} }
static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq) { static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq, SName* useTbName) {
SName name; SName name;
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tsmaName, &name), pReq->name); tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tsmaName, &name), pReq->name);
memset(&name, 0, sizeof(SName)); memset(&name, 0, sizeof(SName));
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &name), pReq->stb); toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, useTbName);
tNameExtractFullName(useTbName, pReq->stb);
pReq->igExists = pStmt->ignoreExists; pReq->igExists = pStmt->ignoreExists;
pReq->interval = ((SValueNode*)pStmt->pOptions->pInterval)->datum.i; pReq->interval = ((SValueNode*)pStmt->pOptions->pInterval)->datum.i;
pReq->intervalUnit = ((SValueNode*)pStmt->pOptions->pInterval)->unit; pReq->intervalUnit = ((SValueNode*)pStmt->pOptions->pInterval)->unit;
#define TSMA_MIN_INTERVAL_MS 1 // 1ms
#define TSMA_MAX_INTERVAL_MS (60 * 60 * 1000) // 1h
if (pReq->interval > TSMA_MAX_INTERVAL_MS || pReq->interval < TSMA_MIN_INTERVAL_MS) {
return TSDB_CODE_TSMA_INVALID_INTERVAL;
}
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
STableMeta* pTableMeta = NULL; STableMeta* pTableMeta = NULL;
//TODO 在使用该tableName时, 如果确定其其实是tsma name, 那么避免将此作为tbname进行catalog 获取. // TODO 在使用该tableName时, 如果确定其其实是tsma name, 那么避免将此作为tbname进行catalog 获取.
STableTSMAInfo *pRecursiveTsma = NULL; STableTSMAInfo* pRecursiveTsma = NULL;
int32_t numOfCols = 0, numOfTags = 0; int32_t numOfCols = 0, numOfTags = 0;
SSchema* pCols = NULL, *pTags = NULL; SSchema * pCols = NULL, *pTags = NULL;
if (pStmt->pOptions->recursiveTsma) { if (pStmt->pOptions->recursiveTsma) {
code = getTsma(pCxt, &name, &pRecursiveTsma); // useTbName is base tsma name
code = getTsma(pCxt, useTbName, &pRecursiveTsma);
if (code == TSDB_CODE_SUCCESS) {
SValueNode* pInterval = (SValueNode*)pStmt->pOptions->pInterval;
if (pRecursiveTsma->interval < pInterval->datum.i && pInterval->datum.i % pRecursiveTsma->interval == 0) {
} else {
code = TSDB_CODE_TSMA_INVALID_PARA;
}
}
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
SNode* pNode; SNode* pNode;
if (TSDB_CODE_SUCCESS != nodesStringToNode(pRecursiveTsma->ast, &pNode)) { if (TSDB_CODE_SUCCESS != nodesStringToNode(pRecursiveTsma->ast, &pNode)) {
@ -10627,13 +10642,13 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm
nodesListMakeStrictAppend(&pStmt->pOptions->pFuncs, nodesCloneNode(pNode)); nodesListMakeStrictAppend(&pStmt->pOptions->pFuncs, nodesCloneNode(pNode));
} }
nodesDestroyNode((SNode*)pSelect); nodesDestroyNode((SNode*)pSelect);
memset(useTbName, 0, sizeof(SName));
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pRecursiveTsma->tb, useTbName), pReq->stb);
numOfCols = pRecursiveTsma->pUsedCols->size; // TODO merge pUsedCols and pTags with one SSchema array
numOfTags = pRecursiveTsma->pTags->size;
pCols = pRecursiveTsma->pUsedCols->pData;
pTags = pRecursiveTsma->pTags->pData;
} }
memset(&name, 0, sizeof(SName));
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pRecursiveTsma->targetTb, &name), pReq->stb);
numOfCols = pRecursiveTsma->pUsedCols->size; // TODO merge pUsedCols and pTags with one SSchema array
numOfTags = pRecursiveTsma->pTags->size;
pCols = pRecursiveTsma->pUsedCols->pData;
pTags = pRecursiveTsma->pTags->pData;
} else { } else {
code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta); code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
@ -10643,6 +10658,8 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm
pTags = pTableMeta->schema + numOfCols; pTags = pTableMeta->schema + numOfCols;
if (pTableMeta->tableType == TSDB_NORMAL_TABLE) { if (pTableMeta->tableType == TSDB_NORMAL_TABLE) {
pReq->normSourceTbUid = pTableMeta->uid; pReq->normSourceTbUid = pTableMeta->uid;
} else if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
code = TSDB_CODE_TSMA_INVALID_TB;
} }
} }
} }
@ -10682,13 +10699,12 @@ static int32_t translateCreateTSMA(STranslateContext* pCxt, SCreateTSMAStmt* pSt
int32_t code = doTranslateValue(pCxt, (SValueNode*)pStmt->pOptions->pInterval); int32_t code = doTranslateValue(pCxt, (SValueNode*)pStmt->pOptions->pInterval);
SMCreateSmaReq smaReq = {0}; SMCreateSmaReq smaReq = {0};
SName useTbName = {0};
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = buildCreateTSMAReq(pCxt, pStmt, &smaReq); code = buildCreateTSMAReq(pCxt, pStmt, &smaReq, &useTbName);
} }
if ( TSDB_CODE_SUCCESS == code) { if ( TSDB_CODE_SUCCESS == code) {
SName name; code = collectUseTable(&useTbName, pCxt->pTargetTables);
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &name);
code = collectUseTable(&name, pCxt->pTargetTables);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
// TODO replace with tsma serialization func // TODO replace with tsma serialization func
@ -10897,7 +10913,7 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
case QUERY_NODE_SHOW_CREATE_TSMA_STMT: case QUERY_NODE_SHOW_CREATE_TSMA_STMT:
break; break;
case QUERY_NODE_DROP_TSMA_STMT: case QUERY_NODE_DROP_TSMA_STMT:
code =translateDropTSMA(pCxt, (SDropTSMAStmt*)pNode); code = translateDropTSMA(pCxt, (SDropTSMAStmt*)pNode);
break; break;
default: default:
break; break;

View File

@ -5886,18 +5886,7 @@ static bool tsmaOptCheckValidInterval(int64_t tsmaInterval, int8_t tsmaIntevalUn
static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQueryFuncs, SArray* pTsmaScanCols) { static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQueryFuncs, SArray* pTsmaScanCols) {
SNode* pNode; SNode* pNode;
int32_t tsmaColNum = 1;
bool failed = false, found = false; bool failed = false, found = false;
int32_t firstFuncId = ((STableTSMAFuncInfo*)taosArrayGet(pTsmaFuncs, 0))->funcId;
// find col num
for (int32_t i = 1; i < pTsmaFuncs->size; ++i) {
STableTSMAFuncInfo* pTsmaFunc = taosArrayGet(pTsmaFuncs, i);
if (firstFuncId == pTsmaFunc->funcId) {
tsmaColNum++;
} else {
break;
}
}
taosArrayClear(pTsmaScanCols); taosArrayClear(pTsmaScanCols);
FOREACH(pNode, pQueryFuncs) { FOREACH(pNode, pQueryFuncs) {
@ -5911,30 +5900,23 @@ static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQ
} }
int32_t queryColId = ((SColumnNode*)pQueryFunc->pParameterList->pHead->pNode)->colId; int32_t queryColId = ((SColumnNode*)pQueryFunc->pParameterList->pHead->pNode)->colId;
found = false; found = false;
int32_t notMyStateFuncId = 0;
// iterate funcs // iterate funcs
// TODO if func is count, skip checking cols // TODO if func is count, skip checking cols, test count(*)
for (int32_t i = 0; i < pTsmaFuncs->size; i += tsmaColNum) { for (int32_t i = 0; i < pTsmaFuncs->size; i++) {
STableTSMAFuncInfo* pTsmaFuncInfo = taosArrayGet(pTsmaFuncs, i); STableTSMAFuncInfo* pTsmaFuncInfo = taosArrayGet(pTsmaFuncs, i);
if (pTsmaFuncInfo->funcId == notMyStateFuncId) continue;
if (!fmIsMyStateFunc(pQueryFunc->funcId, pTsmaFuncInfo->funcId)) { if (!fmIsMyStateFunc(pQueryFunc->funcId, pTsmaFuncInfo->funcId)) {
notMyStateFuncId = pTsmaFuncInfo->funcId;
continue; continue;
} }
// iterate cols within a func if (queryColId != pTsmaFuncInfo->colId) {
for (int32_t j = i; j < tsmaColNum + i; ++j) { continue;
if (j > i) {
pTsmaFuncInfo = taosArrayGet(pTsmaFuncs, j);
}
if (queryColId < pTsmaFuncInfo->colId) {
failed = true;
break;
}
if (queryColId > pTsmaFuncInfo->colId) {
continue;
}
found = true;
taosArrayPush(pTsmaScanCols, &j);
break;
} }
found = true;
taosArrayPush(pTsmaScanCols, &i);
break; break;
} }
if (failed || !found) { if (failed || !found) {
@ -5955,6 +5937,7 @@ static int32_t tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) {
} }
STableTSMAInfo* pTsma = taosArrayGetP(pTsmaOptCtx->pTsmas, i); STableTSMAInfo* pTsma = taosArrayGetP(pTsmaOptCtx->pTsmas, i);
if (!pTsma->fillHistoryFinished || 30 * 1000 < (pTsma->rspTs - pTsma->reqTs) + pTsma->delayDuration) continue;
// filter with interval // filter with interval
// TODO unit not right // TODO unit not right
if (!tsmaOptCheckValidInterval(pTsma->interval, pTsma->unit, pTsmaOptCtx)) { if (!tsmaOptCheckValidInterval(pTsma->interval, pTsma->unit, pTsmaOptCtx)) {
@ -6576,6 +6559,7 @@ static int32_t tsmaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan
} }
} }
clearTSMAOptCtx(&tsmaOptCtx); clearTSMAOptCtx(&tsmaOptCtx);
// TODO if any error occured, we should eat the error, skip the optimization, query with original table
return code; return code;
} }

View File

@ -686,6 +686,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_ENV, "Invalid tsma env")
TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_STAT, "Invalid tsma state") TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_STAT, "Invalid tsma state")
TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_PTR, "Invalid tsma pointer") TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_PTR, "Invalid tsma pointer")
TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_PARA, "Invalid tsma parameters") TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_PARA, "Invalid tsma parameters")
TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_TB, "Invalid table to create tsma, only stable or normal table allowed")
TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_INTERVAL, "Invalid tsma interval, 1ms ~ 1h is allowed")
//rsma //rsma
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env")

View File

@ -1,5 +1,6 @@
from os import name from os import name
from random import randrange from random import randrange
from socket import TIPC_ADDR_NAMESEQ
import taos import taos
import time import time
import threading import threading
@ -190,7 +191,7 @@ class TSMATester:
tdLog.exit("comparing tsma res for: %s got differnt rows of result: without tsma: %d, with tsma: %d" % (sql, len(no_tsma_res), len(tsma_res))) tdLog.exit("comparing tsma res for: %s got differnt rows of result: without tsma: %d, with tsma: %d" % (sql, len(no_tsma_res), len(tsma_res)))
for row_no_tsma, row_tsma in zip(no_tsma_res, tsma_res): for row_no_tsma, row_tsma in zip(no_tsma_res, tsma_res):
if row_no_tsma != row_tsma: if row_no_tsma != row_tsma:
tdLog.exit("comparing tsma res for: %s got different row data: no tsma row: %s, tsma row: %s" % (sql, str(row_no_tsma), str(row_tsma))) tdLog.exit("comparing tsma res for: %s got different row data: no tsma row: %s, tsma row: %s \nno tsma res: %s \n tsma res: %s" % (sql, str(row_no_tsma), str(row_tsma), str(no_tsma_res), str(tsma_res)))
tdLog.info('result check succeed for sql: %s. \n tsma-res: %s. \nno_tsma-res: %s' % (sql, str(tsma_res), str(no_tsma_res))) tdLog.info('result check succeed for sql: %s. \n tsma-res: %s. \nno_tsma-res: %s' % (sql, str(tsma_res), str(no_tsma_res)))
def check_sql(self, sql: str, expect: TSMAQueryContext): def check_sql(self, sql: str, expect: TSMAQueryContext):
@ -210,7 +211,7 @@ class TSMATestSQLGenerator:
def __init__(self, opts: TSMATesterSQLGeneratorOptions): def __init__(self, opts: TSMATesterSQLGeneratorOptions):
self.db_name_: str = '' self.db_name_: str = ''
self.tb_name_: str = '' self.tb_name_: str = ''
self.ts_scan_range_: List[float] = [UsedTsma.TS_MIN, UsedTsma.TS_MAX] self.ts_scan_range_: List[float] = [float(UsedTsma.TS_MIN), float(UsedTsma.TS_MAX)]
self.agg_funcs_: List[str] = [] self.agg_funcs_: List[str] = []
self.tsmas_: List[TSMA] = [] ## currently created tsmas self.tsmas_: List[TSMA] = [] ## currently created tsmas
self.opts_: TSMATesterSQLGeneratorOptions = opts self.opts_: TSMATesterSQLGeneratorOptions = opts
@ -407,17 +408,32 @@ class TDTestCase:
rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],\ rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],\
startTs=paraDict["startTs"],tsStep=paraDict["tsStep"]) startTs=paraDict["startTs"],tsStep=paraDict["tsStep"])
self.init_normal_tb(tdSql, paraDict['dbName'], 'norm_tb', paraDict['rowsPerTbl'], paraDict['startTs'], paraDict['tsStep']) self.init_normal_tb(tdSql, paraDict['dbName'], 'norm_tb', paraDict['rowsPerTbl'], paraDict['startTs'], paraDict['tsStep'])
def wait_for_tsma_calculation(self, func_list: list, db: str, tb: str, interval: str, tsma_name: str):
while True:
sql = 'select %s from %s.%s interval(%s)' % (', '.join(func_list), db, tb, interval)
tdLog.debug('waiting for tsma %s to be useful with sql %s' % (tsma_name, sql))
ctx: TSMAQueryContext = self.tsma_tester.get_tsma_query_ctx(sql)
if ctx.has_tsma():
if ctx.used_tsmas[0].name == tsma_name + UsedTsma.TSMA_RES_STB_POSTFIX:
break
else:
time.sleep(1)
else:
time.sleep(1)
def create_tsma(self, tsma_name: str, db: str, tb: str, func_list: list, interval: str): def create_tsma(self, tsma_name: str, db: str, tb: str, func_list: list, interval: str):
tdSql.execute('use %s' % db) tdSql.execute('use %s' % db)
sql = "CREATE TSMA %s ON %s.%s FUNCTION(%s) INTERVAL(%s)" % (tsma_name, db, tb, ','.join(func_list), interval) sql = "CREATE TSMA %s ON %s.%s FUNCTION(%s) INTERVAL(%s)" % (tsma_name, db, tb, ','.join(func_list), interval)
tdSql.execute(sql, queryTimes=1) tdSql.execute(sql, queryTimes=1)
self.wait_for_tsma_calculation(func_list, db, tb, interval, tsma_name)
def create_recursive_tsma(self, base_tsma_name: str, new_tsma_name: str, db: str, interval: str):
def create_recursive_tsma(self, base_tsma_name: str, new_tsma_name: str, db: str, interval: str, tb_name: str):
tdSql.execute('use %s' % db, queryTimes=1) tdSql.execute('use %s' % db, queryTimes=1)
sql = 'CREATE RECURSIVE TSMA %s ON %s.%s INTERVAL(%s)' % (new_tsma_name, db, base_tsma_name, interval) sql = 'CREATE RECURSIVE TSMA %s ON %s.%s INTERVAL(%s)' % (new_tsma_name, db, base_tsma_name, interval)
tdSql.execute(sql, queryTimes=1) tdSql.execute(sql, queryTimes=1)
self.wait_for_tsma_calculation(['avg(c1)'], db, tb_name, interval, new_tsma_name)
def drop_tsma(self, tsma_name: str, db: str): def drop_tsma(self, tsma_name: str, db: str):
sql = 'DROP TSMA %s.%s' % (db, tsma_name) sql = 'DROP TSMA %s.%s' % (db, tsma_name)
@ -438,17 +454,20 @@ class TDTestCase:
self.tsma_tester.check_sql(ctx.sql, ctx) self.tsma_tester.check_sql(ctx.sql, ctx)
def test_query_with_tsma(self): def test_query_with_tsma(self):
self.init_data()
self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m') self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m')
self.create_tsma('tsma2', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '30m') self.create_tsma('tsma2', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '30m')
self.create_recursive_tsma('tsma1', 'tsma3', 'test', '20m') #self.create_recursive_tsma('tsma1', 'tsma3', 'test', '20m', 'meters')
self.create_recursive_tsma('tsma2', 'tsma4', 'test', '1h') #self.create_recursive_tsma('tsma2', 'tsma4', 'test', '1h', 'meters')
self.create_tsma('tsma5', 'test', 'norm_tb', ['avg(c1)', 'avg(c2)'], '10m') self.create_tsma('tsma5', 'test', 'norm_tb', ['avg(c1)', 'avg(c2)'], '10m')
## why need 5s, calculation not finished yet. ## why need 10s, filling history not finished yet
#ctx = TSMAQCBuilder().with_sql('select avg(c1) from meters').should_query_with_table('meters', UsedTsma.TS_MIN, UsedTsma.TS_MAX).get_qc()
#self.tsma_tester.check_sql(ctx.sql, ctx)
time.sleep(5) time.sleep(5)
#time.sleep(9999999) #time.sleep(9999999)
self.test_query_with_tsma_interval() self.test_query_with_tsma_interval()
self.test_query_with_tsma_agg() self.test_query_with_tsma_agg()
## self.test_query_with_drop_tsma()
## self.test_query_with_add_tag()
def test_query_with_tsma_interval(self): def test_query_with_tsma_interval(self):
self.check(self.test_query_with_tsma_interval_no_partition) self.check(self.test_query_with_tsma_interval_no_partition)
@ -550,8 +569,77 @@ class TDTestCase:
return [] return []
def run(self): def run(self):
self.init_data()
#time.sleep(999999)
self.test_create_tsma()
#self.test_drop_tsma()
self.test_tb_ddl_with_created_tsma()
self.test_query_with_tsma() self.test_query_with_tsma()
#time.sleep(999999) #time.sleep(999999)
def test_create_tsma(self):
self.test_create_tsma_on_stable()
self.test_create_tsma_on_norm_table()
self.test_create_tsma_on_child_table()
self.test_create_recursive_tsma()
## self.test_drop_stable()
## self.test_drop_ctable()
## self.test_drop_db()
def test_tb_ddl_with_created_tsma(self):
tdSql.execute('create database nsdb precision "ns"', queryTimes=1)
tdSql.execute('use nsdb', queryTimes=1)
tdSql.execute('create table meters(ts timestamp, c1 int, c2 int) tags(t1 int, t2 int)', queryTimes=1)
self.create_tsma('tsma1', 'nsdb', 'meters', ['avg(c1)', 'avg(c2)'], '5m')
## drop column, drop tag
tdSql.error('alter table meters drop column c1', -2147482637)
tdSql.error('alter table meters drop tag t1', -2147482637)
tdSql.error('alter table meters drop tag t2', -2147482637) # Stream must be dropped first
tdSql.execute('drop tsma tsma1', queryTimes=1)
## add tag
tdSql.execute('alter table meters add tag t3 int', queryTimes=1)
tdSql.execute('alter table meters drop tag t3', queryTimes=1)
tdSql.execute('drop database nsdb')
## test_drop stream
def test_create_tsma_on_stable(self):
tdSql.execute('create database nsdb precision "ns"', queryTimes=1)
tdSql.execute('use nsdb', queryTimes=1)
tdSql.execute('create table meters(ts timestamp, c1 int, c2 int) tags(t1 int, t2 int)', queryTimes=1)
self.create_tsma('tsma1', 'nsdb', 'meters', ['avg(c1)', 'avg(c2)'], '5m')
tdSql.error('create tsma tsma2 on meters function(avg(c1), avg(c2)) interval(2h)', -2147471097) ## Invalid tsma interval, 1ms ~ 1h is allowed
tdSql.error('create tsma tsma2 on meters function(avg(c1), avg(c2)) interval(3601s)', -2147471097)
tdSql.error('create tsma tsma2 on meters function(avg(c1), avg(c2)) interval(3600001a)', -2147471097)
tdSql.error('create tsma tsma2 on meters function(avg(c1), avg(c2)) interval(3600001000u)', -2147471097)
tdSql.error('create tsma tsma2 on meters function(avg(c1), avg(c2)) interval(999999b)', -2147471097)
tdSql.error('create tsma tsma2 on meters function(avg(c1), avg(c2)) interval(999u)', -2147471097)
tdSql.execute('drop tsma tsma1')
tdSql.error('create tsma tsma1 on test.meters function(avg(c1), avg(c2)) interval(2h)', -2147471097)
tdSql.execute('drop database nsdb')
def test_create_tsma_on_norm_table(self):
pass
def test_create_tsma_on_child_table(self):
tdSql.error('create tsma tsma1 on test.t1 function(avg(c1), avg(c2)) interval(1m)', -2147471098) ## Invalid table to create tsma, only stable or normal table allowed
def test_create_recursive_tsma(self):
tdSql.execute('use test')
self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m')
sql = 'create recursive tsma tsma2 on tsma1 interval(1m)'
tdSql.error(sql, -2147471099) ## invalid tsma parameter
sql = 'create recursive tsma tsma2 on tsma1 interval(7m)'
tdSql.error(sql, -2147471099) ## invalid tsma parameter
sql = 'create recursive tsma tsma2 on tsma1 interval(11m)'
tdSql.error(sql, -2147471099) ## invalid tsma parameter
self.create_recursive_tsma('tsma1', 'tsma2', 'test', '20m', 'meters')
tdSql.execute('drop tsma tsma2', queryTimes=1)
tdSql.execute('drop tsma tsma1', queryTimes=1)
def stop(self): def stop(self):
tdSql.close() tdSql.close()