Merge branch 'enh/triggerCheckPoint2' of https://github.com/taosdata/TDengine into enh/triggerCheckPoint2
This commit is contained in:
commit
17b7e3cc82
|
@ -249,6 +249,7 @@ typedef struct SStreamChildEpInfo {
|
||||||
int32_t childId;
|
int32_t childId;
|
||||||
int32_t taskId;
|
int32_t taskId;
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
|
bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it
|
||||||
} SStreamChildEpInfo;
|
} SStreamChildEpInfo;
|
||||||
|
|
||||||
typedef struct SStreamId {
|
typedef struct SStreamId {
|
||||||
|
@ -310,8 +311,9 @@ struct SStreamTask {
|
||||||
SHistDataRange dataRange;
|
SHistDataRange dataRange;
|
||||||
SStreamId historyTaskId;
|
SStreamId historyTaskId;
|
||||||
SStreamId streamTaskId;
|
SStreamId streamTaskId;
|
||||||
SArray* pUpstreamEpInfoList; // SArray<SStreamChildEpInfo*>, // children info
|
SArray* pUpstreamInfoList; // SArray<SStreamChildEpInfo*>, // children info
|
||||||
SArray* pRpcMsgList; // SArray<SRpcMsg*>
|
SArray* pRpcMsgList; // SArray<SRpcMsg*>
|
||||||
|
|
||||||
// output
|
// output
|
||||||
union {
|
union {
|
||||||
STaskDispatcherFixedEp fixedEpDispatcher;
|
STaskDispatcherFixedEp fixedEpDispatcher;
|
||||||
|
@ -554,6 +556,8 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
||||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
|
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
|
||||||
|
|
||||||
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
|
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
|
||||||
|
void streamTaskOpenUpstreamInput(SStreamTask* pTask);
|
||||||
|
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
|
||||||
|
|
||||||
void streamTaskInputFail(SStreamTask* pTask);
|
void streamTaskInputFail(SStreamTask* pTask);
|
||||||
int32_t streamTryExec(SStreamTask* pTask);
|
int32_t streamTryExec(SStreamTask* pTask);
|
||||||
|
|
|
@ -301,11 +301,11 @@ int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pDownstream->pUpstreamEpInfoList == NULL) {
|
if (pDownstream->pUpstreamInfoList == NULL) {
|
||||||
pDownstream->pUpstreamEpInfoList = taosArrayInit(4, POINTER_BYTES);
|
pDownstream->pUpstreamInfoList = taosArrayInit(4, POINTER_BYTES);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(pDownstream->pUpstreamEpInfoList, &pEpInfo);
|
taosArrayPush(pDownstream->pUpstreamInfoList, &pEpInfo);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -59,7 +59,7 @@ FAIL:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
|
int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
|
||||||
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->pUpstreamEpInfoList) != 0);
|
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->pUpstreamInfoList) != 0);
|
||||||
|
|
||||||
pTask->refCnt = 1;
|
pTask->refCnt = 1;
|
||||||
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
|
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
|
||||||
|
@ -82,7 +82,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfChildEp = taosArrayGetSize(pTask->pUpstreamEpInfoList);
|
int32_t numOfChildEp = taosArrayGetSize(pTask->pUpstreamInfoList);
|
||||||
SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory };
|
SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory };
|
||||||
initStreamStateAPI(&handle.api);
|
initStreamStateAPI(&handle.api);
|
||||||
|
|
||||||
|
|
|
@ -814,7 +814,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamEpInfoList);
|
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamInfoList);
|
||||||
SReadHandle handle = {
|
SReadHandle handle = {
|
||||||
.checkpointId = pTask->chkInfo.checkpointId,
|
.checkpointId = pTask->chkInfo.checkpointId,
|
||||||
.vnode = NULL,
|
.vnode = NULL,
|
||||||
|
@ -865,6 +865,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
|
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
streamTaskOpenUpstreamInput(pTask);
|
||||||
streamSetupScheduleTrigger(pTask);
|
streamSetupScheduleTrigger(pTask);
|
||||||
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
|
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
|
||||||
|
|
||||||
|
|
|
@ -1788,10 +1788,13 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
|
||||||
buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);
|
buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);
|
||||||
int32_t tlen = len - (pBuff - buf);
|
int32_t tlen = len - (pBuff - buf);
|
||||||
|
|
||||||
void* pUpInfo = pInfo->stateStore.updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0, pInfo->igCheckUpdate);
|
void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
|
||||||
int32_t code = pInfo->stateStore.updateInfoDeserialize(buf, tlen, pUpInfo);
|
int32_t code = pInfo->stateStore.updateInfoDeserialize(buf, tlen, pUpInfo);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo);
|
||||||
pInfo->pUpdateInfo = pUpInfo;
|
pInfo->pUpdateInfo = pUpInfo;
|
||||||
|
} else {
|
||||||
|
taosMemoryFree(pUpInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -355,4 +355,27 @@ void* streamQueueNextItem(SStreamQueue* pQueue) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); }
|
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); }
|
||||||
|
|
||||||
|
void streamTaskOpenUpstreamInput(SStreamTask* pTask) {
|
||||||
|
int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
|
||||||
|
if (num == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < num; ++i) {
|
||||||
|
SStreamChildEpInfo* pInfo = taosArrayGet(pTask->pUpstreamInfoList, i);
|
||||||
|
pInfo->dataAllowed = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
|
||||||
|
int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
|
||||||
|
for(int32_t i = 0; i < num; ++i) {
|
||||||
|
SStreamChildEpInfo* pInfo = taosArrayGet(pTask->pUpstreamInfoList, i);
|
||||||
|
if (pInfo->taskId == taskId) {
|
||||||
|
pInfo->dataAllowed = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -114,7 +114,7 @@ int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRs
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, int32_t childId) {
|
static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, int32_t childId) {
|
||||||
int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList);
|
int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
|
||||||
int64_t old = atomic_val_compare_exchange_32(&pTask->checkpointAlignCnt, 0, num);
|
int64_t old = atomic_val_compare_exchange_32(&pTask->checkpointAlignCnt, 0, num);
|
||||||
if (old == 0) {
|
if (old == 0) {
|
||||||
qDebug("s-task:%s set initial align upstream num:%d", pTask->id.idStr, num);
|
qDebug("s-task:%s set initial align upstream num:%d", pTask->id.idStr, num);
|
||||||
|
@ -180,13 +180,14 @@ int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pRe
|
||||||
streamSchedExec(pTask);
|
streamSchedExec(pTask);
|
||||||
qDebug("s-task:%s sink task set to checkpoint ready, start to send rsp to upstream", pTask->id.idStr);
|
qDebug("s-task:%s sink task set to checkpoint ready, start to send rsp to upstream", pTask->id.idStr);
|
||||||
} else {
|
} else {
|
||||||
// todo close the inputQ for data from childId, which means data from childId are not allowed to put into intpuQ
|
ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0);
|
||||||
// anymore
|
|
||||||
ASSERT(taosArrayGetSize(pTask->pUpstreamEpInfoList) > 0);
|
// close the inputQ for data from upstream task.
|
||||||
|
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
|
||||||
|
|
||||||
// there are still some upstream tasks not send checkpoint request, do nothing and wait for then
|
// there are still some upstream tasks not send checkpoint request, do nothing and wait for then
|
||||||
int32_t notReady = streamAlignCheckpoint(pTask, checkpointId, childId);
|
int32_t notReady = streamAlignCheckpoint(pTask, checkpointId, childId);
|
||||||
int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList);
|
int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
|
||||||
if (notReady > 0) {
|
if (notReady > 0) {
|
||||||
qDebug("s-task:%s received checkpoint req, %d upstream tasks not send checkpoint info yet, total:%d",
|
qDebug("s-task:%s received checkpoint req, %d upstream tasks not send checkpoint info yet, total:%d",
|
||||||
pTask->id.idStr, notReady, num);
|
pTask->id.idStr, notReady, num);
|
||||||
|
|
|
@ -157,11 +157,11 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
|
||||||
.retrieveLen = dataStrLen,
|
.retrieveLen = dataStrLen,
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t sz = taosArrayGetSize(pTask->pUpstreamEpInfoList);
|
int32_t sz = taosArrayGetSize(pTask->pUpstreamInfoList);
|
||||||
ASSERT(sz > 0);
|
ASSERT(sz > 0);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
req.reqId = tGenIdPI64();
|
req.reqId = tGenIdPI64();
|
||||||
SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->pUpstreamEpInfoList, i);
|
SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
|
||||||
req.dstNodeId = pEpInfo->nodeId;
|
req.dstNodeId = pEpInfo->nodeId;
|
||||||
req.dstTaskId = pEpInfo->taskId;
|
req.dstTaskId = pEpInfo->taskId;
|
||||||
int32_t len;
|
int32_t len;
|
||||||
|
@ -516,7 +516,7 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
|
||||||
// this function is usually invoked by sink/agg task
|
// this function is usually invoked by sink/agg task
|
||||||
int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask) {
|
int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask) {
|
||||||
int32_t num = taosArrayGetSize(pTask->pRpcMsgList);
|
int32_t num = taosArrayGetSize(pTask->pRpcMsgList);
|
||||||
ASSERT(taosArrayGetSize(pTask->pUpstreamEpInfoList) == num);
|
ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) == num);
|
||||||
|
|
||||||
qDebug("s-task:%s level:%d checkpoint completed msg sent to %d upstream tasks", pTask->id.idStr,
|
qDebug("s-task:%s level:%d checkpoint completed msg sent to %d upstream tasks", pTask->id.idStr,
|
||||||
pTask->info.taskLevel, num);
|
pTask->info.taskLevel, num);
|
||||||
|
|
|
@ -355,7 +355,7 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
|
||||||
|
|
||||||
// agg
|
// agg
|
||||||
int32_t streamAggScanHistoryPrepare(SStreamTask* pTask) {
|
int32_t streamAggScanHistoryPrepare(SStreamTask* pTask) {
|
||||||
pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);
|
pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamInfoList);
|
||||||
qDebug("s-task:%s agg task is ready and wait for %d upstream tasks complete scan-history procedure", pTask->id.idStr,
|
qDebug("s-task:%s agg task is ready and wait for %d upstream tasks complete scan-history procedure", pTask->id.idStr,
|
||||||
pTask->numOfWaitingUpstream);
|
pTask->numOfWaitingUpstream);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -379,7 +379,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t taskId, in
|
||||||
ASSERT(left >= 0);
|
ASSERT(left >= 0);
|
||||||
|
|
||||||
if (left == 0) {
|
if (left == 0) {
|
||||||
int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamEpInfoList);
|
int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamInfoList);
|
||||||
qDebug("s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data",
|
qDebug("s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data",
|
||||||
pTask->id.idStr, numOfTasks);
|
pTask->id.idStr, numOfTasks);
|
||||||
|
|
||||||
|
|
|
@ -99,10 +99,10 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
||||||
if (tEncodeI64(pEncoder, pTask->dataRange.window.skey)) return -1;
|
if (tEncodeI64(pEncoder, pTask->dataRange.window.skey)) return -1;
|
||||||
if (tEncodeI64(pEncoder, pTask->dataRange.window.ekey)) return -1;
|
if (tEncodeI64(pEncoder, pTask->dataRange.window.ekey)) return -1;
|
||||||
|
|
||||||
int32_t epSz = taosArrayGetSize(pTask->pUpstreamEpInfoList);
|
int32_t epSz = taosArrayGetSize(pTask->pUpstreamInfoList);
|
||||||
if (tEncodeI32(pEncoder, epSz) < 0) return -1;
|
if (tEncodeI32(pEncoder, epSz) < 0) return -1;
|
||||||
for (int32_t i = 0; i < epSz; i++) {
|
for (int32_t i = 0; i < epSz; i++) {
|
||||||
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamEpInfoList, i);
|
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
|
||||||
if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1;
|
if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -165,7 +165,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
||||||
int32_t epSz;
|
int32_t epSz;
|
||||||
if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
|
if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
|
||||||
|
|
||||||
pTask->pUpstreamEpInfoList = taosArrayInit(epSz, POINTER_BYTES);
|
pTask->pUpstreamInfoList = taosArrayInit(epSz, POINTER_BYTES);
|
||||||
for (int32_t i = 0; i < epSz; i++) {
|
for (int32_t i = 0; i < epSz; i++) {
|
||||||
SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo));
|
SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo));
|
||||||
if (pInfo == NULL) return -1;
|
if (pInfo == NULL) return -1;
|
||||||
|
@ -173,7 +173,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
||||||
taosMemoryFreeClear(pInfo);
|
taosMemoryFreeClear(pInfo);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
taosArrayPush(pTask->pUpstreamEpInfoList, &pInfo);
|
taosArrayPush(pTask->pUpstreamInfoList, &pInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
||||||
|
@ -226,7 +226,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
walCloseReader(pTask->exec.pWalReader);
|
walCloseReader(pTask->exec.pWalReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroyP(pTask->pUpstreamEpInfoList, taosMemoryFree);
|
taosArrayDestroyP(pTask->pUpstreamInfoList, taosMemoryFree);
|
||||||
if (pTask->outputType == TASK_OUTPUT__TABLE) {
|
if (pTask->outputType == TASK_OUTPUT__TABLE) {
|
||||||
tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper);
|
tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper);
|
||||||
taosMemoryFree(pTask->tbSink.pTSchema);
|
taosMemoryFree(pTask->tbSink.pTSchema);
|
||||||
|
|
|
@ -507,9 +507,12 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
|
||||||
destroyRowBuffPos(pNewPos);
|
destroyRowBuffPos(pNewPos);
|
||||||
SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
|
SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
|
||||||
taosMemoryFreeClear(pNode);
|
taosMemoryFreeClear(pNode);
|
||||||
|
taosMemoryFreeClear(pVal);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
ASSERT(pVLen == pFileState->rowSize);
|
||||||
memcpy(pNewPos->pRowBuff, pVal, pVLen);
|
memcpy(pNewPos->pRowBuff, pVal, pVLen);
|
||||||
|
taosMemoryFreeClear(pVal);
|
||||||
code = tSimpleHashPut(pFileState->rowBuffMap, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
|
code = tSimpleHashPut(pFileState->rowBuffMap, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
destroyRowBuffPos(pNewPos);
|
destroyRowBuffPos(pNewPos);
|
||||||
|
|
|
@ -4,20 +4,16 @@ system sh/exec.sh -n dnode1 -s start
|
||||||
sleep 50
|
sleep 50
|
||||||
sql connect
|
sql connect
|
||||||
|
|
||||||
|
print step 1
|
||||||
|
|
||||||
print =============== create database
|
print =============== create database
|
||||||
sql create database test vgroups 1;
|
sql create database test vgroups 1;
|
||||||
sql select * from information_schema.ins_databases
|
|
||||||
if $rows != 3 then
|
|
||||||
return -1
|
|
||||||
endi
|
|
||||||
|
|
||||||
print $data00 $data01 $data02
|
|
||||||
|
|
||||||
sql use test;
|
sql use test;
|
||||||
|
|
||||||
|
|
||||||
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from t1 interval(10s);
|
sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from t1 interval(10s);
|
||||||
sql insert into t1 values(1648791213000,1,2,3,1.0);
|
sql insert into t1 values(1648791213000,1,2,3,1.0);
|
||||||
sql insert into t1 values(1648791213001,2,2,3,1.1);
|
sql insert into t1 values(1648791213001,2,2,3,1.1);
|
||||||
|
|
||||||
|
@ -29,7 +25,7 @@ sleep 1000
|
||||||
sql select * from streamt;
|
sql select * from streamt;
|
||||||
|
|
||||||
$loop_count = $loop_count + 1
|
$loop_count = $loop_count + 1
|
||||||
if $loop_count == 20 then
|
if $loop_count == 10 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
@ -49,7 +45,13 @@ if $data02 != 3 then
|
||||||
goto loop0
|
goto loop0
|
||||||
endi
|
endi
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
print waiting for checkpoint generation 1 ......
|
||||||
|
|
||||||
|
sleep 25000
|
||||||
|
|
||||||
|
print restart taosd 01 ......
|
||||||
|
|
||||||
|
system sh/stop_dnodes.sh
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
|
||||||
|
@ -63,7 +65,7 @@ sleep 1000
|
||||||
sql select * from streamt;
|
sql select * from streamt;
|
||||||
|
|
||||||
$loop_count = $loop_count + 1
|
$loop_count = $loop_count + 1
|
||||||
if $loop_count == 20 then
|
if $loop_count == 10 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
@ -93,7 +95,7 @@ sleep 1000
|
||||||
sql select * from streamt;
|
sql select * from streamt;
|
||||||
|
|
||||||
$loop_count = $loop_count + 1
|
$loop_count = $loop_count + 1
|
||||||
if $loop_count == 20 then
|
if $loop_count == 10 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
@ -115,13 +117,62 @@ endi
|
||||||
|
|
||||||
# row 1
|
# row 1
|
||||||
if $data11 != 1 then
|
if $data11 != 1 then
|
||||||
print =====data01=$data01
|
print =====data11=$data11
|
||||||
goto loop2
|
goto loop2
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data12 != 4 then
|
if $data12 != 4 then
|
||||||
print =====data02=$data02
|
print =====data12=$data12
|
||||||
goto loop2
|
goto loop2
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print step 2
|
||||||
|
|
||||||
|
print restart taosd 02 ......
|
||||||
|
|
||||||
|
system sh/stop_dnodes.sh
|
||||||
|
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791223003,5,2,3,1.1);
|
||||||
|
|
||||||
|
loop20:
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
sql select * from streamt;
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $rows != 2 then
|
||||||
|
print =====rows=$rows expect 2
|
||||||
|
goto loop20
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $data01 != 3 then
|
||||||
|
print =====data01=$data01
|
||||||
|
goto loop20
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != 6 then
|
||||||
|
print =====data02=$data02
|
||||||
|
goto loop20
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 1
|
||||||
|
if $data11 != 2 then
|
||||||
|
print =====data11=$data11
|
||||||
|
goto loop20
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data12 != 9 then
|
||||||
|
print =====data12=$data12
|
||||||
|
goto loop20
|
||||||
|
endi
|
||||||
|
|
||||||
|
print end---------------------------------
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -0,0 +1,103 @@
|
||||||
|
system sh/stop_dnodes.sh
|
||||||
|
system sh/deploy.sh -n dnode1 -i 1 -v debugFlag 135
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
sleep 50
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
print step 1
|
||||||
|
|
||||||
|
sql create database test vgroups 4;
|
||||||
|
|
||||||
|
sql use test;
|
||||||
|
|
||||||
|
sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int);
|
||||||
|
sql create table t1 using st tags(1,1,1);
|
||||||
|
sql create table t2 using st tags(2,2,2);
|
||||||
|
sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from st interval(10s);
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791213000,1,2,3,1.0);
|
||||||
|
|
||||||
|
sql insert into t2 values(1648791213001,2,2,3,1.1);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop0:
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
sql select * from streamt;
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print =====rows=$rows expect 1
|
||||||
|
goto loop0
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $data01 != 2 then
|
||||||
|
print =====data01=$data01
|
||||||
|
goto loop0
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != 3 then
|
||||||
|
print =====data02=$data02
|
||||||
|
goto loop0
|
||||||
|
endi
|
||||||
|
|
||||||
|
print waiting for checkpoint generation 1 ......
|
||||||
|
|
||||||
|
sleep 25000
|
||||||
|
|
||||||
|
print restart taosd
|
||||||
|
|
||||||
|
system sh/stop_dnodes.sh
|
||||||
|
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791213002,3,2,3,1.1);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop1:
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
sql select * from streamt;
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $rows != 2 then
|
||||||
|
print =====rows=$rows expect 2
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $data01 != 3 then
|
||||||
|
print =====data01=$data01
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != 6 then
|
||||||
|
print =====data02=$data02
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 1
|
||||||
|
if $data11 != 1 then
|
||||||
|
print =====data11=$data11
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data12 != 4 then
|
||||||
|
print =====data12=$data12
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print end---------------------------------
|
||||||
|
|
||||||
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
Loading…
Reference in New Issue