Merge pull request #24228 from taosdata/fix/3_liaohj

refactor: remove stream-scan-history event for stream task.
This commit is contained in:
Haojun Liao 2024-01-03 14:43:20 +08:00 committed by GitHub
commit a2e85c44c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 197 additions and 122 deletions

View File

@ -81,7 +81,7 @@ typedef enum ETaskStatus {
TASK_STATUS__HALT, // pause, but not be manipulated by user command TASK_STATUS__HALT, // pause, but not be manipulated by user command
TASK_STATUS__PAUSE, // pause TASK_STATUS__PAUSE, // pause
TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore
TASK_STATUS__STREAM_SCAN_HISTORY, // TASK_STATUS__STREAM_SCAN_HISTORY,
} ETaskStatus; } ETaskStatus;
enum { enum {
@ -138,15 +138,14 @@ enum {
typedef enum EStreamTaskEvent { typedef enum EStreamTaskEvent {
TASK_EVENT_INIT = 0x1, TASK_EVENT_INIT = 0x1,
TASK_EVENT_INIT_SCANHIST = 0x2, TASK_EVENT_INIT_SCANHIST = 0x2,
TASK_EVENT_INIT_STREAM_SCANHIST = 0x3, TASK_EVENT_SCANHIST_DONE = 0x3,
TASK_EVENT_SCANHIST_DONE = 0x4, TASK_EVENT_STOP = 0x4,
TASK_EVENT_STOP = 0x5, TASK_EVENT_GEN_CHECKPOINT = 0x5,
TASK_EVENT_GEN_CHECKPOINT = 0x6, TASK_EVENT_CHECKPOINT_DONE = 0x6,
TASK_EVENT_CHECKPOINT_DONE = 0x7, TASK_EVENT_PAUSE = 0x7,
TASK_EVENT_PAUSE = 0x8, TASK_EVENT_RESUME = 0x8,
TASK_EVENT_RESUME = 0x9, TASK_EVENT_HALT = 0x9,
TASK_EVENT_HALT = 0xA, TASK_EVENT_DROPPING = 0xA,
TASK_EVENT_DROPPING = 0xB,
} EStreamTaskEvent; } EStreamTaskEvent;
typedef struct { typedef struct {
@ -793,6 +792,7 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask); int32_t streamTaskClearHTaskAttr(SStreamTask* pTask);
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event); int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, void* pFn);
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event); int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event);
void streamTaskRestoreStatus(SStreamTask* pTask); void streamTaskRestoreStatus(SStreamTask* pTask);

View File

