diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 2b3a1f2650..937ac2b408 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -152,7 +152,6 @@ void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput); typedef struct { char* qmsg; // followings are not applicable to encoder and decoder - // void* inputHandle; void* executor; } STaskExec; @@ -400,15 +399,13 @@ typedef struct { int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); -int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb); +int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId); int32_t streamSetupTrigger(SStreamTask* pTask); -int32_t streamTaskRun(SStreamTask* pTask); - -int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb); -int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pMsg); -int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp); -int32_t streamProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg); +int32_t streamProcessRunReq(SStreamTask* pTask); +int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg); +int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp); +int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg); int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp); #ifdef __cplusplus diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 7457fe7eb6..92eda0c5e0 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -161,7 +161,7 @@ int32_t tsDiskCfgNum = 0; SDiskCfg tsDiskCfg[TFS_MAX_DISKS] = {0}; // stream scheduler -bool tsStreamSchedV = true; +bool tsSchedStreamToSnode = true; /* * minimum scale for whole system, millisecond by default diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 52a69f95b4..81576e153e 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -96,11 +96,11 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_snode/src/smWorker.c b/source/dnode/mgmt/mgmt_snode/src/smWorker.c index 8d93ddd66c..19c1b9b5c7 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smWorker.c +++ b/source/dnode/mgmt/mgmt_snode/src/smWorker.c @@ -58,6 +58,7 @@ static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t num if (sndProcessUMsg(pMgmt->pSnode, pMsg) < 0) { ASSERT(0); } + smSendRsp(pMsg, 0); dTrace("msg:%p, is freed", pMsg); rpcFreeCont(pMsg->pCont); @@ -70,6 +71,7 @@ static void smProcessSharedQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { dTrace("msg:%p, get from snode-shared queue", pMsg); if (sndProcessSMsg(pMgmt->pSnode, pMsg) < 0) { + smSendRsp(pMsg, terrno); ASSERT(0); } diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 7e5379b0f8..6f00767eb0 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -35,7 +35,6 @@ typedef struct SVnodeMgmt { SWWorkerPool syncPool; SWWorkerPool writePool; SWWorkerPool applyPool; - SWWorkerPool mergePool; SSingleWorker mgmtWorker; SSingleWorker monitorWorker; SHashObj *hash; @@ -63,7 +62,6 @@ typedef struct { STaosQueue *pApplyQ; STaosQueue *pQueryQ; STaosQueue *pFetchQ; - STaosQueue *pMergeQ; } SVnodeObj; typedef struct { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 9e4e7713f2..3f053639aa 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -86,7 +86,6 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); - while (!taosQueueEmpty(pVnode->pMergeQ)) taosMsleep(10); vmFreeQueue(pMgmt, pVnode); vnodeClose(pVnode->pImpl); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 95dd5732c6..71bbc8ddd4 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -98,7 +98,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; - SRpcMsg * pMsg = NULL; + SRpcMsg *pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; @@ -119,7 +119,7 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; - SRpcMsg * pMsg = NULL; + SRpcMsg *pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; @@ -251,10 +251,9 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyMsg); pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue); - pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeQueue); if (pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pApplyQ == NULL || pVnode->pQueryQ == NULL || - pVnode->pFetchQ == NULL || pVnode->pMergeQ == NULL) { + pVnode->pFetchQ == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -269,13 +268,11 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); - tWWorkerFreeQueue(&pMgmt->mergePool, pVnode->pMergeQ); pVnode->pWriteQ = NULL; pVnode->pSyncQ = NULL; pVnode->pApplyQ = NULL; pVnode->pQueryQ = NULL; pVnode->pFetchQ = NULL; - pVnode->pMergeQ = NULL; dDebug("vgId:%d, queue is freed", pVnode->vgId); } @@ -307,11 +304,6 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { pSPool->max = tsNumOfVnodeSyncThreads; if (tWWorkerInit(pSPool) != 0) return -1; - SWWorkerPool *pMPool = &pMgmt->mergePool; - pMPool->name = "vnode-merge"; - pMPool->max = tsNumOfVnodeMergeThreads; - if (tWWorkerInit(pMPool) != 0) return -1; - SSingleWorkerCfg mgmtCfg = { .min = 1, .max = 1, @@ -342,6 +334,5 @@ void vmStopWorker(SVnodeMgmt *pMgmt) { tWWorkerCleanup(&pMgmt->syncPool); tQWorkerCleanup(&pMgmt->queryPool); tQWorkerCleanup(&pMgmt->fetchPool); - tWWorkerCleanup(&pMgmt->mergePool); dDebug("vnode workers are closed"); } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 3ff0c39bc3..3bd9a9128d 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -33,7 +33,7 @@ #include "tname.h" #include "tuuid.h" -extern bool tsStreamSchedV; +extern bool tsSchedStreamToSnode; static int32_t mndAddTaskToTaskSet(SArray* pArray, SStreamTask* pTask) { int32_t childId = taosArrayGetSize(pArray); @@ -204,9 +204,11 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SS return 0; } -SSnodeObj* mndSchedFetchSnode(SMnode* pMnode) { +SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) { SSnodeObj* pObj = NULL; - pObj = sdbFetch(pMnode->pSdb, SDB_SNODE, NULL, (void**)&pObj); + void* pIter = NULL; + // TODO random fetch + pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void**)&pObj); return pObj; } @@ -214,7 +216,7 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, const SSnodeObj* pSnode) { int32_t msgLen; - pTask->nodeId = 0; + pTask->nodeId = SNODE_HANDLE; pTask->epSet = mndAcquireEpFromSnode(pMnode, pSnode); plan->execNode.nodeId = 0; @@ -224,7 +226,7 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } - mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_STREAM_TASK_DEPLOY, 0); + mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_STREAM_TASK_DEPLOY, SNODE_HANDLE); return 0; } @@ -370,8 +372,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { } if (totLevel > 1) { - SStreamTask* pFinalTask; - // inner plan + SStreamTask* pInnerTask; + // inner level { SArray* taskInnerLevel = taosArrayInit(0, sizeof(void*)); taosArrayPush(pStream->tasks, &taskInnerLevel); @@ -380,31 +382,51 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE); - pFinalTask = tNewSStreamTask(pStream->uid); - mndAddTaskToTaskSet(taskInnerLevel, pFinalTask); + pInnerTask = tNewSStreamTask(pStream->uid); + mndAddTaskToTaskSet(taskInnerLevel, pInnerTask); // input - pFinalTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; + pInnerTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; // trigger - pFinalTask->triggerParam = pStream->triggerParam; + pInnerTask->triggerParam = pStream->triggerParam; // dispatch - if (mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pFinalTask) < 0) { + if (mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pInnerTask) < 0) { qDestroyQueryPlan(pPlan); return -1; } // exec - pFinalTask->execType = TASK_EXEC__PIPE; - SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid); - if (mndAssignTaskToVg(pMnode, pTrans, pFinalTask, plan, pVgroup) < 0) { - sdbRelease(pSdb, pVgroup); - qDestroyQueryPlan(pPlan); - return -1; + pInnerTask->execType = TASK_EXEC__PIPE; + + if (tsSchedStreamToSnode) { + SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode); + if (pSnode == NULL) { + SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid); + if (mndAssignTaskToVg(pMnode, pTrans, pInnerTask, plan, pVgroup) < 0) { + sdbRelease(pSdb, pVgroup); + qDestroyQueryPlan(pPlan); + return -1; + } + } else { + if (mndAssignTaskToSnode(pMnode, pTrans, pInnerTask, plan, pSnode) < 0) { + ASSERT(0); + sdbRelease(pSdb, pSnode); + qDestroyQueryPlan(pPlan); + return -1; + } + } + } else { + SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid); + if (mndAssignTaskToVg(pMnode, pTrans, pInnerTask, plan, pVgroup) < 0) { + sdbRelease(pSdb, pVgroup); + qDestroyQueryPlan(pPlan); + return -1; + } } } - // source plan + // source level SArray* taskSourceLevel = taosArrayInit(0, sizeof(void*)); taosArrayPush(pStream->tasks, &taskSourceLevel); @@ -434,9 +456,9 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; pTask->dispatchType = TASK_DISPATCH__FIXED; - pTask->fixedEpDispatcher.taskId = pFinalTask->taskId; - pTask->fixedEpDispatcher.nodeId = pFinalTask->nodeId; - pTask->fixedEpDispatcher.epSet = pFinalTask->epSet; + pTask->fixedEpDispatcher.taskId = pInnerTask->taskId; + pTask->fixedEpDispatcher.nodeId = pInnerTask->nodeId; + pTask->fixedEpDispatcher.epSet = pInnerTask->epSet; // exec pTask->execType = TASK_EXEC__PIPE; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 8ef48ccbf9..84a66c680b 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -78,8 +78,8 @@ int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) { static int32_t sndProcessTaskDeployReq(SSnode *pNode, SRpcMsg *pMsg) { SStreamMeta *pMeta = pNode->pMeta; - char *msg = pMsg->pCont; - int32_t msgLen = pMsg->contLen; + char *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { @@ -105,23 +105,22 @@ static int32_t sndProcessTaskDeployReq(SSnode *pNode, SRpcMsg *pMsg) { ASSERT(pTask->execType != TASK_EXEC__NONE); - SReadHandle handle = { - .pMsgCb = &pNode->msgCb, - }; - - /*pTask->exec.inputHandle = NULL;*/ - pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); + ASSERT(pTask->dataScan == 0); + pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL); ASSERT(pTask->exec.executor); streamSetupTrigger(pTask); qInfo("deploy stream: stream id %ld task id %d child id %d on snode", pTask->streamId, pTask->taskId, pTask->childId); + taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void *)); + return 0; FAIL: if (pTask->inputQueue) streamQueueClose(pTask->inputQueue); if (pTask->outputQueue) streamQueueClose(pTask->outputQueue); + if (pTask) taosMemoryFree(pTask); return -1; } @@ -130,7 +129,7 @@ static int32_t sndProcessTaskRunReq(SSnode *pNode, SRpcMsg *pMsg) { SStreamTaskRunReq *pReq = pMsg->pCont; int32_t taskId = pReq->taskId; SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t)); - streamTaskProcessRunReq(pTask, &pNode->msgCb); + streamProcessRunReq(pTask); return 0; } @@ -151,7 +150,7 @@ static int32_t sndProcessTaskDispatchReq(SSnode *pNode, SRpcMsg *pMsg) { .info = pMsg->info, .code = 0, }; - streamProcessDispatchReq(pTask, &pNode->msgCb, &req, &rsp); + streamProcessDispatchReq(pTask, &req, &rsp); return 0; } @@ -161,7 +160,7 @@ static int32_t sndProcessTaskRecoverReq(SSnode *pNode, SRpcMsg *pMsg) { SStreamTaskRecoverReq *pReq = pMsg->pCont; int32_t taskId = pReq->taskId; SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t)); - streamProcessRecoverReq(pTask, &pNode->msgCb, pReq, pMsg); + streamProcessRecoverReq(pTask, pReq, pMsg); return 0; } @@ -171,7 +170,7 @@ static int32_t sndProcessTaskDispatchRsp(SSnode *pNode, SRpcMsg *pMsg) { SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t taskId = pRsp->taskId; SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t)); - streamProcessDispatchRsp(pTask, &pNode->msgCb, pRsp); + streamProcessDispatchRsp(pTask, pRsp); return 0; } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 70b6e24b07..c0dfebb08f 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -87,7 +87,7 @@ int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* p int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids); -int metaTtlDropTable(SMeta *pMeta, int64_t ttl, SArray *tbUids); +int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver); @@ -106,28 +106,28 @@ int32_t metaSnapshotReaderClose(SMetaSnapshotReader* pReader); int32_t metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t* nData); void* metaGetIdx(SMeta* pMeta); void* metaGetIvtIdx(SMeta* pMeta); -int metaTtlSmaller(SMeta *pMeta, uint64_t time, SArray *uidList); +int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList); int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); // tsdb -int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg); -int tsdbClose(STsdb** pTsdb); -int32_t tsdbBegin(STsdb* pTsdb); -int32_t tsdbCommit(STsdb* pTsdb); -int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg); -int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); -int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, - SSubmitBlkRsp* pRsp); -int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); -tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, - uint64_t taskId); -tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, - void* pMemRef); -int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever); -int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader); -int32_t tsdbSnapshotRead(STsdbSnapshotReader* pReader, void** ppData, uint32_t* nData); +int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg); +int tsdbClose(STsdb** pTsdb); +int32_t tsdbBegin(STsdb* pTsdb); +int32_t tsdbCommit(STsdb* pTsdb); +int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg); +int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); +int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, + SSubmitBlkRsp* pRsp); +int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); +tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, + uint64_t taskId); +tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, + void* pMemRef); +int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever); +int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader); +int32_t tsdbSnapshotRead(STsdbSnapshotReader* pReader, void** ppData, uint32_t* nData); // tq int tqInit(); @@ -141,7 +141,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId); -int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); +int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); @@ -262,7 +262,7 @@ struct SSma { #define SMA_CFG(s) (&(s)->pVnode->config) #define SMA_TSDB_CFG(s) (&(s)->pVnode->config.tsdbCfg) -#define SMA_RETENTION(s) ((SRetention *)&(s)->pVnode->config.tsdbCfg.retentions) +#define SMA_RETENTION(s) ((SRetention*)&(s)->pVnode->config.tsdbCfg.retentions) #define SMA_LOCKED(s) ((s)->locked) #define SMA_META(s) ((s)->pVnode->pMeta) #define SMA_VID(s) TD_VID((s)->pVnode) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 06a119b076..ece4b7e2a4 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -333,7 +333,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { SReadHandle handle = { .reader = pHandle->execHandle.pExecReader[i], .meta = pTq->pVnode->pMeta, - .pMsgCb = &pTq->pVnode->msgCb, }; pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle); ASSERT(pHandle->execHandle.execCol.task[i]); @@ -373,7 +372,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { return 0; } -int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { +int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { return -1; @@ -404,7 +403,6 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { SReadHandle handle = { .reader = pStreamReader, .meta = pTq->pVnode->pMeta, - .pMsgCb = &pTq->pVnode->msgCb, .vnode = pTq->pVnode, }; /*pTask->exec.inputHandle = pStreamReader;*/ @@ -468,7 +466,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) { continue; } - if (streamLaunchByWrite(pTask, TD_VID(pTq->pVnode), &pTq->pVnode->msgCb) < 0) { + if (streamLaunchByWrite(pTask, TD_VID(pTq->pVnode)) < 0) { continue; } } else { @@ -489,7 +487,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRunReq* pReq = pMsg->pCont; int32_t taskId = pReq->taskId; SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - streamTaskProcessRunReq(pTask, &pTq->pVnode->msgCb); + streamProcessRunReq(pTask); return 0; } @@ -507,7 +505,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { .info = pMsg->info, .code = 0, }; - streamProcessDispatchReq(pTask, &pTq->pVnode->msgCb, &req, &rsp); + streamProcessDispatchReq(pTask, &req, &rsp); return 0; } @@ -515,7 +513,7 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRecoverReq* pReq = pMsg->pCont; int32_t taskId = pReq->taskId; SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - streamProcessRecoverReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg); + streamProcessRecoverReq(pTask, pReq, pMsg); return 0; } @@ -523,7 +521,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t taskId = pRsp->taskId; SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - streamProcessDispatchRsp(pTask, &pTq->pVnode->msgCb, pRsp); + streamProcessDispatchRsp(pTask, pRsp); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index fb22b7c5bf..c097e2f929 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -167,8 +167,8 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp } break; case TDMT_STREAM_TASK_DEPLOY: { - if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), - pMsg->contLen - sizeof(SMsgHead)) < 0) { + if (tqProcessTaskDeployReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), + pMsg->contLen - sizeof(SMsgHead)) < 0) { goto _err; } } break; @@ -304,18 +304,17 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) { pMetaRsp->precision = pVnode->config.tsdbCfg.precision; } -static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp){ - +static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { SArray *tbUids = taosArrayInit(8, sizeof(int64_t)); if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY; - int32_t t = ntohl(*(int32_t*)pReq); + int32_t t = ntohl(*(int32_t *)pReq); vError("rec ttl time:%d", t); int32_t ret = metaTtlDropTable(pVnode->pMeta, t, tbUids); - if(ret != 0){ + if (ret != 0) { goto end; } - if(taosArrayGetSize(tbUids) > 0){ + if (taosArrayGetSize(tbUids) > 0) { tqUpdateTbUidList(pVnode->pTq, tbUids, false); } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 5c038ed709..3828a26bc4 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4052,14 +4052,19 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STimeWindowAggSupp twSup = { - .waterMark = pTableScanNode->watermark, .calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN}; + .waterMark = pTableScanNode->watermark, + .calTrigger = pTableScanNode->triggerType, + .maxTs = INT64_MIN, + }; tsdbReaderT pDataReader = NULL; if (pHandle) { if (pHandle->vnode) { + // for stram pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); } else { + // for tq getTableList(pHandle->meta, pScanPhyNode, pTableListInfo, pTagCond); } } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 6dcbfad957..38a1ad14b1 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -64,7 +64,7 @@ void streamTriggerByTimer(void* param, void* tmrId) { atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE); streamTaskInput(pTask, (SStreamQueueItem*)trigger); - streamLaunchByWrite(pTask, pTask->nodeId, pTask->pMsgCb); + streamLaunchByWrite(pTask, pTask->nodeId); } taosTmrReset(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer); @@ -81,7 +81,7 @@ int32_t streamSetupTrigger(SStreamTask* pTask) { return 0; } -int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb) { +int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId) { int8_t execStatus = atomic_load_8(&pTask->status); if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) { SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); @@ -96,7 +96,7 @@ int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb) { .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq), }; - tmsgPutToQueue(pMsgCb, FETCH_QUEUE, &msg); + tmsgPutToQueue(pTask->pMsgCb, FETCH_QUEUE, &msg); } return 0; } @@ -136,7 +136,9 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; } -int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { +int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { + qInfo("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId, pReq->sourceTaskId); + // 1. handle input streamTaskEnqueue(pTask, pReq, pRsp); @@ -145,7 +147,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp // 2.2. executing: return // 2.3. closing: keep trying if (pTask->execType != TASK_EXEC__NONE) { - streamExec(pTask, pMsgCb); + streamExec(pTask, pTask->pMsgCb); } else { ASSERT(pTask->sinkType != TASK_SINK__NONE); while (1) { @@ -161,34 +163,38 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp // 3.1 check and set status // 3.2 dispatch / sink if (pTask->dispatchType != TASK_DISPATCH__NONE) { - streamDispatch(pTask, pMsgCb); + streamDispatch(pTask, pTask->pMsgCb); } return 0; } -int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp) { +int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) { ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED); + + qInfo("task %d receive dispatch rsp", pTask->taskId); + int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus); ASSERT(old == TASK_OUTPUT_STATUS__WAIT); if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { // TODO: init recover timer + ASSERT(0); return 0; } // continue dispatch - streamDispatch(pTask, pMsgCb); + streamDispatch(pTask, pTask->pMsgCb); return 0; } -int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb) { - streamExec(pTask, pMsgCb); +int32_t streamProcessRunReq(SStreamTask* pTask) { + streamExec(pTask, pTask->pMsgCb); if (pTask->dispatchType != TASK_DISPATCH__NONE) { - streamDispatch(pTask, pMsgCb); + streamDispatch(pTask, pTask->pMsgCb); } return 0; } -int32_t streamProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg) { +int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg) { // return 0; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index ca10e7d956..1894f697c0 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -144,7 +144,7 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM } } - ASSERT(vgId != 0); + ASSERT(vgId > 0 || vgId == SNODE_HANDLE); req.taskId = downstreamTaskId; qInfo("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->childId, @@ -199,6 +199,8 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) { } ASSERT(pBlock->type == STREAM_DATA_TYPE_SSDATA_BLOCK); + qInfo("stream continue dispatching: task %d", pTask->taskId); + SRpcMsg dispatchMsg = {0}; SEpSet* pEpSet = NULL; if (streamBuildDispatchMsg(pTask, pBlock, &dispatchMsg, &pEpSet) < 0) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 04428136ae..5a71fccab8 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -107,18 +107,19 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) { pRes = streamExecForQall(pTask, pRes); if (pRes == NULL) goto FAIL; - break; + taosArrayDestroy(pRes); + atomic_store_8(&pTask->status, TASK_STATUS__IDLE); + return 0; } else if (execStatus == TASK_STATUS__CLOSING) { continue; } else if (execStatus == TASK_STATUS__EXECUTING) { - break; + ASSERT(taosArrayGetSize(pRes) == 0); + taosArrayDestroy(pRes); + return 0; } else { ASSERT(0); } } - if (pRes) taosArrayDestroy(pRes); - atomic_store_8(&pTask->status, TASK_STATUS__IDLE); - return 0; FAIL: if (pRes) taosArrayDestroy(pRes); atomic_store_8(&pTask->status, TASK_STATUS__IDLE); diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 720625a570..c1ab51bb27 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -80,6 +80,7 @@ # ./test.sh -f tsim/stream/triggerInterval0.sim # ./test.sh -f tsim/stream/triggerSession0.sim # ./test.sh -f tsim/stream/partitionby.sim +./test.sh -f tsim/stream/schedSnode.sim # ---- transaction diff --git a/tests/script/tsim/stream/distributeInterval0.sim b/tests/script/tsim/stream/distributeInterval0.sim index b720272116..91ce49bc8c 100644 --- a/tests/script/tsim/stream/distributeInterval0.sim +++ b/tests/script/tsim/stream/distributeInterval0.sim @@ -208,4 +208,4 @@ if $data11 != 2 then goto loop2 endi -system sh/stop_dnodes.sh \ No newline at end of file +system sh/stop_dnodes.sh diff --git a/tests/script/tsim/stream/distributesession0.sim b/tests/script/tsim/stream/distributesession0.sim index 78f65ed8a3..a165b86edd 100644 --- a/tests/script/tsim/stream/distributesession0.sim +++ b/tests/script/tsim/stream/distributesession0.sim @@ -55,4 +55,4 @@ if $data03 != 7 then return -1 endi -system sh/stop_dnodes.sh \ No newline at end of file +system sh/stop_dnodes.sh diff --git a/tests/script/tsim/stream/schedSnode.sim b/tests/script/tsim/stream/schedSnode.sim new file mode 100644 index 0000000000..dbf714a96f --- /dev/null +++ b/tests/script/tsim/stream/schedSnode.sim @@ -0,0 +1,173 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 + +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +sql create snode on dnode 1 + +sql create database test vgroups 1; +sql create database target vgroups 1; +sql use test; +sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int); +sql create table ts1 using st tags(1,1,1); +sql create table ts2 using st tags(2,2,2); +sql create table ts3 using st tags(3,2,2); +sql create table ts4 using st tags(4,2,2); +sql create stream stream_t1 trigger at_once into target.streamtST1 as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s); + +sleep 1000 + +sql insert into ts1 values(1648791213001,1,12,3,1.0); +sql insert into ts2 values(1648791213001,1,12,3,1.0); + +sql insert into ts3 values(1648791213001,1,12,3,1.0); +sql insert into ts4 values(1648791213001,1,12,3,1.0); + +sql insert into ts1 values(1648791213002,NULL,NULL,NULL,NULL); +sql insert into ts2 values(1648791213002,NULL,NULL,NULL,NULL); + +sql insert into ts3 values(1648791213002,NULL,NULL,NULL,NULL); +sql insert into ts4 values(1648791213002,NULL,NULL,NULL,NULL); + +sql insert into ts1 values(1648791223002,2,2,3,1.1); +sql insert into ts1 values(1648791233003,3,2,3,2.1); +sql insert into ts2 values(1648791243004,4,2,43,73.1); +sql insert into ts1 values(1648791213002,24,22,23,4.1); +sql insert into ts1 values(1648791243005,4,20,3,3.1); +sql insert into ts2 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ; +sql insert into ts1 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ; +sql insert into ts2 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1); +sql insert into ts1 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ; +sql insert into ts2 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ; +sql insert into ts1 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ; + +sql insert into ts3 values(1648791223002,2,2,3,1.1); +sql insert into ts4 values(1648791233003,3,2,3,2.1); +sql insert into ts3 values(1648791243004,4,2,43,73.1); +sql insert into ts4 values(1648791213002,24,22,23,4.1); +sql insert into ts3 values(1648791243005,4,20,3,3.1); +sql insert into ts4 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ; +sql insert into ts3 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ; +sql insert into ts4 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1); +sql insert into ts3 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ; +sql insert into ts4 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ; +sql insert into ts3 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ; + +$loop_count = 0 +loop1: +sql select * from target.streamtST1; + +sleep 300 +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $data01 != 8 then + print =====data01=$data01 + goto loop1 +endi + +if $data02 != 4 then + print =====data02=$data02 + goto loop1 +endi + +if $data03 != 4 then + print ======$data03 + return -1 +endi + +if $data04 != 52 then + print ======$data04 + return -1 +endi + +if $data05 != 13 then + print ======$data05 + return -1 +endi + +# row 1 +if $data11 != 6 then + print =====data11=$data11 + goto loop1 +endi + +if $data12 != 6 then + print =====data12=$data12 + goto loop1 +endi + +if $data13 != 92 then + print ======$data13 + return -1 +endi + +if $data14 != 22 then + print ======$data14 + return -1 +endi + +if $data15 != 3 then + print ======$data15 + return -1 +endi + +# row 2 +if $data21 != 4 then + print =====data21=$data21 + goto loop1 +endi + +if $data22 != 4 then + print =====data22=$data22 + goto loop1 +endi + +if $data23 != 32 then + print ======$data23 + return -1 +endi + +if $data24 != 12 then + print ======$data24 + return -1 +endi + +if $data25 != 3 then + print ======$data25 + return -1 +endi + +# row 3 +if $data31 != 30 then + print =====data31=$data31 + goto loop1 +endi + +if $data32 != 30 then + print =====data32=$data32 + goto loop1 +endi + +if $data33 != 180 then + print ======$data33 + return -1 +endi + +if $data34 != 42 then + print ======$data34 + return -1 +endi + +if $data35 != 3 then + print ======$data35 + return -1 +endi + +sql select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5, avg(d) from st interval(10s);