add convert for rSma
This commit is contained in:
parent
43b5bf8333
commit
a98bf9d1b5
|
@ -30,6 +30,7 @@ typedef struct SPlanContext {
|
|||
SNode* pAstRoot;
|
||||
bool topicQuery;
|
||||
bool streamQuery;
|
||||
bool rSmaQuery;
|
||||
bool showRewrite;
|
||||
int8_t triggerType;
|
||||
int64_t watermark;
|
||||
|
|
|
@ -29,6 +29,8 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
|||
|
||||
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream);
|
||||
|
||||
int32_t mndConvertRSmaTask(const char* ast, int8_t triggerType, int64_t watermark, char** pStr, int32_t* pLen);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -34,6 +34,54 @@
|
|||
|
||||
extern bool tsStreamSchedV;
|
||||
|
||||
int32_t mndConvertRSmaTask(const char* ast, int8_t triggerType, int64_t watermark, char** pStr, int32_t* pLen) {
|
||||
SNode* pAst = NULL;
|
||||
SQueryPlan* pPlan = NULL;
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
if (nodesStringToNode(ast, &pAst) < 0) {
|
||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
goto END;
|
||||
}
|
||||
|
||||
SPlanContext cxt = {
|
||||
.pAstRoot = pAst,
|
||||
.topicQuery = false,
|
||||
.streamQuery = true,
|
||||
.rSmaQuery = true,
|
||||
.triggerType = triggerType,
|
||||
.watermark = watermark,
|
||||
};
|
||||
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
|
||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
goto END;
|
||||
}
|
||||
|
||||
int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
|
||||
if (levelNum != 1) {
|
||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
goto END;
|
||||
}
|
||||
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0);
|
||||
|
||||
int32_t opNum = LIST_LENGTH(inner->pNodeList);
|
||||
if (opNum != 1) {
|
||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
goto END;
|
||||
}
|
||||
|
||||
SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);
|
||||
if (qSubPlanToString(plan, pStr, pLen) < 0) {
|
||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
goto END;
|
||||
}
|
||||
|
||||
END:
|
||||
if (pAst) nodesDestroyNode(pAst);
|
||||
if (pPlan) nodesDestroyNode(pPlan);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) {
|
||||
SCoder encoder;
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
|
||||
|
|
|
@ -141,7 +141,7 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, int16_t* pG
|
|||
colInfo.info.colId = pColSchema->colId;
|
||||
colInfo.info.type = pColSchema->type;
|
||||
|
||||
if (colInfoDataEnsureCapacity(&colInfo, 0, numOfRows) < 0) {
|
||||
if (colInfoDataEnsureCapacity(&colInfo, 0, *pNumOfRows) < 0) {
|
||||
goto FAIL;
|
||||
}
|
||||
taosArrayPush(*ppCols, &colInfo);
|
||||
|
|
Loading…
Reference in New Issue