@ -251,7 +251,7 @@ int32_t doAddSinkTask(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, in
static int32_t addSourceTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SArray* pSinkTaskList, static int32_t addSourceTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SArray* pSinkTaskList,
SStreamObj* pStream, SSubplan* plan, uint64_t uid, SEpSet* pEpset, bool fillHistory, SStreamObj* pStream, SSubplan* plan, uint64_t uid, SEpSet* pEpset, bool fillHistory,
bool hasExtraSink, int64_t firstWindowSkey, bool hasFillHistory) { bool hasExtraSink, int64_t nextWindowSkey, bool hasFillHistory) {
SStreamTask* pTask = SStreamTask* pTask =
tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, pStream->conf.triggerParam, pTaskList, hasFillHistory); tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, pStream->conf.triggerParam, pTaskList, hasFillHistory);
if (pTask == NULL) { if (pTask == NULL) {
@ -262,7 +262,7 @@ static int32_t addSourceTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList,
STimeWindow* pWindow = &pTask->dataRange.window; STimeWindow* pWindow = &pTask->dataRange.window;
pWindow->skey = INT64_MIN; pWindow->skey = INT64_MIN;
pWindow->ekey = firstWindowSkey - 1; pWindow->ekey = nextWindowSkey - 1;
mDebug("add source task 0x%x window:%" PRId64 " - %" PRId64, pTask->id.taskId, pWindow->skey, pWindow->ekey); mDebug("add source task 0x%x window:%" PRId64 " - %" PRId64, pTask->id.taskId, pWindow->skey, pWindow->ekey);
// sink or dispatch // sink or dispatch
@ -382,7 +382,7 @@ static int32_t doAddSourceTask(SArray* pTaskList, bool isFillhistory, int64_t ui
epsetAssign(&(pTask)->info.mnodeEpset, pEpset); epsetAssign(&(pTask)->info.mnodeEpset, pEpset);
// todo set the correct ts, which should be last key of queried table. // set the correct ts, which is the last key of queried table.
STimeWindow* pWindow = &pTask->dataRange.window; STimeWindow* pWindow = &pTask->dataRange.window;
pWindow->skey = INT64_MIN; pWindow->skey = INT64_MIN;
pWindow->ekey = nextWindowSkey - 1; pWindow->ekey = nextWindowSkey - 1;

View File

@ -2886,38 +2886,38 @@ static SStreamTask *mndGetStreamTask(STaskId *pId, SStreamObj *pStream) {
return NULL; return NULL;
} }
static bool needDropRelatedFillhistoryTask(STaskStatusEntry *pTaskEntry, SStreamExecInfo *pExecNode) { //static bool needDropRelatedFillhistoryTask(STaskStatusEntry *pTaskEntry, SStreamExecInfo *pExecNode) {
if (pTaskEntry->status == TASK_STATUS__STREAM_SCAN_HISTORY && pTaskEntry->statusLastDuration >= 10) { // if (pTaskEntry->status == TASK_STATUS__STREAM_SCAN_HISTORY && pTaskEntry->statusLastDuration >= 10) {
if (!pTaskEntry->inputQChanging && pTaskEntry->inputQUnchangeCounter > 10) { // if (!pTaskEntry->inputQChanging && pTaskEntry->inputQUnchangeCounter > 10) {
int32_t numOfReady = 0; // int32_t numOfReady = 0;
int32_t numOfTotal = 0; // int32_t numOfTotal = 0;
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { // for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k); // STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
if (pTaskEntry->id.streamId == pId->streamId) { // if (pTaskEntry->id.streamId == pId->streamId) {
numOfTotal++; // numOfTotal++;
//
if (pTaskEntry->id.taskId != pId->taskId) { // if (pTaskEntry->id.taskId != pId->taskId) {
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)); // STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
if (pEntry->status == TASK_STATUS__READY) { // if (pEntry->status == TASK_STATUS__READY) {
numOfReady++; // numOfReady++;
} // }
} // }
} // }
} // }
//
if (numOfReady > 0) { // if (numOfReady > 0) {
mDebug("stream:0x%" PRIx64 // mDebug("stream:0x%" PRIx64
" %d tasks are ready, %d tasks in stream-scan-history for more than 50s, drop related fill-history task", // " %d tasks are ready, %d tasks in stream-scan-history for more than 50s, drop related fill-history task",
pTaskEntry->id.streamId, numOfReady, numOfTotal - numOfReady); // pTaskEntry->id.streamId, numOfReady, numOfTotal - numOfReady);
return true; // return true;
} else { // } else {
return false; // return false;
} // }
} // }
} // }
//
return false; // return false;
} //}
// currently only handle the sink task // currently only handle the sink task
// 1. sink task, drop related fill-history task msg is missing // 1. sink task, drop related fill-history task msg is missing
@ -3091,18 +3091,18 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if (p->status != TASK_STATUS__READY) { if (p->status != TASK_STATUS__READY) {
mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status)); mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status));
if (p->status == TASK_STATUS__STREAM_SCAN_HISTORY) { // if (p->status == TASK_STATUS__STREAM_SCAN_HISTORY) {
bool drop = needDropRelatedFillhistoryTask(pTaskEntry, &execInfo); // bool drop = needDropRelatedFillhistoryTask(pTaskEntry, &execInfo);
if (drop) { // if (drop) {
SStreamObj *pStreamObj = mndGetStreamObj(pMnode, pTaskEntry->id.streamId); // SStreamObj *pStreamObj = mndGetStreamObj(pMnode, pTaskEntry->id.streamId);
if (pStreamObj == NULL) { // if (pStreamObj == NULL) {
mError("failed to acquire the streamObj:0x%" PRIx64 " it may have been dropped", pStreamObj->uid); // mError("failed to acquire the streamObj:0x%" PRIx64 " it may have been dropped", pStreamObj->uid);
} else { // } else {
mndDropRelatedFillhistoryTask(pMnode, pTaskEntry, pStreamObj); // mndDropRelatedFillhistoryTask(pMnode, pTaskEntry, pStreamObj);
mndReleaseStream(pMnode, pStreamObj); // mndReleaseStream(pMnode, pStreamObj);
} // }
} // }
} // }
} }
} }

