Merge pull request #14028 from taosdata/feature/stream
feat(stream): support snode
This commit is contained in:
commit
57e6846dae
|
@ -152,7 +152,6 @@ void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput);
|
|||
typedef struct {
|
||||
char* qmsg;
|
||||
// followings are not applicable to encoder and decoder
|
||||
// void* inputHandle;
|
||||
void* executor;
|
||||
} STaskExec;
|
||||
|
||||
|
@ -400,15 +399,13 @@ typedef struct {
|
|||
|
||||
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
|
||||
|
||||
int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb);
|
||||
int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId);
|
||||
int32_t streamSetupTrigger(SStreamTask* pTask);
|
||||
|
||||
int32_t streamTaskRun(SStreamTask* pTask);
|
||||
|
||||
int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb);
|
||||
int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
|
||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp);
|
||||
int32_t streamProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
|
||||
int32_t streamProcessRunReq(SStreamTask* pTask);
|
||||
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
|
||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp);
|
||||
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
|
||||
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -161,7 +161,7 @@ int32_t tsDiskCfgNum = 0;
|
|||
SDiskCfg tsDiskCfg[TFS_MAX_DISKS] = {0};
|
||||
|
||||
// stream scheduler
|
||||
bool tsStreamSchedV = true;
|
||||
bool tsSchedStreamToSnode = true;
|
||||
|
||||
/*
|
||||
* minimum scale for whole system, millisecond by default
|
||||
|
|
|
@ -96,11 +96,11 @@ SArray *smGetMsgHandles() {
|
|||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
_OVER:
|
||||
|
|
|
@ -58,6 +58,7 @@ static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t num
|
|||
if (sndProcessUMsg(pMgmt->pSnode, pMsg) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
smSendRsp(pMsg, 0);
|
||||
|
||||
dTrace("msg:%p, is freed", pMsg);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
|
@ -70,6 +71,7 @@ static void smProcessSharedQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
|||
|
||||
dTrace("msg:%p, get from snode-shared queue", pMsg);
|
||||
if (sndProcessSMsg(pMgmt->pSnode, pMsg) < 0) {
|
||||
smSendRsp(pMsg, terrno);
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
|
|
|
@ -35,7 +35,6 @@ typedef struct SVnodeMgmt {
|
|||
SWWorkerPool syncPool;
|
||||
SWWorkerPool writePool;
|
||||
SWWorkerPool applyPool;
|
||||
SWWorkerPool mergePool;
|
||||
SSingleWorker mgmtWorker;
|
||||
SSingleWorker monitorWorker;
|
||||
SHashObj *hash;
|
||||
|
@ -63,7 +62,6 @@ typedef struct {
|
|||
STaosQueue *pApplyQ;
|
||||
STaosQueue *pQueryQ;
|
||||
STaosQueue *pFetchQ;
|
||||
STaosQueue *pMergeQ;
|
||||
} SVnodeObj;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -86,7 +86,6 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10);
|
||||
while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
|
||||
while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
|
||||
while (!taosQueueEmpty(pVnode->pMergeQ)) taosMsleep(10);
|
||||
|
||||
vmFreeQueue(pMgmt, pVnode);
|
||||
vnodeClose(pVnode->pImpl);
|
||||
|
|
|
@ -98,7 +98,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
|||
|
||||
static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
SRpcMsg * pMsg = NULL;
|
||||
SRpcMsg *pMsg = NULL;
|
||||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
|
||||
|
@ -119,7 +119,7 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
|
|||
|
||||
static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
SRpcMsg * pMsg = NULL;
|
||||
SRpcMsg *pMsg = NULL;
|
||||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
|
||||
|
@ -251,10 +251,9 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyMsg);
|
||||
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
|
||||
pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
|
||||
pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeQueue);
|
||||
|
||||
if (pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pApplyQ == NULL || pVnode->pQueryQ == NULL ||
|
||||
pVnode->pFetchQ == NULL || pVnode->pMergeQ == NULL) {
|
||||
pVnode->pFetchQ == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
@ -269,13 +268,11 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
|
||||
tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
|
||||
tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
|
||||
tWWorkerFreeQueue(&pMgmt->mergePool, pVnode->pMergeQ);
|
||||
pVnode->pWriteQ = NULL;
|
||||
pVnode->pSyncQ = NULL;
|
||||
pVnode->pApplyQ = NULL;
|
||||
pVnode->pQueryQ = NULL;
|
||||
pVnode->pFetchQ = NULL;
|
||||
pVnode->pMergeQ = NULL;
|
||||
dDebug("vgId:%d, queue is freed", pVnode->vgId);
|
||||
}
|
||||
|
||||
|
@ -307,11 +304,6 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
|
|||
pSPool->max = tsNumOfVnodeSyncThreads;
|
||||
if (tWWorkerInit(pSPool) != 0) return -1;
|
||||
|
||||
SWWorkerPool *pMPool = &pMgmt->mergePool;
|
||||
pMPool->name = "vnode-merge";
|
||||
pMPool->max = tsNumOfVnodeMergeThreads;
|
||||
if (tWWorkerInit(pMPool) != 0) return -1;
|
||||
|
||||
SSingleWorkerCfg mgmtCfg = {
|
||||
.min = 1,
|
||||
.max = 1,
|
||||
|
@ -342,6 +334,5 @@ void vmStopWorker(SVnodeMgmt *pMgmt) {
|
|||
tWWorkerCleanup(&pMgmt->syncPool);
|
||||
tQWorkerCleanup(&pMgmt->queryPool);
|
||||
tQWorkerCleanup(&pMgmt->fetchPool);
|
||||
tWWorkerCleanup(&pMgmt->mergePool);
|
||||
dDebug("vnode workers are closed");
|
||||
}
|
||||
|
|
|
@ -33,7 +33,7 @@
|
|||
#include "tname.h"
|
||||
#include "tuuid.h"
|
||||
|
||||
extern bool tsStreamSchedV;
|
||||
extern bool tsSchedStreamToSnode;
|
||||
|
||||
static int32_t mndAddTaskToTaskSet(SArray* pArray, SStreamTask* pTask) {
|
||||
int32_t childId = taosArrayGetSize(pArray);
|
||||
|
@ -204,9 +204,11 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SS
|
|||
return 0;
|
||||
}
|
||||
|
||||
SSnodeObj* mndSchedFetchSnode(SMnode* pMnode) {
|
||||
SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) {
|
||||
SSnodeObj* pObj = NULL;
|
||||
pObj = sdbFetch(pMnode->pSdb, SDB_SNODE, NULL, (void**)&pObj);
|
||||
void* pIter = NULL;
|
||||
// TODO random fetch
|
||||
pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void**)&pObj);
|
||||
return pObj;
|
||||
}
|
||||
|
||||
|
@ -214,7 +216,7 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask,
|
|||
const SSnodeObj* pSnode) {
|
||||
int32_t msgLen;
|
||||
|
||||
pTask->nodeId = 0;
|
||||
pTask->nodeId = SNODE_HANDLE;
|
||||
pTask->epSet = mndAcquireEpFromSnode(pMnode, pSnode);
|
||||
|
||||
plan->execNode.nodeId = 0;
|
||||
|
@ -224,7 +226,7 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask,
|
|||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
return -1;
|
||||
}
|
||||
mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_STREAM_TASK_DEPLOY, 0);
|
||||
mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_STREAM_TASK_DEPLOY, SNODE_HANDLE);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -370,8 +372,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
|||
}
|
||||
|
||||
if (totLevel > 1) {
|
||||
SStreamTask* pFinalTask;
|
||||
// inner plan
|
||||
SStreamTask* pInnerTask;
|
||||
// inner level
|
||||
{
|
||||
SArray* taskInnerLevel = taosArrayInit(0, sizeof(void*));
|
||||
taosArrayPush(pStream->tasks, &taskInnerLevel);
|
||||
|
@ -380,31 +382,51 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
|||
SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
|
||||
ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE);
|
||||
|
||||
pFinalTask = tNewSStreamTask(pStream->uid);
|
||||
mndAddTaskToTaskSet(taskInnerLevel, pFinalTask);
|
||||
pInnerTask = tNewSStreamTask(pStream->uid);
|
||||
mndAddTaskToTaskSet(taskInnerLevel, pInnerTask);
|
||||
// input
|
||||
pFinalTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
|
||||
pInnerTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
|
||||
|
||||
// trigger
|
||||
pFinalTask->triggerParam = pStream->triggerParam;
|
||||
pInnerTask->triggerParam = pStream->triggerParam;
|
||||
|
||||
// dispatch
|
||||
if (mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pFinalTask) < 0) {
|
||||
if (mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pInnerTask) < 0) {
|
||||
qDestroyQueryPlan(pPlan);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// exec
|
||||
pFinalTask->execType = TASK_EXEC__PIPE;
|
||||
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
|
||||
if (mndAssignTaskToVg(pMnode, pTrans, pFinalTask, plan, pVgroup) < 0) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
qDestroyQueryPlan(pPlan);
|
||||
return -1;
|
||||
pInnerTask->execType = TASK_EXEC__PIPE;
|
||||
|
||||
if (tsSchedStreamToSnode) {
|
||||
SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode);
|
||||
if (pSnode == NULL) {
|
||||
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
|
||||
if (mndAssignTaskToVg(pMnode, pTrans, pInnerTask, plan, pVgroup) < 0) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
qDestroyQueryPlan(pPlan);
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
if (mndAssignTaskToSnode(pMnode, pTrans, pInnerTask, plan, pSnode) < 0) {
|
||||
ASSERT(0);
|
||||
sdbRelease(pSdb, pSnode);
|
||||
qDestroyQueryPlan(pPlan);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
|
||||
if (mndAssignTaskToVg(pMnode, pTrans, pInnerTask, plan, pVgroup) < 0) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
qDestroyQueryPlan(pPlan);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// source plan
|
||||
// source level
|
||||
SArray* taskSourceLevel = taosArrayInit(0, sizeof(void*));
|
||||
taosArrayPush(pStream->tasks, &taskSourceLevel);
|
||||
|
||||
|
@ -434,9 +456,9 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
|||
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
|
||||
pTask->dispatchType = TASK_DISPATCH__FIXED;
|
||||
|
||||
pTask->fixedEpDispatcher.taskId = pFinalTask->taskId;
|
||||
pTask->fixedEpDispatcher.nodeId = pFinalTask->nodeId;
|
||||
pTask->fixedEpDispatcher.epSet = pFinalTask->epSet;
|
||||
pTask->fixedEpDispatcher.taskId = pInnerTask->taskId;
|
||||
pTask->fixedEpDispatcher.nodeId = pInnerTask->nodeId;
|
||||
pTask->fixedEpDispatcher.epSet = pInnerTask->epSet;
|
||||
|
||||
// exec
|
||||
pTask->execType = TASK_EXEC__PIPE;
|
||||
|
|
|
@ -78,8 +78,8 @@ int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) {
|
|||
|
||||
static int32_t sndProcessTaskDeployReq(SSnode *pNode, SRpcMsg *pMsg) {
|
||||
SStreamMeta *pMeta = pNode->pMeta;
|
||||
char *msg = pMsg->pCont;
|
||||
int32_t msgLen = pMsg->contLen;
|
||||
char *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||
|
||||
SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||
if (pTask == NULL) {
|
||||
|
@ -105,23 +105,22 @@ static int32_t sndProcessTaskDeployReq(SSnode *pNode, SRpcMsg *pMsg) {
|
|||
|
||||
ASSERT(pTask->execType != TASK_EXEC__NONE);
|
||||
|
||||
SReadHandle handle = {
|
||||
.pMsgCb = &pNode->msgCb,
|
||||
};
|
||||
|
||||
/*pTask->exec.inputHandle = NULL;*/
|
||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
||||
ASSERT(pTask->dataScan == 0);
|
||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL);
|
||||
ASSERT(pTask->exec.executor);
|
||||
|
||||
streamSetupTrigger(pTask);
|
||||
|
||||
qInfo("deploy stream: stream id %ld task id %d child id %d on snode", pTask->streamId, pTask->taskId, pTask->childId);
|
||||
|
||||
taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void *));
|
||||
|
||||
return 0;
|
||||
|
||||
FAIL:
|
||||
if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
|
||||
if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);
|
||||
if (pTask) taosMemoryFree(pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -130,7 +129,7 @@ static int32_t sndProcessTaskRunReq(SSnode *pNode, SRpcMsg *pMsg) {
|
|||
SStreamTaskRunReq *pReq = pMsg->pCont;
|
||||
int32_t taskId = pReq->taskId;
|
||||
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||
streamTaskProcessRunReq(pTask, &pNode->msgCb);
|
||||
streamProcessRunReq(pTask);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -151,7 +150,7 @@ static int32_t sndProcessTaskDispatchReq(SSnode *pNode, SRpcMsg *pMsg) {
|
|||
.info = pMsg->info,
|
||||
.code = 0,
|
||||
};
|
||||
streamProcessDispatchReq(pTask, &pNode->msgCb, &req, &rsp);
|
||||
streamProcessDispatchReq(pTask, &req, &rsp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -161,7 +160,7 @@ static int32_t sndProcessTaskRecoverReq(SSnode *pNode, SRpcMsg *pMsg) {
|
|||
SStreamTaskRecoverReq *pReq = pMsg->pCont;
|
||||
int32_t taskId = pReq->taskId;
|
||||
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||
streamProcessRecoverReq(pTask, &pNode->msgCb, pReq, pMsg);
|
||||
streamProcessRecoverReq(pTask, pReq, pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -171,7 +170,7 @@ static int32_t sndProcessTaskDispatchRsp(SSnode *pNode, SRpcMsg *pMsg) {
|
|||
SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t taskId = pRsp->taskId;
|
||||
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||
streamProcessDispatchRsp(pTask, &pNode->msgCb, pRsp);
|
||||
streamProcessDispatchRsp(pTask, pRsp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -87,7 +87,7 @@ int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* p
|
|||
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
|
||||
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
|
||||
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids);
|
||||
int metaTtlDropTable(SMeta *pMeta, int64_t ttl, SArray *tbUids);
|
||||
int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids);
|
||||
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
|
||||
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
|
||||
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
|
||||
|
@ -106,28 +106,28 @@ int32_t metaSnapshotReaderClose(SMetaSnapshotReader* pReader);
|
|||
int32_t metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t* nData);
|
||||
void* metaGetIdx(SMeta* pMeta);
|
||||
void* metaGetIvtIdx(SMeta* pMeta);
|
||||
int metaTtlSmaller(SMeta *pMeta, uint64_t time, SArray *uidList);
|
||||
int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList);
|
||||
|
||||
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
|
||||
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
|
||||
|
||||
// tsdb
|
||||
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg);
|
||||
int tsdbClose(STsdb** pTsdb);
|
||||
int32_t tsdbBegin(STsdb* pTsdb);
|
||||
int32_t tsdbCommit(STsdb* pTsdb);
|
||||
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
|
||||
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
|
||||
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
|
||||
SSubmitBlkRsp* pRsp);
|
||||
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
||||
tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
||||
uint64_t taskId);
|
||||
tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
||||
void* pMemRef);
|
||||
int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever);
|
||||
int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader);
|
||||
int32_t tsdbSnapshotRead(STsdbSnapshotReader* pReader, void** ppData, uint32_t* nData);
|
||||
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg);
|
||||
int tsdbClose(STsdb** pTsdb);
|
||||
int32_t tsdbBegin(STsdb* pTsdb);
|
||||
int32_t tsdbCommit(STsdb* pTsdb);
|
||||
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
|
||||
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
|
||||
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
|
||||
SSubmitBlkRsp* pRsp);
|
||||
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
||||
tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
||||
uint64_t taskId);
|
||||
tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
||||
void* pMemRef);
|
||||
int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever);
|
||||
int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader);
|
||||
int32_t tsdbSnapshotRead(STsdbSnapshotReader* pReader, void** ppData, uint32_t* nData);
|
||||
|
||||
// tq
|
||||
int tqInit();
|
||||
|
@ -141,7 +141,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
|
|||
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
|
||||
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
|
||||
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data);
|
||||
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
|
@ -262,7 +262,7 @@ struct SSma {
|
|||
|
||||
#define SMA_CFG(s) (&(s)->pVnode->config)
|
||||
#define SMA_TSDB_CFG(s) (&(s)->pVnode->config.tsdbCfg)
|
||||
#define SMA_RETENTION(s) ((SRetention *)&(s)->pVnode->config.tsdbCfg.retentions)
|
||||
#define SMA_RETENTION(s) ((SRetention*)&(s)->pVnode->config.tsdbCfg.retentions)
|
||||
#define SMA_LOCKED(s) ((s)->locked)
|
||||
#define SMA_META(s) ((s)->pVnode->pMeta)
|
||||
#define SMA_VID(s) TD_VID((s)->pVnode)
|
||||
|
|
|
@ -333,7 +333,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
SReadHandle handle = {
|
||||
.reader = pHandle->execHandle.pExecReader[i],
|
||||
.meta = pTq->pVnode->pMeta,
|
||||
.pMsgCb = &pTq->pVnode->msgCb,
|
||||
};
|
||||
pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
|
||||
ASSERT(pHandle->execHandle.execCol.task[i]);
|
||||
|
@ -373,7 +372,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
|
||||
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||
if (pTask == NULL) {
|
||||
return -1;
|
||||
|
@ -404,7 +403,6 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
SReadHandle handle = {
|
||||
.reader = pStreamReader,
|
||||
.meta = pTq->pVnode->pMeta,
|
||||
.pMsgCb = &pTq->pVnode->msgCb,
|
||||
.vnode = pTq->pVnode,
|
||||
};
|
||||
/*pTask->exec.inputHandle = pStreamReader;*/
|
||||
|
@ -468,7 +466,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (streamLaunchByWrite(pTask, TD_VID(pTq->pVnode), &pTq->pVnode->msgCb) < 0) {
|
||||
if (streamLaunchByWrite(pTask, TD_VID(pTq->pVnode)) < 0) {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
|
@ -489,7 +487,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
SStreamTaskRunReq* pReq = pMsg->pCont;
|
||||
int32_t taskId = pReq->taskId;
|
||||
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
streamTaskProcessRunReq(pTask, &pTq->pVnode->msgCb);
|
||||
streamProcessRunReq(pTask);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -507,7 +505,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
.info = pMsg->info,
|
||||
.code = 0,
|
||||
};
|
||||
streamProcessDispatchReq(pTask, &pTq->pVnode->msgCb, &req, &rsp);
|
||||
streamProcessDispatchReq(pTask, &req, &rsp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -515,7 +513,7 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
SStreamTaskRecoverReq* pReq = pMsg->pCont;
|
||||
int32_t taskId = pReq->taskId;
|
||||
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
streamProcessRecoverReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg);
|
||||
streamProcessRecoverReq(pTask, pReq, pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -523,7 +521,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
|
|||
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t taskId = pRsp->taskId;
|
||||
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
streamProcessDispatchRsp(pTask, &pTq->pVnode->msgCb, pRsp);
|
||||
streamProcessDispatchRsp(pTask, pRsp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -167,8 +167,8 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
|||
}
|
||||
break;
|
||||
case TDMT_STREAM_TASK_DEPLOY: {
|
||||
if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
||||
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||
if (tqProcessTaskDeployReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
||||
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
} break;
|
||||
|
@ -304,18 +304,17 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
|
|||
pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp){
|
||||
|
||||
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
|
||||
if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
int32_t t = ntohl(*(int32_t*)pReq);
|
||||
int32_t t = ntohl(*(int32_t *)pReq);
|
||||
vError("rec ttl time:%d", t);
|
||||
int32_t ret = metaTtlDropTable(pVnode->pMeta, t, tbUids);
|
||||
if(ret != 0){
|
||||
if (ret != 0) {
|
||||
goto end;
|
||||
}
|
||||
if(taosArrayGetSize(tbUids) > 0){
|
||||
if (taosArrayGetSize(tbUids) > 0) {
|
||||
tqUpdateTbUidList(pVnode->pTq, tbUids, false);
|
||||
}
|
||||
|
||||
|
|
|
@ -4052,14 +4052,19 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
|
||||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
||||
STimeWindowAggSupp twSup = {
|
||||
.waterMark = pTableScanNode->watermark, .calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN};
|
||||
.waterMark = pTableScanNode->watermark,
|
||||
.calTrigger = pTableScanNode->triggerType,
|
||||
.maxTs = INT64_MIN,
|
||||
};
|
||||
tsdbReaderT pDataReader = NULL;
|
||||
|
||||
if (pHandle) {
|
||||
if (pHandle->vnode) {
|
||||
// for stram
|
||||
pDataReader =
|
||||
doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
|
||||
} else {
|
||||
// for tq
|
||||
getTableList(pHandle->meta, pScanPhyNode, pTableListInfo, pTagCond);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -64,7 +64,7 @@ void streamTriggerByTimer(void* param, void* tmrId) {
|
|||
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE);
|
||||
|
||||
streamTaskInput(pTask, (SStreamQueueItem*)trigger);
|
||||
streamLaunchByWrite(pTask, pTask->nodeId, pTask->pMsgCb);
|
||||
streamLaunchByWrite(pTask, pTask->nodeId);
|
||||
}
|
||||
|
||||
taosTmrReset(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
|
||||
|
@ -81,7 +81,7 @@ int32_t streamSetupTrigger(SStreamTask* pTask) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb) {
|
||||
int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId) {
|
||||
int8_t execStatus = atomic_load_8(&pTask->status);
|
||||
if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) {
|
||||
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
|
||||
|
@ -96,7 +96,7 @@ int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb) {
|
|||
.pCont = pRunReq,
|
||||
.contLen = sizeof(SStreamTaskRunReq),
|
||||
};
|
||||
tmsgPutToQueue(pMsgCb, FETCH_QUEUE, &msg);
|
||||
tmsgPutToQueue(pTask->pMsgCb, FETCH_QUEUE, &msg);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -136,7 +136,9 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
|
|||
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
||||
}
|
||||
|
||||
int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
|
||||
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
|
||||
qInfo("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId, pReq->sourceTaskId);
|
||||
|
||||
// 1. handle input
|
||||
streamTaskEnqueue(pTask, pReq, pRsp);
|
||||
|
||||
|
@ -145,7 +147,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp
|
|||
// 2.2. executing: return
|
||||
// 2.3. closing: keep trying
|
||||
if (pTask->execType != TASK_EXEC__NONE) {
|
||||
streamExec(pTask, pMsgCb);
|
||||
streamExec(pTask, pTask->pMsgCb);
|
||||
} else {
|
||||
ASSERT(pTask->sinkType != TASK_SINK__NONE);
|
||||
while (1) {
|
||||
|
@ -161,34 +163,38 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp
|
|||
// 3.1 check and set status
|
||||
// 3.2 dispatch / sink
|
||||
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
|
||||
streamDispatch(pTask, pMsgCb);
|
||||
streamDispatch(pTask, pTask->pMsgCb);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp) {
|
||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
|
||||
ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED);
|
||||
|
||||
qInfo("task %d receive dispatch rsp", pTask->taskId);
|
||||
|
||||
int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
|
||||
ASSERT(old == TASK_OUTPUT_STATUS__WAIT);
|
||||
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||
// TODO: init recover timer
|
||||
ASSERT(0);
|
||||
return 0;
|
||||
}
|
||||
// continue dispatch
|
||||
streamDispatch(pTask, pMsgCb);
|
||||
streamDispatch(pTask, pTask->pMsgCb);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||
streamExec(pTask, pMsgCb);
|
||||
int32_t streamProcessRunReq(SStreamTask* pTask) {
|
||||
streamExec(pTask, pTask->pMsgCb);
|
||||
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
|
||||
streamDispatch(pTask, pMsgCb);
|
||||
streamDispatch(pTask, pTask->pMsgCb);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg) {
|
||||
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg) {
|
||||
//
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -144,7 +144,7 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
|
|||
}
|
||||
}
|
||||
|
||||
ASSERT(vgId != 0);
|
||||
ASSERT(vgId > 0 || vgId == SNODE_HANDLE);
|
||||
req.taskId = downstreamTaskId;
|
||||
|
||||
qInfo("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->childId,
|
||||
|
@ -199,6 +199,8 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
|||
}
|
||||
ASSERT(pBlock->type == STREAM_DATA_TYPE_SSDATA_BLOCK);
|
||||
|
||||
qInfo("stream continue dispatching: task %d", pTask->taskId);
|
||||
|
||||
SRpcMsg dispatchMsg = {0};
|
||||
SEpSet* pEpSet = NULL;
|
||||
if (streamBuildDispatchMsg(pTask, pBlock, &dispatchMsg, &pEpSet) < 0) {
|
||||
|
|
|
@ -107,18 +107,19 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
|||
pRes = streamExecForQall(pTask, pRes);
|
||||
if (pRes == NULL) goto FAIL;
|
||||
|
||||
break;
|
||||
taosArrayDestroy(pRes);
|
||||
atomic_store_8(&pTask->status, TASK_STATUS__IDLE);
|
||||
return 0;
|
||||
} else if (execStatus == TASK_STATUS__CLOSING) {
|
||||
continue;
|
||||
} else if (execStatus == TASK_STATUS__EXECUTING) {
|
||||
break;
|
||||
ASSERT(taosArrayGetSize(pRes) == 0);
|
||||
taosArrayDestroy(pRes);
|
||||
return 0;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
if (pRes) taosArrayDestroy(pRes);
|
||||
atomic_store_8(&pTask->status, TASK_STATUS__IDLE);
|
||||
return 0;
|
||||
FAIL:
|
||||
if (pRes) taosArrayDestroy(pRes);
|
||||
atomic_store_8(&pTask->status, TASK_STATUS__IDLE);
|
||||
|
|
|
@ -80,6 +80,7 @@
|
|||
./test.sh -f tsim/stream/triggerInterval0.sim
|
||||
# ./test.sh -f tsim/stream/triggerSession0.sim
|
||||
./test.sh -f tsim/stream/partitionby.sim
|
||||
./test.sh -f tsim/stream/schedSnode.sim
|
||||
|
||||
|
||||
# ---- transaction
|
||||
|
|
|
@ -208,4 +208,4 @@ if $data11 != 2 then
|
|||
goto loop2
|
||||
endi
|
||||
|
||||
system sh/stop_dnodes.sh
|
||||
system sh/stop_dnodes.sh
|
||||
|
|
|
@ -55,4 +55,4 @@ if $data03 != 7 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
system sh/stop_dnodes.sh
|
||||
system sh/stop_dnodes.sh
|
||||
|
|
|
@ -0,0 +1,173 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/deploy.sh -n dnode2 -i 2
|
||||
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 50
|
||||
sql connect
|
||||
|
||||
sql create snode on dnode 1
|
||||
|
||||
sql create database test vgroups 1;
|
||||
sql create database target vgroups 1;
|
||||
sql use test;
|
||||
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
|
||||
sql create table ts1 using st tags(1,1,1);
|
||||
sql create table ts2 using st tags(2,2,2);
|
||||
sql create table ts3 using st tags(3,2,2);
|
||||
sql create table ts4 using st tags(4,2,2);
|
||||
sql create stream stream_t1 trigger at_once into target.streamtST1 as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s);
|
||||
|
||||
sleep 1000
|
||||
|
||||
sql insert into ts1 values(1648791213001,1,12,3,1.0);
|
||||
sql insert into ts2 values(1648791213001,1,12,3,1.0);
|
||||
|
||||
sql insert into ts3 values(1648791213001,1,12,3,1.0);
|
||||
sql insert into ts4 values(1648791213001,1,12,3,1.0);
|
||||
|
||||
sql insert into ts1 values(1648791213002,NULL,NULL,NULL,NULL);
|
||||
sql insert into ts2 values(1648791213002,NULL,NULL,NULL,NULL);
|
||||
|
||||
sql insert into ts3 values(1648791213002,NULL,NULL,NULL,NULL);
|
||||
sql insert into ts4 values(1648791213002,NULL,NULL,NULL,NULL);
|
||||
|
||||
sql insert into ts1 values(1648791223002,2,2,3,1.1);
|
||||
sql insert into ts1 values(1648791233003,3,2,3,2.1);
|
||||
sql insert into ts2 values(1648791243004,4,2,43,73.1);
|
||||
sql insert into ts1 values(1648791213002,24,22,23,4.1);
|
||||
sql insert into ts1 values(1648791243005,4,20,3,3.1);
|
||||
sql insert into ts2 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ;
|
||||
sql insert into ts1 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ;
|
||||
sql insert into ts2 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1);
|
||||
sql insert into ts1 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
|
||||
sql insert into ts2 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ;
|
||||
sql insert into ts1 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
|
||||
|
||||
sql insert into ts3 values(1648791223002,2,2,3,1.1);
|
||||
sql insert into ts4 values(1648791233003,3,2,3,2.1);
|
||||
sql insert into ts3 values(1648791243004,4,2,43,73.1);
|
||||
sql insert into ts4 values(1648791213002,24,22,23,4.1);
|
||||
sql insert into ts3 values(1648791243005,4,20,3,3.1);
|
||||
sql insert into ts4 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ;
|
||||
sql insert into ts3 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ;
|
||||
sql insert into ts4 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1);
|
||||
sql insert into ts3 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
|
||||
sql insert into ts4 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ;
|
||||
sql insert into ts3 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
|
||||
|
||||
$loop_count = 0
|
||||
loop1:
|
||||
sql select * from target.streamtST1;
|
||||
|
||||
sleep 300
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
# row 0
|
||||
if $data01 != 8 then
|
||||
print =====data01=$data01
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data02 != 4 then
|
||||
print =====data02=$data02
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data03 != 4 then
|
||||
print ======$data03
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data04 != 52 then
|
||||
print ======$data04
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data05 != 13 then
|
||||
print ======$data05
|
||||
return -1
|
||||
endi
|
||||
|
||||
# row 1
|
||||
if $data11 != 6 then
|
||||
print =====data11=$data11
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data12 != 6 then
|
||||
print =====data12=$data12
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data13 != 92 then
|
||||
print ======$data13
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data14 != 22 then
|
||||
print ======$data14
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data15 != 3 then
|
||||
print ======$data15
|
||||
return -1
|
||||
endi
|
||||
|
||||
# row 2
|
||||
if $data21 != 4 then
|
||||
print =====data21=$data21
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data22 != 4 then
|
||||
print =====data22=$data22
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data23 != 32 then
|
||||
print ======$data23
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data24 != 12 then
|
||||
print ======$data24
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data25 != 3 then
|
||||
print ======$data25
|
||||
return -1
|
||||
endi
|
||||
|
||||
# row 3
|
||||
if $data31 != 30 then
|
||||
print =====data31=$data31
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data32 != 30 then
|
||||
print =====data32=$data32
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data33 != 180 then
|
||||
print ======$data33
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data34 != 42 then
|
||||
print ======$data34
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data35 != 3 then
|
||||
print ======$data35
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5, avg(d) from st interval(10s);
|
Loading…
Reference in New Issue