fetch lastts vnode version before create tsma
This commit is contained in:
parent
8e56d9a359
commit
d8200b2f63
|
@ -3569,6 +3569,7 @@ typedef struct {
|
|||
int64_t deleteMark;
|
||||
int64_t lastTs;
|
||||
int64_t normSourceTbUid; // the Uid of source tb if its a normal table, otherwise 0
|
||||
SArray* pVgroupVerList;
|
||||
} SMCreateSmaReq;
|
||||
|
||||
int32_t tSerializeSMCreateSmaReq(void* buf, int32_t bufLen, SMCreateSmaReq* pReq);
|
||||
|
|
|
@ -605,8 +605,10 @@ typedef struct SCreateTSMAStmt {
|
|||
char tsmaName[TSDB_INDEX_NAME_LEN];
|
||||
char dbName[TSDB_DB_NAME_LEN];
|
||||
char tableName[TSDB_TABLE_NAME_LEN]; // base tb name or base tsma name
|
||||
char originalTbName[TSDB_TABLE_NAME_LEN];
|
||||
STSMAOptions* pOptions;
|
||||
//SMCreateSmaReq* pReq;
|
||||
SNode* pPrevQuery;
|
||||
SMCreateSmaReq* pReq;
|
||||
} SCreateTSMAStmt;
|
||||
|
||||
typedef struct SShowCreateTSMAStmt {
|
||||
|
|
|
@ -888,6 +888,13 @@ int32_t tSerializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pReq
|
|||
if (tEncodeI64(&encoder, pReq->deleteMark) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->lastTs) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->normSourceTbUid) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, taosArrayGetSize(pReq->pVgroupVerList)) < 0) return -1;
|
||||
|
||||
for(int32_t i = 0; i < taosArrayGetSize(pReq->pVgroupVerList); ++i) {
|
||||
SVgroupVer* p = taosArrayGet(pReq->pVgroupVerList, i);
|
||||
if (tEncodeI32(&encoder, p->vgId) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, p->ver) < 0) return -1;
|
||||
}
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
|
@ -939,6 +946,27 @@ int32_t tDeserializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pR
|
|||
if (tDecodeI64(&decoder, &pReq->deleteMark) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->lastTs) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->normSourceTbUid) < 0) return -1;
|
||||
|
||||
int32_t numOfVgVer;
|
||||
// TODO extract method decode vgVerList
|
||||
if (tDecodeI32(&decoder, &numOfVgVer) < 0) return -1;
|
||||
if (numOfVgVer > 0) {
|
||||
pReq->pVgroupVerList = taosArrayInit(numOfVgVer, sizeof(SVgroupVer));
|
||||
if (pReq->pVgroupVerList == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfVgVer; ++i) {
|
||||
SVgroupVer v = {0};
|
||||
if (tDecodeI32(&decoder, &v.vgId) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &v.ver) < 0) return -1;
|
||||
if (taosArrayPush(pReq->pVgroupVerList, &v) == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
tEndDecode(&decoder);
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
|
@ -949,6 +977,7 @@ void tFreeSMCreateSmaReq(SMCreateSmaReq *pReq) {
|
|||
taosMemoryFreeClear(pReq->tagsFilter);
|
||||
taosMemoryFreeClear(pReq->sql);
|
||||
taosMemoryFreeClear(pReq->ast);
|
||||
taosArrayDestroy(pReq->pVgroupVerList);
|
||||
}
|
||||
|
||||
int32_t tSerializeSMDropSmaReq(void *buf, int32_t bufLen, SMDropSmaReq *pReq) {
|
||||
|
|
|
@ -67,9 +67,8 @@ typedef struct SCreateTSMACxt {
|
|||
const SMCreateSmaReq *pCreateSmaReq;
|
||||
const SMDropSmaReq * pDropSmaReq;
|
||||
};
|
||||
const SDbObj *pDb;
|
||||
SStbObj * pSrcStb;
|
||||
// TODO normal table
|
||||
const SDbObj * pDb;
|
||||
SStbObj * pSrcStb;
|
||||
SSmaObj * pSma;
|
||||
SCMCreateStreamReq *pCreateStreamReq;
|
||||
SMDropStreamReq * pDropStreamReq;
|
||||
|
@ -1448,8 +1447,9 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
|
|||
pCxt->pCreateStreamReq->targetStbUid = 0;
|
||||
pCxt->pCreateStreamReq->fillNullCols = NULL;
|
||||
pCxt->pCreateStreamReq->igUpdate = 0;
|
||||
// TODO what's this tiemstamp?
|
||||
pCxt->pCreateStreamReq->lastTs = 1755442278000;
|
||||
// TODO what's this timestamp
|
||||
//pCxt->pCreateStreamReq->lastTs = pCxt->pCreateSmaReq->lastTs;
|
||||
pCxt->pCreateStreamReq->lastTs = 1758414148000;
|
||||
pCxt->pCreateStreamReq->ast = strdup(pCxt->pCreateSmaReq->ast);
|
||||
pCxt->pCreateStreamReq->sql = strdup(pCxt->pCreateSmaReq->sql);
|
||||
|
||||
|
@ -1556,6 +1556,14 @@ static int32_t mndCreateTSMA(SCreateTSMACxt *pCxt) {
|
|||
pCxt->pSma = &sma;
|
||||
initSMAObj(pCxt);
|
||||
pCxt->pCreateStreamReq = &createStreamReq;
|
||||
if (pCxt->pCreateSmaReq->pVgroupVerList) {
|
||||
pCxt->pCreateStreamReq->pVgroupVerList = taosArrayDup(pCxt->pCreateSmaReq->pVgroupVerList, NULL);
|
||||
if (!pCxt->pCreateStreamReq->pVgroupVerList) {
|
||||
errno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = -1;
|
||||
goto _OVER;
|
||||
}
|
||||
}
|
||||
pCxt->pDropStreamReq = &dropStreamReq;
|
||||
mndCreateTSMABuildCreateStreamReq(pCxt);
|
||||
mndCreateTSMABuildDropStreamReq(pCxt);
|
||||
|
@ -1571,7 +1579,7 @@ static int32_t mndCreateTSMA(SCreateTSMACxt *pCxt) {
|
|||
|
||||
_OVER:
|
||||
tFreeSCMCreateStreamReq(pCxt->pCreateStreamReq);
|
||||
tFreeMDropStreamReq(pCxt->pDropStreamReq);
|
||||
if (pCxt->pDropStreamReq) tFreeMDropStreamReq(pCxt->pDropStreamReq);
|
||||
pCxt->pCreateStreamReq = NULL;
|
||||
return code;
|
||||
}
|
||||
|
@ -1719,7 +1727,7 @@ static int32_t mndDropTSMA(SCreateTSMACxt* pCxt) {
|
|||
SMDropStbReq dropStbReq = {0};
|
||||
dropStbReq.igNotExists = false;
|
||||
tstrncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
|
||||
// TODO fill sql
|
||||
// TODO fill sql, sql may be freed
|
||||
dropStbReq.sql = "drop";
|
||||
dropStbReq.sqlLen = 5;
|
||||
|
||||
|
|
|
@ -562,6 +562,9 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b
|
|||
latest = pReader->pHead->head.ingestTs;
|
||||
}
|
||||
}
|
||||
// TODO remove log
|
||||
tqInfo("------ver: %" PRId64 " fhFinished: %d max: %" PRId64 " cur: %" PRId64 " latest: %" PRId64, ver, *fhFinished,
|
||||
verRange.maxVer, cur, latest);
|
||||
|
||||
if (pDelay != NULL) { // delay in ms
|
||||
*pDelay = (latest - cur) / 1000;
|
||||
|
|
|
@ -1884,6 +1884,7 @@ int32_t ctgGetTsma(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTsmaNa
|
|||
ASSERT(tsmaRsp.pTsmas && tsmaRsp.pTsmas->size == 1);
|
||||
*pTsma = taosArrayGetP(tsmaRsp.pTsmas, 0);
|
||||
taosArrayDestroy(tsmaRsp.pTsmas);
|
||||
tsmaRsp.pTsmas = NULL;
|
||||
|
||||
_return:
|
||||
if (tsmaRsp.pTsmas) {
|
||||
|
|
|
@ -2857,7 +2857,8 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
|
|||
pTsmaInfo->rspTs = taosGetTimestampMs();
|
||||
pTsmaInfo->delayDuration = TMAX(pRsp->progressDelay, pTsmaInfo->delayDuration);
|
||||
pTsmaInfo->fillHistoryFinished = pTsmaInfo->fillHistoryFinished && pRsp->fillHisFinished;
|
||||
qDebug("received stream progress for tsma %s rsp history: %d vnode: %d", pTsmaInfo->name, pRsp->fillHisFinished, pRsp->subFetchIdx);
|
||||
qDebug("received stream progress for tsma %s rsp history: %d vnode: %d, delay: %" PRId64, pTsmaInfo->name,
|
||||
pRsp->fillHisFinished, pRsp->subFetchIdx, pRsp->progressDelay);
|
||||
|
||||
if (atomic_add_fetch_32(&pFetch->finishedSubFetchNum, 1) == pFetch->subFetchNum) {
|
||||
// subfetch all finished
|
||||
|
|
|
@ -3015,18 +3015,17 @@ int32_t ctgRemoveTbMetaFromCache(SCatalog *pCtg, SName *pTableName, bool syncReq
|
|||
|
||||
CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &tbCtx, &tblMeta));
|
||||
|
||||
if (NULL == tblMeta) {
|
||||
ctgDebug("table already not in cache, db:%s, tblName:%s", pTableName->dbname, pTableName->tname);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
if (NULL != tblMeta) {
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
tNameGetFullDbName(pTableName, dbFName);
|
||||
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
tNameGetFullDbName(pTableName, dbFName);
|
||||
|
||||
if (TSDB_SUPER_TABLE == tblMeta->tableType) {
|
||||
CTG_ERR_JRET(ctgDropStbMetaEnqueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, tblMeta->suid, syncReq));
|
||||
if (TSDB_SUPER_TABLE == tblMeta->tableType) {
|
||||
CTG_ERR_JRET(ctgDropStbMetaEnqueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, tblMeta->suid, syncReq));
|
||||
} else {
|
||||
CTG_ERR_JRET(ctgDropTbMetaEnqueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, syncReq));
|
||||
}
|
||||
} else {
|
||||
CTG_ERR_JRET(ctgDropTbMetaEnqueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, syncReq));
|
||||
ctgDebug("table already not in cache, db:%s, tblName:%s", pTableName->dbname, pTableName->tname);
|
||||
}
|
||||
|
||||
// TODO TEST normal table
|
||||
|
@ -3331,11 +3330,6 @@ int32_t ctgGetTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, SName* pTsmaNam
|
|||
}
|
||||
|
||||
void *pIter = taosHashIterate(pDbCache->tsmaCache, NULL);
|
||||
res.pRes = taosMemoryCalloc(1, sizeof(STableTSMAInfoRsp));
|
||||
if (!res.pRes) CTG_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
STableTSMAInfoRsp* pRsp = res.pRes;
|
||||
pRsp->pTsmas = taosArrayInit(1, POINTER_BYTES);
|
||||
if (!pRsp->pTsmas) CTG_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
|
||||
while (pIter && !found) {
|
||||
SCtgTSMACache* pCtgCache = pIter;
|
||||
|
@ -3355,6 +3349,18 @@ int32_t ctgGetTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, SName* pTsmaNam
|
|||
}
|
||||
taosHashCancelIterate(pDbCache->tsmaCache, pIter);
|
||||
if (found && code == TSDB_CODE_SUCCESS) {
|
||||
res.pRes = taosMemoryCalloc(1, sizeof(STableTSMAInfoRsp));
|
||||
if (!res.pRes) {
|
||||
tFreeAndClearTableTSMAInfo(pTsmaOut);
|
||||
CTG_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
STableTSMAInfoRsp* pRsp = res.pRes;
|
||||
pRsp->pTsmas = taosArrayInit(1, POINTER_BYTES);
|
||||
if (!pRsp->pTsmas) {
|
||||
tFreeAndClearTableTSMAInfo(pTsmaOut);
|
||||
CTG_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
taosArrayPush(pRsp->pTsmas, &pTsmaOut);
|
||||
taosArrayPush(pCtx->pResList, &res);
|
||||
}
|
||||
|
@ -3445,6 +3451,7 @@ static SCtgCacheOperation* createDropAllTbTsmaCtgCacheOp(SCatalog* pCtg, const S
|
|||
}
|
||||
|
||||
int32_t ctgDropTSMAForTbEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
|
||||
ctgDebug("drop tsma meta for tb: %s.%s", pName->dbname, pName->tname);
|
||||
int32_t code = 0;
|
||||
SCtgDBCache* pDbCache = NULL;
|
||||
SCtgCacheOperation* pOp = NULL;
|
||||
|
@ -3570,7 +3577,7 @@ int32_t ctgOpDropTbTSMA(SCtgCacheOperation *operation) {
|
|||
uint64_t cacheSize = 0;
|
||||
STSMACache *pCache = NULL;
|
||||
if (msg->dropAllForTb) {
|
||||
CTG_LOCK(CTG_READ, &pCtgCache->tsmaLock);
|
||||
CTG_LOCK(CTG_WRITE, &pCtgCache->tsmaLock);
|
||||
for (int32_t i = 0; i < pCtgCache->pTsmas->size; ++i) {
|
||||
pCache = taosArrayGetP(pCtgCache->pTsmas, i);
|
||||
cacheSize += ctgGetTbTSMACacheSize(pCache);
|
||||
|
@ -3579,8 +3586,9 @@ int32_t ctgOpDropTbTSMA(SCtgCacheOperation *operation) {
|
|||
}
|
||||
taosArrayDestroyP(pCtgCache->pTsmas, tFreeAndClearTableTSMAInfo);
|
||||
pCtgCache->pTsmas = NULL;
|
||||
CTG_UNLOCK(CTG_READ, &pCtgCache->tsmaLock);
|
||||
ctgDebug("all tsmas for table dropped: %s.%s", msg->dbFName, msg->tbName);
|
||||
taosHashRemove(dbCache->tsmaCache, msg->tbName, TSDB_TABLE_NAME_LEN);
|
||||
CTG_UNLOCK(CTG_WRITE, &pCtgCache->tsmaLock);
|
||||
} else {
|
||||
CTG_LOCK(CTG_WRITE, &pCtgCache->tsmaLock);
|
||||
pCache = taosArrayGetP(pCtgCache->pTsmas, 0);
|
||||
|
|
|
@ -266,7 +266,7 @@ void ctgRemoveTSMARent(SCatalog *pCtg, SCtgDBCache *dbCache) {
|
|||
while (pIter) {
|
||||
SCtgTSMACache* pCtgCache = pIter;
|
||||
CTG_LOCK(CTG_READ, &pCtgCache->tsmaLock);
|
||||
int32_t size = pCtgCache ? pCtgCache->pTsmas->size : 0;
|
||||
int32_t size = (pCtgCache && pCtgCache->pTsmas) ? pCtgCache->pTsmas->size : 0;
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
STSMACache* pCache = taosArrayGetP(pCtgCache->pTsmas, i);
|
||||
if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->tsmaRent, pCache->tsmaId, ctgTSMAVersionSortCompare, ctgTSMAVersionSearchCompare)) {
|
||||
|
|
|
@ -1288,6 +1288,10 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
case QUERY_NODE_CREATE_TSMA_STMT: {
|
||||
SCreateTSMAStmt* pStmt = (SCreateTSMAStmt*)pNode;
|
||||
nodesDestroyNode((SNode*)pStmt->pOptions);
|
||||
if (pStmt->pReq) {
|
||||
tFreeSMCreateSmaReq(pStmt->pReq);
|
||||
taosMemoryFreeClear(pStmt->pReq);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_LOGIC_PLAN_SCAN: {
|
||||
|
|
|
@ -37,6 +37,7 @@ int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pS
|
|||
int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery);
|
||||
int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, SSDataBlock* pBlock);
|
||||
int32_t translatePostCreateSmaIndex(SParseContext* pParseCxt, SQuery* pQuery, SSDataBlock* pBlock);
|
||||
int32_t translatePostCreateTSMA(SParseContext* pParseCxt, SQuery* pQuery, SSDataBlock* pBlock);
|
||||
int32_t buildQueryAfterParse(SQuery** pQuery, SNode* pRootNode, int16_t placeholderNo, SArray** pPlaceholderValues);
|
||||
int32_t translateTable(STranslateContext* pCxt, SNode** pTable, SNode* pJoinParent);
|
||||
int32_t getMetaDataFromHash(const char* pKey, int32_t len, SHashObj* pHash, void** pOutput);
|
||||
|
|
|
@ -2865,6 +2865,7 @@ SNode* createCreateTSMAStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken*
|
|||
SRealTableNode* pTable = (SRealTableNode*)pRealTable;
|
||||
memcpy(pStmt->dbName, pTable->table.dbName, TSDB_DB_NAME_LEN);
|
||||
memcpy(pStmt->tableName, pTable->table.tableName, TSDB_TABLE_NAME_LEN);
|
||||
memcpy(pStmt->originalTbName, pTable->table.tableName, TSDB_TABLE_NAME_LEN);
|
||||
nodesDestroyNode(pRealTable);
|
||||
|
||||
return (SNode*)pStmt;
|
||||
|
|
|
@ -793,6 +793,18 @@ static int32_t collectMetaKeyFromCreateTSMAStmt(SCollectMetaKeyCxt* pCxt, SCreat
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t collectMetaKeyFromDropTSMAStmt(SCollectMetaKeyCxt* pCxt, SDropTSMAStmt* pStmt) {
|
||||
int32_t code;
|
||||
code = reserveTSMAInfoInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tsmaName, pCxt->pMetaCache);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = reserveDbCfgInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t collectMetaKeyFromShowTSMASStmt(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
|
||||
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_TSMAS,
|
||||
pCxt->pMetaCache);
|
||||
|
@ -931,6 +943,7 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
|
|||
case QUERY_NODE_CREATE_TSMA_STMT:
|
||||
return collectMetaKeyFromCreateTSMAStmt(pCxt, (SCreateTSMAStmt*)pStmt);
|
||||
case QUERY_NODE_DROP_TSMA_STMT:
|
||||
return collectMetaKeyFromDropTSMAStmt(pCxt, (SDropTSMAStmt*)pStmt);
|
||||
break;
|
||||
case QUERY_NODE_SHOW_TSMAS_STMT:
|
||||
return collectMetaKeyFromShowTSMASStmt(pCxt, (SShowStmt*)pStmt);
|
||||
|
|
|
@ -10643,6 +10643,7 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm
|
|||
}
|
||||
nodesDestroyNode((SNode*)pSelect);
|
||||
memset(useTbName, 0, sizeof(SName));
|
||||
memcpy(pStmt->originalTbName, pRecursiveTsma->tb, TSDB_TABLE_NAME_LEN);
|
||||
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pRecursiveTsma->tb, useTbName), pReq->stb);
|
||||
numOfCols = pRecursiveTsma->pUsedCols->size; // TODO merge pUsedCols and pTags with one SSchema array
|
||||
numOfTags = pRecursiveTsma->pTags->size;
|
||||
|
@ -10678,17 +10679,12 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm
|
|||
code = buildTSMAAst(pCxt, pStmt, pReq, pStmt->pOptions->recursiveTsma ? pRecursiveTsma->targetTb : pStmt->tableName,
|
||||
numOfTags, pTags);
|
||||
}
|
||||
/*
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
STableMeta* pMetaCache = NULL;
|
||||
code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pMetaCache);
|
||||
if (TSDB_CODE_SUCCESS == code && !pStmt->pOptions->recursiveTsma) { //TODO remvoe recursive tsma check
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pStmt->pOptions->tsPrecision = pMetaCache->tableInfo.precision;
|
||||
code = createLastTsSelectStmt(pStmt->dbName, pStmt->tableName, pMetaCache, &pStmt->pPrevQuery);
|
||||
pStmt->pOptions->tsPrecision = pTableMeta->tableInfo.precision;
|
||||
code = createLastTsSelectStmt(pStmt->dbName, pStmt->tableName, pTableMeta, &pStmt->pPrevQuery);
|
||||
}
|
||||
taosMemoryFreeClear(pMetaCache);
|
||||
}
|
||||
*/
|
||||
|
||||
taosMemoryFreeClear(pTableMeta);
|
||||
|
||||
|
@ -10698,19 +10694,78 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm
|
|||
static int32_t translateCreateTSMA(STranslateContext* pCxt, SCreateTSMAStmt* pStmt) {
|
||||
int32_t code = doTranslateValue(pCxt, (SValueNode*)pStmt->pOptions->pInterval);
|
||||
|
||||
SMCreateSmaReq smaReq = {0};
|
||||
SName useTbName = {0};
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
code = buildCreateTSMAReq(pCxt, pStmt, &smaReq, &useTbName);
|
||||
pStmt->pReq = taosMemoryCalloc(1, sizeof(SMCreateSmaReq));
|
||||
if (!pStmt->pReq) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
code = buildCreateTSMAReq(pCxt, pStmt, pStmt->pReq, &useTbName);
|
||||
}
|
||||
if ( TSDB_CODE_SUCCESS == code) {
|
||||
code = collectUseTable(&useTbName, pCxt->pTargetTables);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
// TODO replace with tsma serialization func
|
||||
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_TSMA, (FSerializeFunc)tSerializeSMCreateSmaReq, &smaReq);
|
||||
if (!pStmt->pPrevQuery) {
|
||||
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_TSMA, (FSerializeFunc)tSerializeSMCreateSmaReq, pStmt->pReq);
|
||||
} else {
|
||||
TSWAP(pCxt->pPrevRoot, pStmt->pPrevQuery);
|
||||
}
|
||||
}
|
||||
tFreeSMCreateSmaReq(&smaReq);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t buildIntervalForCreateTSMA(SCreateTSMAStmt* pStmt, SInterval* pInterval) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
pInterval->interval = ((SValueNode*)pStmt->pOptions->pInterval)->datum.i;
|
||||
pInterval->intervalUnit = ((SValueNode*)pStmt->pOptions->pInterval)->unit;
|
||||
pInterval->offset = 0;
|
||||
pInterval->sliding = pInterval->interval;
|
||||
pInterval->slidingUnit = pInterval->intervalUnit;
|
||||
pInterval->precision = pStmt->pOptions->tsPrecision;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t translatePostCreateTSMA(SParseContext* pParseCxt, SQuery* pQuery, SSDataBlock* pBlock) {
|
||||
SCreateTSMAStmt* pStmt = (SCreateTSMAStmt*)pQuery->pRoot;
|
||||
STranslateContext cxt = {0};
|
||||
SInterval interval = {0};
|
||||
int64_t lastTs = 0;
|
||||
|
||||
int32_t code = initTranslateContext(pParseCxt, NULL, &cxt);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildIntervalForCreateTSMA(pStmt, &interval);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createStreamReqVersionInfo(pBlock, &pStmt->pReq->pVgroupVerList, &lastTs, &interval);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (interval.interval > 0) {
|
||||
pStmt->pReq->lastTs = taosTimeAdd(taosTimeTruncate(lastTs, &interval), interval.interval, interval.intervalUnit, interval.precision);
|
||||
} else {
|
||||
pStmt->pReq->lastTs = lastTs + 1; // start key of the next time window
|
||||
}
|
||||
code = buildCmdMsg(&cxt, TDMT_MND_CREATE_TSMA, (FSerializeFunc)tSerializeSMCreateSmaReq, pStmt->pReq);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = setQuery(&cxt, pQuery);
|
||||
}
|
||||
|
||||
if ( TSDB_CODE_SUCCESS == code) {
|
||||
SName name = {0};
|
||||
toName(pParseCxt->acctId, pStmt->dbName, pStmt->originalTbName, &name);
|
||||
code = collectUseTable(&name, cxt.pTargetTables);
|
||||
}
|
||||
|
||||
setRefreshMeta(&cxt, pQuery);
|
||||
destroyTranslateContext(&cxt);
|
||||
|
||||
tFreeSMCreateSmaReq(pStmt->pReq);
|
||||
taosMemoryFreeClear(pStmt->pReq);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -10720,6 +10775,12 @@ static int32_t translateDropTSMA(STranslateContext* pCxt, SDropTSMAStmt* pStmt)
|
|||
SName name;
|
||||
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tsmaName, &name), dropReq.name);
|
||||
dropReq.igNotExists = pStmt->ignoreNotExists;
|
||||
STableTSMAInfo* pTsma = NULL;
|
||||
code = getTsma(pCxt, &name, &pTsma);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pTsma->tb, &name);
|
||||
code = collectUseTable(&name, pCxt->pTargetTables);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code)
|
||||
code = buildCmdMsg(pCxt, TDMT_MND_DROP_TSMA, (FSerializeFunc)tSerializeSMDropSmaReq, &dropReq);
|
||||
return code;
|
||||
|
|
|
@ -244,6 +244,10 @@ int32_t qContinueParsePostQuery(SParseContext* pCxt, SQuery* pQuery, SSDataBlock
|
|||
code = translatePostCreateSmaIndex(pCxt, pQuery, pBlock);
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_CREATE_TSMA_STMT: {
|
||||
code = translatePostCreateTSMA(pCxt, pQuery, pBlock);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -299,6 +299,7 @@ class TSMATestSQLGenerator:
|
|||
## order by, limit, having, subquery...
|
||||
|
||||
class TDTestCase:
|
||||
updatecfgDict = {'debugFlag': 143, 'asynclog': 0}
|
||||
def __init__(self):
|
||||
self.vgroups = 4
|
||||
self.ctbNum = 10
|
||||
|
@ -462,12 +463,13 @@ class TDTestCase:
|
|||
## why need 10s, filling history not finished yet
|
||||
#ctx = TSMAQCBuilder().with_sql('select avg(c1) from meters').should_query_with_table('meters', UsedTsma.TS_MIN, UsedTsma.TS_MAX).get_qc()
|
||||
#self.tsma_tester.check_sql(ctx.sql, ctx)
|
||||
time.sleep(5)
|
||||
#time.sleep(5)
|
||||
#time.sleep(9999999)
|
||||
self.test_query_with_tsma_interval()
|
||||
self.test_query_with_tsma_agg()
|
||||
## self.test_query_with_drop_tsma()
|
||||
## self.test_query_with_add_tag()
|
||||
## self.test_union()
|
||||
|
||||
def test_query_with_tsma_interval(self):
|
||||
self.check(self.test_query_with_tsma_interval_no_partition)
|
||||
|
|
Loading…
Reference in New Issue