View File

@ -1037,15 +1037,16 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
streamMetaReleaseTask(pMeta, pStreamTask); streamMetaReleaseTask(pMeta, pStreamTask);
} else { } else {
STimeWindow* pWindow = &pTask->dataRange.window; ASSERT(0);
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) || streamTaskShouldStop(pTask)); // STimeWindow* pWindow = &pTask->dataRange.window;
// ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) || streamTaskShouldStop(pTask));
// Not update the fill-history time window until the state transfer is completed. //
tqDebug("s-task:%s scan-history in stream time window completed, start to handle data from WAL, startVer:%" PRId64 // // Not update the fill-history time window until the state transfer is completed.
", window:%" PRId64 " - %" PRId64, // tqDebug("s-task:%s scan-history in stream time window completed, start to handle data from WAL, startVer:%" PRId64
id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey); // ", window:%" PRId64 " - %" PRId64,
// id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey);
code = streamTaskScanHistoryDataComplete(pTask); //
// code = streamTaskScanHistoryDataComplete(pTask);
} }
atomic_store_32(&pTask->status.inScanHistorySentinel, 0); atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
@ -1170,7 +1171,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
} else if (status == TASK_STATUS__UNINIT) { } else if (status == TASK_STATUS__UNINIT) {
// todo: fill-history task init ? // todo: fill-history task init ?
if (pTask->info.fillHistory == 0) { if (pTask->info.fillHistory == 0) {
EStreamTaskEvent event = HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; EStreamTaskEvent event = /*HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : */TASK_EVENT_INIT;
streamTaskHandleEvent(pTask->status.pSM, event); streamTaskHandleEvent(pTask->status.pSM, event);
} }
} }
@ -1362,9 +1363,9 @@ int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg) {
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask, NULL); ETaskStatus status = streamTaskGetStatus(pTask, NULL);
if (status == TASK_STATUS__STREAM_SCAN_HISTORY) { // if (status == TASK_STATUS__STREAM_SCAN_HISTORY) {
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); // streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
} // }
SStreamTaskId id = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId}; SStreamTaskId id = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId};
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &id); streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &id);

View File

@ -195,7 +195,8 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t
int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal); int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal); int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 ", applied index:%" PRId64", 0x%"PRIx64, wDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
", 0x%" PRIx64,
vgId, offset, lastVer, committedVer, appliedVer, id); vgId, offset, lastVer, committedVer, appliedVer, id);
while (offset <= appliedVer) { while (offset <= appliedVer) {

View File

@ -39,7 +39,7 @@ int32_t tqScanWal(STQ* pTq) {
bool shouldIdle = true; bool shouldIdle = true;
doScanWalForAllTasks(pTq->pStreamMeta, &shouldIdle); doScanWalForAllTasks(pTq->pStreamMeta, &shouldIdle);
if (shouldIdle) { // if (shouldIdle) {
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
int32_t times = (--pMeta->walScanCounter); int32_t times = (--pMeta->walScanCounter);
ASSERT(pMeta->walScanCounter >= 0); ASSERT(pMeta->walScanCounter >= 0);
@ -50,7 +50,7 @@ int32_t tqScanWal(STQ* pTq) {
} else { } else {
tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION); tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION);
} }
} // }
taosMsleep(SCAN_WAL_IDLE_DURATION); taosMsleep(SCAN_WAL_IDLE_DURATION);
} }

View File

@ -545,7 +545,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char*
SStreamTask* p = streamMetaAcquireTask(pMeta, streamId, taskId); SStreamTask* p = streamMetaAcquireTask(pMeta, streamId, taskId);
if (p != NULL && restored && p->info.fillHistory == 0) { if (p != NULL && restored && p->info.fillHistory == 0) {
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(p)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; EStreamTaskEvent event = TASK_EVENT_INIT;
streamTaskHandleEvent(p->status.pSM, event); streamTaskHandleEvent(p->status.pSM, event);
} else if (!restored) { } else if (!restored) {
tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId); tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId);

