Merge pull request #21915 from wangjiaming0909/feature/3.0/TD-24992
feature: get last timestamp before create sma index
This commit is contained in:
commit
203ae21a84
|
@ -3025,6 +3025,7 @@ typedef struct {
|
|||
char* sql;
|
||||
char* ast;
|
||||
int64_t deleteMark;
|
||||
int64_t lastTs;
|
||||
} SMCreateSmaReq;
|
||||
|
||||
int32_t tSerializeSMCreateSmaReq(void* buf, int32_t bufLen, SMCreateSmaReq* pReq);
|
||||
|
|
|
@ -319,6 +319,7 @@ typedef struct SIndexOptions {
|
|||
SNode* pInterval;
|
||||
SNode* pOffset;
|
||||
SNode* pSliding;
|
||||
int8_t tsPrecision;
|
||||
SNode* pStreamOptions;
|
||||
} SIndexOptions;
|
||||
|
||||
|
@ -332,6 +333,8 @@ typedef struct SCreateIndexStmt {
|
|||
char tableName[TSDB_TABLE_NAME_LEN];
|
||||
SNodeList* pCols;
|
||||
SIndexOptions* pOptions;
|
||||
SNode* pPrevQuery;
|
||||
SMCreateSmaReq* pReq;
|
||||
} SCreateIndexStmt;
|
||||
|
||||
typedef struct SDropIndexStmt {
|
||||
|
|
|
@ -835,6 +835,7 @@ int32_t tSerializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pReq
|
|||
if (tEncodeBinary(&encoder, pReq->ast, pReq->astLen) < 0) return -1;
|
||||
}
|
||||
if (tEncodeI64(&encoder, pReq->deleteMark) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->lastTs) < 0) return -1;
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
|
@ -884,6 +885,7 @@ int32_t tDeserializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pR
|
|||
if (tDecodeCStrTo(&decoder, pReq->ast) < 0) return -1;
|
||||
}
|
||||
if (tDecodeI64(&decoder, &pReq->deleteMark) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->lastTs) < 0) return -1;
|
||||
tEndDecode(&decoder);
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
|
|
|
@ -907,6 +907,10 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
SCreateIndexStmt* pStmt = (SCreateIndexStmt*)pNode;
|
||||
nodesDestroyNode((SNode*)pStmt->pOptions);
|
||||
nodesDestroyList(pStmt->pCols);
|
||||
if (pStmt->pReq) {
|
||||
tFreeSMCreateSmaReq(pStmt->pReq);
|
||||
taosMemoryFreeClear(pStmt->pReq);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_DROP_INDEX_STMT: // no pointer field
|
||||
|
@ -1053,6 +1057,7 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
}
|
||||
case QUERY_NODE_QUERY: {
|
||||
SQuery* pQuery = (SQuery*)pNode;
|
||||
nodesDestroyNode(pQuery->pPrevRoot);
|
||||
nodesDestroyNode(pQuery->pRoot);
|
||||
nodesDestroyNode(pQuery->pPostRoot);
|
||||
taosMemoryFreeClear(pQuery->pResSchema);
|
||||
|
|
|
@ -35,6 +35,7 @@ int32_t translate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMe
|
|||
int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema);
|
||||
int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery);
|
||||
int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow);
|
||||
int32_t translatePostCreateSmaIndex(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -5803,6 +5803,15 @@ static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStm
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = getSmaIndexAst(pCxt, pStmt, &pReq->ast, &pReq->astLen, &pReq->expr, &pReq->exprLen);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
STableMeta* pMetaCache = NULL;
|
||||
code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pMetaCache);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pStmt->pOptions->tsPrecision = pMetaCache->tableInfo.precision;
|
||||
code = createLastTsSelectStmt(pStmt->dbName, pStmt->tableName, pMetaCache, &pStmt->pPrevQuery);
|
||||
}
|
||||
taosMemoryFreeClear(pMetaCache);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -5828,15 +5837,60 @@ static int32_t checkCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt* pS
|
|||
}
|
||||
|
||||
static int32_t translateCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt) {
|
||||
SMCreateSmaReq createSmaReq = {0};
|
||||
int32_t code = checkCreateSmaIndex(pCxt, pStmt);
|
||||
pStmt->pReq = taosMemoryCalloc(1, sizeof(SMCreateSmaReq));
|
||||
if (pStmt->pReq == NULL) code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildCreateSmaReq(pCxt, pStmt, &createSmaReq);
|
||||
code = buildCreateSmaReq(pCxt, pStmt, pStmt->pReq);
|
||||
}
|
||||
TSWAP(pCxt->pPrevRoot, pStmt->pPrevQuery);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t createIntervalFromCreateSmaIndexStmt(SCreateIndexStmt* pStmt, SInterval* pInterval) {
|
||||
pInterval->interval = ((SValueNode*)pStmt->pOptions->pInterval)->datum.i;
|
||||
pInterval->intervalUnit = ((SValueNode*)pStmt->pOptions->pInterval)->unit;
|
||||
pInterval->offset = NULL != pStmt->pOptions->pOffset ? ((SValueNode*)pStmt->pOptions->pOffset)->datum.i : 0;
|
||||
pInterval->sliding = NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->datum.i : pInterval->interval;
|
||||
pInterval->slidingUnit = NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->unit : pInterval->intervalUnit;
|
||||
pInterval->precision = pStmt->pOptions->tsPrecision;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t translatePostCreateSmaIndex(SParseContext* pParseCxt, SQuery* pQuery, void ** pResRow) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SCreateIndexStmt* pStmt = (SCreateIndexStmt*)pQuery->pRoot;
|
||||
int64_t lastTs = 0;
|
||||
SInterval interval = {0};
|
||||
STranslateContext pCxt = {0};
|
||||
code = initTranslateContext(pParseCxt, NULL, &pCxt);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createIntervalFromCreateSmaIndexStmt(pStmt, &interval);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_SMA, (FSerializeFunc)tSerializeSMCreateSmaReq, &createSmaReq);
|
||||
if (pResRow && pResRow[0]) {
|
||||
lastTs = *(int64_t*)pResRow[0];
|
||||
} else if (interval.interval > 0) {
|
||||
lastTs = convertTimePrecision(taosGetTimestampMs(), TSDB_TIME_PRECISION_MILLI, interval.precision);
|
||||
} else {
|
||||
lastTs = taosGetTimestampMs();
|
||||
}
|
||||
tFreeSMCreateSmaReq(&createSmaReq);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (interval.interval > 0) {
|
||||
pStmt->pReq->lastTs = taosTimeTruncate(lastTs, &interval);
|
||||
} else {
|
||||
pStmt->pReq->lastTs = lastTs;
|
||||
}
|
||||
code = buildCmdMsg(&pCxt, TDMT_MND_CREATE_SMA, (FSerializeFunc)tSerializeSMCreateSmaReq, pStmt->pReq);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = setQuery(&pCxt, pQuery);
|
||||
}
|
||||
setRefreshMate(&pCxt, pQuery);
|
||||
destroyTranslateContext(&pCxt);
|
||||
tFreeSMCreateSmaReq(pStmt->pReq);
|
||||
taosMemoryFreeClear(pStmt->pReq);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -6989,7 +7043,7 @@ static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt*
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t buildIntervalForCreateStream(SCreateStreamStmt* pStmt, SInterval* pInterval) {
|
||||
static int32_t buildIntervalForCreateStream(SCreateStreamStmt* pStmt, SInterval* pInterval) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery)) {
|
||||
return code;
|
||||
|
|
|
@ -227,6 +227,8 @@ int32_t qContinueParsePostQuery(SParseContext* pCxt, SQuery* pQuery, void** pRes
|
|||
case QUERY_NODE_CREATE_STREAM_STMT:
|
||||
code = translatePostCreateStream(pCxt, pQuery, pResRow);
|
||||
break;
|
||||
case QUERY_NODE_CREATE_INDEX_STMT:
|
||||
code = translatePostCreateSmaIndex(pCxt, pQuery, pResRow);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -542,6 +542,18 @@ TEST_F(ParserInitialCTest, createSmaIndex) {
|
|||
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
|
||||
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_CREATE_INDEX_STMT);
|
||||
SMCreateSmaReq req = {0};
|
||||
ASSERT_TRUE(pQuery->pPrevRoot);
|
||||
ASSERT_EQ(QUERY_NODE_SELECT_STMT, nodeType(pQuery->pPrevRoot));
|
||||
|
||||
SCreateIndexStmt* pStmt = (SCreateIndexStmt*)pQuery->pRoot;
|
||||
SCmdMsgInfo* pCmdMsg = (SCmdMsgInfo*)taosMemoryMalloc(sizeof(SCmdMsgInfo));
|
||||
if (NULL == pCmdMsg) FAIL();
|
||||
pCmdMsg->msgType = TDMT_MND_CREATE_SMA;
|
||||
pCmdMsg->msgLen = tSerializeSMCreateSmaReq(NULL, 0, pStmt->pReq);
|
||||
pCmdMsg->pMsg = taosMemoryMalloc(pCmdMsg->msgLen);
|
||||
if (!pCmdMsg->pMsg) FAIL();
|
||||
tSerializeSMCreateSmaReq(pCmdMsg->pMsg, pCmdMsg->msgLen, pStmt->pReq);
|
||||
((SQuery*)pQuery)->pCmdMsg = pCmdMsg;
|
||||
ASSERT_TRUE(TSDB_CODE_SUCCESS == tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req));
|
||||
|
||||
ASSERT_EQ(std::string(req.name), std::string(expect.name));
|
||||
|
|
|
@ -441,6 +441,16 @@ class PlannerTestBaseImpl {
|
|||
pCxt->topicQuery = true;
|
||||
} else if (QUERY_NODE_CREATE_INDEX_STMT == nodeType(pQuery->pRoot)) {
|
||||
SMCreateSmaReq req = {0};
|
||||
SCreateIndexStmt* pStmt = (SCreateIndexStmt*)pQuery->pRoot;
|
||||
SCmdMsgInfo* pCmdMsg = (SCmdMsgInfo*)taosMemoryMalloc(sizeof(SCmdMsgInfo));
|
||||
if (NULL == pCmdMsg) FAIL();
|
||||
pCmdMsg->msgType = TDMT_MND_CREATE_SMA;
|
||||
pCmdMsg->msgLen = tSerializeSMCreateSmaReq(NULL, 0, pStmt->pReq);
|
||||
pCmdMsg->pMsg = taosMemoryMalloc(pCmdMsg->msgLen);
|
||||
if (!pCmdMsg->pMsg) FAIL();
|
||||
tSerializeSMCreateSmaReq(pCmdMsg->pMsg, pCmdMsg->msgLen, pStmt->pReq);
|
||||
((SQuery*)pQuery)->pCmdMsg = pCmdMsg;
|
||||
|
||||
tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req);
|
||||
g_mockCatalogService->createSmaIndex(&req);
|
||||
nodesStringToNode(req.ast, &pCxt->pAstRoot);
|
||||
|
|
Loading…
Reference in New Issue