View File

@ -567,17 +567,29 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
if (vnodeIsRoleLeader(pVnode)) { if (vnodeIsRoleLeader(pVnode)) {
// start to restore all stream tasks // start to restore all stream tasks
if (tsDisableStream) { if (tsDisableStream) {
streamMetaWUnLock(pMeta);
vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId); vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId);
} else { } else {
vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId); vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId);
tqStreamTaskResetStatus(pVnode->pTq->pStreamMeta); tqStreamTaskResetStatus(pVnode->pTq->pStreamMeta);
{
if (pMeta->startInfo.taskStarting == 1) {
pMeta->startInfo.restartCount += 1;
tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
pMeta->startInfo.restartCount);
streamMetaWUnLock(pMeta);
} else {
pMeta->startInfo.taskStarting = 1;
streamMetaWUnLock(pMeta);
tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false); tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false);
} }
}
}
} else { } else {
streamMetaWUnLock(pMeta);
vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId); vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);
} }
streamMetaWUnLock(pMeta);
} }
static void vnodeBecomeFollower(const SSyncFSM *pFsm) { static void vnodeBecomeFollower(const SSyncFSM *pFsm) {

View File

@ -846,6 +846,7 @@ bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWind
int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval); int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval);
void resetUnCloseSessionWinInfo(SSHashObj* winMap); void resetUnCloseSessionWinInfo(SSHashObj* winMap);
void setStreamOperatorCompleted(struct SOperatorInfo* pOperator); void setStreamOperatorCompleted(struct SOperatorInfo* pOperator);
void reloadAggSupFromDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup);
int32_t encodeSSessionKey(void** buf, SSessionKey* key); int32_t encodeSSessionKey(void** buf, SSessionKey* key);
void* decodeSSessionKey(void* buf, SSessionKey* key); void* decodeSSessionKey(void* buf, SSessionKey* key);

View File

@ -1961,8 +1961,10 @@ static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
} }
static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) { static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) {
if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) { if (pInfo->pUpdateInfo) {
pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version); pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
}
if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) {
checkUpdateData(pInfo, true, pBlock, true); checkUpdateData(pInfo, true, pBlock, true);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey);
if (pInfo->pUpdateDataRes->info.rows > 0) { if (pInfo->pUpdateDataRes->info.rows > 0) {

View File

@ -622,6 +622,7 @@ void streamEventReloadState(SOperatorInfo* pOperator) {
} }
setEventWindowFlag(pAggSup, &curInfo); setEventWindowFlag(pAggSup, &curInfo);
if (!curInfo.pWinFlag->startFlag || curInfo.pWinFlag->endFlag) { if (!curInfo.pWinFlag->startFlag || curInfo.pWinFlag->endFlag) {
saveSessionOutputBuf(pAggSup, &curInfo.winInfo);
continue; continue;
} }
@ -653,6 +654,7 @@ void streamEventReloadState(SOperatorInfo* pOperator) {
if (downstream->fpSet.reloadStreamStateFn) { if (downstream->fpSet.reloadStreamStateFn) {
downstream->fpSet.reloadStreamStateFn(downstream); downstream->fpSet.reloadStreamStateFn(downstream);
} }
reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup);
} }
SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,

View File

@ -2761,6 +2761,18 @@ void getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey,
} }
} }
void reloadAggSupFromDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup) {
SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore;
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
reloadAggSupFromDownStream(downstream->pDownstream[0], pAggSup);
return;
}
SStreamScanInfo* pScanInfo = downstream->info;
pAggSup->pUpdateInfo = pScanInfo->pUpdateInfo;
}
void streamSessionSemiReloadState(SOperatorInfo* pOperator) { void streamSessionSemiReloadState(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
@ -2792,6 +2804,7 @@ void streamSessionSemiReloadState(SOperatorInfo* pOperator) {
if (downstream->fpSet.reloadStreamStateFn) { if (downstream->fpSet.reloadStreamStateFn) {
downstream->fpSet.reloadStreamStateFn(downstream); downstream->fpSet.reloadStreamStateFn(downstream);
} }
reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup);
} }
void streamSessionReloadState(SOperatorInfo* pOperator) { void streamSessionReloadState(SOperatorInfo* pOperator) {
@ -2844,6 +2857,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
if (downstream->fpSet.reloadStreamStateFn) { if (downstream->fpSet.reloadStreamStateFn) {
downstream->fpSet.reloadStreamStateFn(downstream); downstream->fpSet.reloadStreamStateFn(downstream);
} }
reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup);
} }
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
@ -3726,6 +3740,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
if (downstream->fpSet.reloadStreamStateFn) { if (downstream->fpSet.reloadStreamStateFn) {
downstream->fpSet.reloadStreamStateFn(downstream); downstream->fpSet.reloadStreamStateFn(downstream);
} }
reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup);
} }
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,

View File

@ -8161,9 +8161,9 @@ int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
if (interval.interval > 0) { if (interval.interval > 0) {
pStmt->pReq->lastTs = taosTimeTruncate(lastTs, &interval); pStmt->pReq->lastTs = taosTimeAdd(taosTimeTruncate(lastTs, &interval), interval.interval, interval.intervalUnit, interval.precision);
} else { } else {
pStmt->pReq->lastTs = lastTs; pStmt->pReq->lastTs = lastTs + 1; // start key of the next time window
} }
code = buildCmdMsg(&cxt, TDMT_MND_CREATE_STREAM, (FSerializeFunc)tSerializeSCMCreateStreamReq, pStmt->pReq); code = buildCmdMsg(&cxt, TDMT_MND_CREATE_STREAM, (FSerializeFunc)tSerializeSCMCreateStreamReq, pStmt->pReq);
} }

View File

@ -34,6 +34,8 @@ typedef int32_t (*__state_trans_succ_fn)(SStreamTask*);
typedef struct SAttachedEventInfo { typedef struct SAttachedEventInfo {
ETaskStatus status; // required status that this event can be handled ETaskStatus status; // required status that this event can be handled
EStreamTaskEvent event; // the delayed handled event EStreamTaskEvent event; // the delayed handled event
void* pParam;
void* pFn;
} SAttachedEventInfo; } SAttachedEventInfo;
typedef struct STaskStateTrans { typedef struct STaskStateTrans {

View File

@ -356,10 +356,17 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
ASSERT(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP); ASSERT(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP);
} else { } else {
ASSERT(status == TASK_STATUS__READY|| status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP); ASSERT(status == TASK_STATUS__READY || status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP);
streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); int32_t code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s halt stream task:%s failed, code:%s not transfer state to stream task", id,
pStreamTask->id.idStr, tstrerror(code));
streamMetaReleaseTask(pMeta, pStreamTask);
return code;
} else {
stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, id); stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, id);
} }
}
// wait for the stream task to handle all in the inputQ, and to be idle // wait for the stream task to handle all in the inputQ, and to be idle
waitForTaskIdle(pTask, pStreamTask); waitForTaskIdle(pTask, pStreamTask);
@ -665,7 +672,7 @@ bool streamTaskIsIdle(const SStreamTask* pTask) {
bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) { bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) {
ETaskStatus st = streamTaskGetStatus(pTask, NULL); ETaskStatus st = streamTaskGetStatus(pTask, NULL);
return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__STREAM_SCAN_HISTORY || return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY/* || st == TASK_STATUS__STREAM_SCAN_HISTORY*/ ||
st == TASK_STATUS__CK); st == TASK_STATUS__CK);
} }

View File

@ -1486,7 +1486,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
continue; continue;
} }
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; EStreamTaskEvent event = /*(HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : */TASK_EVENT_INIT;
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event); int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
stError("vgId:%d failed to handle event:%d", pMeta->vgId, event); stError("vgId:%d failed to handle event:%d", pMeta->vgId, event);

View File

@ -117,7 +117,7 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey,
void* pFileStore = getStateFileStore(pFileState); void* pFileStore = getStateFileStore(pFileState);
void* p = NULL; void* p = NULL;
int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen); int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
if (code_file == TSDB_CODE_SUCCESS) { if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
(*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen); (*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen);
code = code_file; code = code_file;
qDebug("===stream===0 get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file); qDebug("===stream===0 get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file);
@ -449,13 +449,13 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void
SSHashObj* pSessionBuff = getRowStateBuff(pCur->pStreamFileState); SSHashObj* pSessionBuff = getRowStateBuff(pCur->pStreamFileState);
void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t)); void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
if (!ppBuff) { SArray* pWinStates = NULL;
return TSDB_CODE_FAILED; if (ppBuff) {
pWinStates = (SArray*)(*ppBuff);
} }
SArray* pWinStates = (SArray*)(*ppBuff);
int32_t size = taosArrayGetSize(pWinStates);
if (pCur->buffIndex >= 0) { if (pCur->buffIndex >= 0) {
int32_t size = taosArrayGetSize(pWinStates);
if (pCur->buffIndex >= size) { if (pCur->buffIndex >= size) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }

View File

@ -52,7 +52,7 @@ int32_t streamTaskSetReady(SStreamTask* pTask) {
int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask); int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask);
ETaskStatus status = streamTaskGetStatus(pTask, &p); ETaskStatus status = streamTaskGetStatus(pTask, &p);
if ((status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY) && if ((status == TASK_STATUS__SCAN_HISTORY/* || status == TASK_STATUS__STREAM_SCAN_HISTORY*/) &&
pTask->info.taskLevel != TASK_LEVEL__SOURCE) { pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->upstreamInfo.pList); pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
stDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s", stDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s",
@ -158,7 +158,7 @@ int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
ETaskStatus status = streamTaskGetStatus(pTask, NULL); ETaskStatus status = streamTaskGetStatus(pTask, NULL);
ASSERT(pTask->status.downstreamReady == 1 && ASSERT(pTask->status.downstreamReady == 1 &&
((status == TASK_STATUS__SCAN_HISTORY) || (status == TASK_STATUS__STREAM_SCAN_HISTORY))); ((status == TASK_STATUS__SCAN_HISTORY)/* || (status == TASK_STATUS__STREAM_SCAN_HISTORY)*/));
if (level == TASK_LEVEL__SOURCE) { if (level == TASK_LEVEL__SOURCE) {
return doStartScanHistoryTask(pTask); return doStartScanHistoryTask(pTask);
@ -374,10 +374,14 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
char* p = NULL; char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p); ETaskStatus status = streamTaskGetStatus(pTask, &p);
ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY); ASSERT(status == TASK_STATUS__SCAN_HISTORY/* || status == TASK_STATUS__STREAM_SCAN_HISTORY*/);
stDebug("s-task:%s enter into scan-history data stage, status:%s", id, p); if (pTask->info.fillHistory == 1) {
stDebug("s-task:%s fill-history task enters into scan-history data stage, status:%s", id, p);
streamTaskStartScanHistory(pTask); streamTaskStartScanHistory(pTask);
} else {
stDebug("s-task:%s scan wal data, status:%s", id, p);
}
// NOTE: there will be an deadlock if launch fill history here. // NOTE: there will be an deadlock if launch fill history here.
// // start the related fill-history task, when current task is ready // // start the related fill-history task, when current task is ready
@ -391,7 +395,7 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
void doProcessDownstreamReadyRsp(SStreamTask* pTask) { void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
EStreamTaskEvent event; EStreamTaskEvent event;
if (pTask->info.fillHistory == 0) { if (pTask->info.fillHistory == 0) {
event = HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; event = /*HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : */TASK_EVENT_INIT;
} else { } else {
event = TASK_EVENT_INIT_SCANHIST; event = TASK_EVENT_INIT_SCANHIST;
} }
@ -631,7 +635,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
char* p = NULL; char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p); ETaskStatus status = streamTaskGetStatus(pTask, &p);
if (status != TASK_STATUS__SCAN_HISTORY && status != TASK_STATUS__STREAM_SCAN_HISTORY) { if (status != TASK_STATUS__SCAN_HISTORY /*&& status != TASK_STATUS__STREAM_SCAN_HISTORY*/) {
stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly", id, p, stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly", id, p,
pReq->upstreamTaskId); pReq->upstreamTaskId);
@ -693,7 +697,7 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
return TSDB_CODE_INVALID_MSG; return TSDB_CODE_INVALID_MSG;
} }
ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY); ASSERT(status == TASK_STATUS__SCAN_HISTORY/* || status == TASK_STATUS__STREAM_SCAN_HISTORY*/);
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
// execute in the scan history complete call back msg, ready to process data from inputQ // execute in the scan history complete call back msg, ready to process data from inputQ

View File

@ -31,14 +31,14 @@ SStreamTaskState StreamTaskStatusList[9] = {
{.state = TASK_STATUS__HALT, .name = "halt"}, {.state = TASK_STATUS__HALT, .name = "halt"},
{.state = TASK_STATUS__PAUSE, .name = "paused"}, {.state = TASK_STATUS__PAUSE, .name = "paused"},
{.state = TASK_STATUS__CK, .name = "checkpoint"}, {.state = TASK_STATUS__CK, .name = "checkpoint"},
{.state = TASK_STATUS__STREAM_SCAN_HISTORY, .name = "stream-scan-history"}, // {.state = TASK_STATUS__STREAM_SCAN_HISTORY, .name = "stream-scan-history"},
}; };
SStreamEventInfo StreamTaskEventList[12] = { SStreamEventInfo StreamTaskEventList[12] = {
{.event = 0, .name = ""}, // dummy event, place holder {.event = 0, .name = ""}, // dummy event, place holder
{.event = TASK_EVENT_INIT, .name = "initialize"}, {.event = TASK_EVENT_INIT, .name = "initialize"},
{.event = TASK_EVENT_INIT_SCANHIST, .name = "scan-history-init"}, {.event = TASK_EVENT_INIT_SCANHIST, .name = "scan-history-init"},
{.event = TASK_EVENT_INIT_STREAM_SCANHIST, .name = "stream-scan-history-init"}, // {.event = TASK_EVENT_INIT_STREAM_SCANHIST, .name = "stream-scan-history-init"},
{.event = TASK_EVENT_SCANHIST_DONE, .name = "scan-history-completed"}, {.event = TASK_EVENT_SCANHIST_DONE, .name = "scan-history-completed"},
{.event = TASK_EVENT_STOP, .name = "stopping"}, {.event = TASK_EVENT_STOP, .name = "stopping"},
{.event = TASK_EVENT_GEN_CHECKPOINT, .name = "checkpoint"}, {.event = TASK_EVENT_GEN_CHECKPOINT, .name = "checkpoint"},
@ -110,12 +110,12 @@ int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) {
// todo check rsp code for handle Event:TASK_EVENT_SCANHIST_DONE // todo check rsp code for handle Event:TASK_EVENT_SCANHIST_DONE
static bool isInvalidStateTransfer(ETaskStatus state, const EStreamTaskEvent event) { static bool isInvalidStateTransfer(ETaskStatus state, const EStreamTaskEvent event) {
if (event == TASK_EVENT_INIT_STREAM_SCANHIST || event == TASK_EVENT_INIT || event == TASK_EVENT_INIT_SCANHIST) { if (/*event == TASK_EVENT_INIT_STREAM_SCANHIST || */event == TASK_EVENT_INIT || event == TASK_EVENT_INIT_SCANHIST) {
return (state != TASK_STATUS__UNINIT); return (state != TASK_STATUS__UNINIT);
} }
if (event == TASK_EVENT_SCANHIST_DONE) { if (event == TASK_EVENT_SCANHIST_DONE) {
return (state != TASK_STATUS__SCAN_HISTORY && state != TASK_STATUS__STREAM_SCAN_HISTORY); return (state != TASK_STATUS__SCAN_HISTORY/* && state != TASK_STATUS__STREAM_SCAN_HISTORY*/);
} }
if (event == TASK_EVENT_GEN_CHECKPOINT) { if (event == TASK_EVENT_GEN_CHECKPOINT) {
@ -278,10 +278,10 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
ETaskStatus s = streamTaskGetStatus(pTask, NULL); ETaskStatus s = streamTaskGetStatus(pTask, NULL);
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) { if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) {// this event has been handled already
stDebug("s-task:%s attached event:%s handled", id, GET_EVT_NAME(pTrans->event)); stDebug("s-task:%s attached event:%s handled", id, GET_EVT_NAME(pTrans->event));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__STOP) { // this event has been handled already } else if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__STOP && s != TASK_STATUS__UNINIT) {
stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", id, GET_EVT_NAME(event)); stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", id, GET_EVT_NAME(event));
taosMsleep(100); taosMsleep(100);
} else { } else {
@ -482,14 +482,14 @@ void doInitStateTransferTable(void) {
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false); trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false);
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false); // trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false);
taosArrayPush(streamTaskSMTrans, &trans); // taosArrayPush(streamTaskSMTrans, &trans);
// scan-history related event // scan-history related event
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true); trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true); // trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans); // taosArrayPush(streamTaskSMTrans, &trans);
// halt stream task, from other task status // halt stream task, from other task status
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true); trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true);
@ -499,8 +499,8 @@ void doInitStateTransferTable(void) {
SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT}; SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT};
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true); // trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true);
taosArrayPush(streamTaskSMTrans, &trans); // taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true); trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true);
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true); trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true);
@ -519,8 +519,8 @@ void doInitStateTransferTable(void) {
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
info = (SAttachedEventInfo){.status = TASK_STATUS__READY, .event = TASK_EVENT_PAUSE}; info = (SAttachedEventInfo){.status = TASK_STATUS__READY, .event = TASK_EVENT_PAUSE};
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); // trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
taosArrayPush(streamTaskSMTrans, &trans); // taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
@ -554,8 +554,8 @@ void doInitStateTransferTable(void) {
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); // trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans); // taosArrayPush(streamTaskSMTrans, &trans);
// dropping related event // dropping related event
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
@ -574,7 +574,7 @@ void doInitStateTransferTable(void) {
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL, NULL, true); trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); // trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans); // taosArrayPush(streamTaskSMTrans, &trans);
} }
//clang-format on //clang-format on

View File

@ -259,11 +259,6 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
int64_t contLen; int64_t contLen;
bool seeked = false; bool seeked = false;
wDebug("vgId:%d, try to fetch ver %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
", applied ver:%" PRId64", 0x%"PRIx64,
pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer,
pRead->pWal->vers.appliedVer, pRead->readerId);
// TODO: valid ver // TODO: valid ver
if (ver > pRead->pWal->vers.commitVer) { if (ver > pRead->pWal->vers.commitVer) {
return -1; return -1;

View File

@ -31,6 +31,8 @@ sql insert into t4 values(1648791223000,1,1,10,3.0);
sql insert into t4 values(1648791233000,0,2,11,1.0); sql insert into t4 values(1648791233000,0,2,11,1.0);
sql insert into t4 values(1648791243000,1,9,12,2.0); sql insert into t4 values(1648791243000,1,9,12,2.0);
sleep 1000
sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 fill_history 1 into streamt0 as select _wstart as s, count(*) c1, sum(b), max(c), _wend as e from st partition by tbname event_window start with a = 0 end with b = 9; sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 fill_history 1 into streamt0 as select _wstart as s, count(*) c1, sum(b), max(c), _wend as e from st partition by tbname event_window start with a = 0 end with b = 9;
sleep 1000 sleep 1000
@ -79,6 +81,37 @@ if $data21 != 2 then
goto loop0 goto loop0
endi endi
sql insert into t3 values(1648791222000,0,1,7,3.0);
$loop_count = 0
loop1:
sleep 300
print 2 sql select * from streamt0 order by 1, 2, 3, 4;
sql select * from streamt0 order by 1, 2, 3, 4;
print
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 4 then
print ======rows=$rows
goto loop1
endi
if $data01 != 5 then
print ======data01=$data01
goto loop0
endi
print event1 end print event1 end
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -89,7 +89,7 @@ class TDTestCase:
sql = "select count(*) from sta" sql = "select count(*) from sta"
# loop wait max 60s to check count is ok # loop wait max 60s to check count is ok
tdLog.info("loop wait result ...") tdLog.info("loop wait result ...")
tdSql.checkDataLoop(0, 0, 99999, sql, loopCount=120, waitTime=0.5) tdSql.checkDataLoop(0, 0, 100000, sql, loopCount=120, waitTime=0.5)
time.sleep(5) time.sleep(5)