Merge pull request #23455 from taosdata/fix/liaohj

refactor: do some internal refactor.
Haojun Liao 2023-11-01 12:01:54 +08:00 committed by GitHub
commit 0cf0ff3bbf
21 changed files with 429 additions and 260 deletions

View File

@@ -236,6 +236,11 @@ typedef struct {
SUseDbRsp dbInfo;
} STaskDispatcherShuffle;
typedef struct {
int32_t nodeId;
SEpSet epset;
} SDownstreamTaskEpset;
typedef struct {
int64_t stbUid;
char stbFullName[TSDB_TABLE_FNAME_LEN];
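The SDownstreamTaskEpset struct added above is the carrier for stale-epset notifications. Read together with the later hunks in this commit, its life cycle appears to be: streamProcessCheckRsp records the nodeId of a downstream task whose node is no longer the leader, and metaHbToMnode drains the list into the heartbeat's pUpdateNodes. A condensed sketch of that flow, using only names visible in this diff (locking and error handling omitted):

// producer side (streamProcessCheckRsp): remember the stale downstream node, deduplicated
SDownstreamTaskEpset t = {.nodeId = pRsp->downstreamNodeId};
taosArrayPush(pTask->outputInfo.pDownstreamUpdateList, &t);

// consumer side (metaHbToMnode): nodeId is the first field, so the element reads as an int32_t
int32_t* pNodeId = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, j);
taosArrayPush(hbMsg.pUpdateNodes, pNodeId);
taosArrayClear(pTask->outputInfo.pDownstreamUpdateList);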
@@ -327,15 +332,10 @@ typedef struct SDispatchMsgInfo {
void* pTimer; // used to dispatch data after a given time duration
} SDispatchMsgInfo;
typedef struct STaskOutputQueue {
typedef struct STaskQueue {
int8_t status;
SStreamQueue* queue;
} STaskOutputQueue;
typedef struct STaskInputInfo {
int8_t status;
SStreamQueue* queue;
} STaskInputInfo;
} STaskQueue;
typedef struct STaskSchedInfo {
int8_t status;
@@ -384,6 +384,7 @@ typedef struct STaskOutputInfo {
};
int8_t type;
STokenBucket* pTokenBucket;
SArray* pDownstreamUpdateList;
} STaskOutputInfo;
typedef struct SUpstreamInfo {
@@ -395,8 +396,8 @@ struct SStreamTask {
int64_t ver;
SStreamTaskId id;
SSTaskBasicInfo info;
STaskOutputQueue outputq;
STaskInputInfo inputInfo;
STaskQueue outputq;
STaskQueue inputq;
STaskSchedInfo schedInfo;
STaskOutputInfo outputInfo;
SDispatchMsgInfo msgInfo;
@@ -431,7 +432,7 @@ struct SStreamTask {
typedef struct STaskStartInfo {
int64_t startTs;
int64_t readyTs;
int32_t startedAfterNodeUpdate;
int32_t startAllTasksFlag;
SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
int32_t elapsedTime;
} STaskStartInfo;
@@ -645,7 +646,8 @@ typedef struct STaskStatusEntry {
typedef struct SStreamHbMsg {
int32_t vgId;
int32_t numOfTasks;
SArray* pTaskStatus; // SArray<SStreamTaskStatusEntry>
SArray* pTaskStatus; // SArray<STaskStatusEntry>
SArray* pUpdateNodes; // SArray<int32_t>, nodes whose stream-task epsets need to be updated.
} SStreamHbMsg;
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
@@ -733,7 +735,6 @@ void streamTaskCheckDownstream(SStreamTask* pTask);
int32_t onNormalTaskReady(SStreamTask* pTask);
int32_t onScanhistoryTaskReady(SStreamTask* pTask);
//int32_t streamTaskStartScanHistory(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage);
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
@@ -744,7 +745,7 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask);
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM);
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event);
void streamTaskRestoreStatus(SStreamTask* pTask);
int32_t streamTaskStop(SStreamTask* pTask);
@@ -796,7 +797,6 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey);
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded);
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaReopen(SStreamMeta* pMeta);
@@ -806,12 +806,12 @@ void streamMetaNotifyClose(SStreamMeta* pMeta);
void streamMetaStartHb(SStreamMeta* pMeta);
void streamMetaInitForSnode(SStreamMeta* pMeta);
bool streamMetaTaskInTimer(SStreamMeta* pMeta);
int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask);
// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask);
void streamTaskClearCheckInfo(SStreamTask* pTask);
int32_t streamAlignTransferState(SStreamTask* pTask);
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId);
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask,

View File

@@ -78,7 +78,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static SArray *extractNodeListFromStream(SMnode *pMnode);
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode);
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool* allReady);
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
@@ -91,6 +91,7 @@ static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExe
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
static int32_t doKillActiveCheckpointTrans(SMnode *pMnode);
static int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList);
int32_t mndInitStream(SMnode *pMnode) {
SSdbTable table = {
@@ -1156,7 +1157,13 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
}
}
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
bool allReady = true;
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady);
if (!allReady) {
mWarn("not all vnodes are ready, ignore the checkpoint")
taosArrayDestroy(pNodeSnapshot);
return 0;
}
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot);
bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
@@ -2059,11 +2066,12 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
return info;
}
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode) {
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool* allReady) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SVgObj *pVgroup = NULL;
*allReady = true;
SArray *pVgroupListSnapshot = taosArrayInit(4, sizeof(SNodeEntry));
while (1) {
@@ -2075,7 +2083,22 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode) {
SNodeEntry entry = {0};
entry.epset = mndGetVgroupEpset(pMnode, pVgroup);
entry.nodeId = pVgroup->vgId;
entry.hbTimestamp = -1;
entry.hbTimestamp = pVgroup->updateTime;
if (*allReady) {
for (int32_t i = 0; i < pVgroup->replica; ++i) {
if (!pVgroup->vnodeGid[i].syncRestore) {
*allReady = false;
break;
}
ESyncState state = pVgroup->vnodeGid[i].syncState;
if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR) {
*allReady = false;
break;
}
}
}
char buf[256] = {0};
EPSET_TO_STR(&entry.epset, buf);
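The per-replica readiness test above combines two conditions. Factored out as a standalone predicate, purely for illustration (this helper is not part of the commit), it reads:

static bool vgroupAllReplicasReady(const SVgObj* pVgroup) {
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
    if (!pVgroup->vnodeGid[i].syncRestore) {
      return false;  // replica has not completed sync restore yet
    }
    ESyncState state = pVgroup->vnodeGid[i].syncState;
    if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR) {
      return false;  // replica is offline or in an error state
    }
  }
  return true;  // every replica restored and in a healthy sync state
}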
@@ -2119,7 +2142,9 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
continue;
}
mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans", pStream->uid, pStream->name);
mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
pStream->name, pTrans->id);
int32_t code = createStreamUpdateTrans(pStream, pChangeInfo, pTrans);
// todo: not continue, drop all and retry again
@@ -2216,23 +2241,26 @@ static void doExtractTasksFromStream(SMnode *pMnode) {
}
}
static int32_t doRemoveTasks(SStreamExecInfo* pExecNode, STaskId* pRemovedId) {
static int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
if (p == NULL) {
return TSDB_CODE_SUCCESS;
}
if (p != NULL) {
taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
for(int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId* pId = taosArrayGet(pExecNode->pTaskList, k);
if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) {
taosArrayRemove(pExecNode->pTaskList, k);
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t) pRemovedId->taskId,
(int32_t)taosArrayGetSize(pExecNode->pTaskList));
break;
}
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) {
taosArrayRemove(pExecNode->pTaskList, k);
int32_t num = taosArrayGetSize(pExecNode->pTaskList);
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)pRemovedId->taskId, num);
break;
}
}
return 0;
return TSDB_CODE_SUCCESS;
}
static bool taskNodeExists(SArray* pList, int32_t nodeId) {
@@ -2319,7 +2347,14 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
return 0;
}
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
bool allVnodeReady = true;
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allVnodeReady);
if (!allVnodeReady) {
taosArrayDestroy(pNodeSnapshot);
atomic_store_32(&mndNodeCheckSentinel, 0);
mWarn("not all vnodes are ready, ignore the exec nodeUpdate check");
return 0;
}
taosThreadMutexLock(&execInfo.lock);
removeExpirednodeEntryAndTask(pNodeSnapshot);
@@ -2359,10 +2394,6 @@ typedef struct SMStreamNodeCheckMsg {
int8_t placeHolder; // placeholder defined to fix windows compile error
} SMStreamNodeCheckMsg;
typedef struct SMStreamTaskResetMsg {
int8_t placeHolder;
} SMStreamTaskResetMsg;
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
@@ -2577,6 +2608,43 @@ int32_t mndResetFromCheckpoint(SMnode* pMnode) {
return 0;
}
int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) {
int32_t num = taosArrayGetSize(pNodeList);
for (int k = 0; k < num; ++k) {
int32_t* pVgId = taosArrayGet(pNodeList, k);
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList);
for (int i = 0; i < numOfNodes; ++i) {
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i);
if (pNodeEntry->nodeId == *pVgId) {
mInfo("vgId:%d expired in stream task, needs update nodeEp", *pVgId);
pNodeEntry->stageUpdated = true;
break;
}
}
}
return TSDB_CODE_SUCCESS;
}
static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) {
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList);
for(int32_t j = 0; j < numOfNodes; ++j) {
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j);
if (pNodeEntry->nodeId == pTaskEntry->nodeId) {
mInfo("vgId:%d stage updated from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId,
pTaskEntry->stage, stage, pTaskEntry->id.taskId);
pNodeEntry->stageUpdated = true;
pTaskEntry->stage = stage;
break;
}
}
}
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamHbMsg req = {0};
@@ -2602,29 +2670,20 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
doExtractTasksFromStream(pMnode);
}
setNodeEpsetExpiredFlag(req.pUpdateNodes);
for (int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
if (pEntry == NULL) {
STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
if (pTaskEntry == NULL) {
mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId);
continue;
}
if (p->stage != pEntry->stage && pEntry->stage != -1) {
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList);
for(int32_t j = 0; j < numOfNodes; ++j) {
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j);
if (pNodeEntry->nodeId == pEntry->nodeId) {
mInfo("vgId:%d stage updated, from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64,
pEntry->nodeId, pEntry->stage, p->stage, pEntry->id.taskId);
pNodeEntry->stageUpdated = true;
pEntry->stage = p->stage;
break;
}
}
if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) {
updateStageInfo(pTaskEntry, p->stage);
} else {
streamTaskStatusCopy(pEntry, p);
streamTaskStatusCopy(pTaskEntry, p);
if (p->activeCheckpointId != 0) {
if (activeCheckpointId != 0) {
ASSERT(activeCheckpointId == p->activeCheckpointId);
@@ -2638,7 +2697,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
}
pEntry->status = p->status;
pTaskEntry->status = p->status;
if (p->status != TASK_STATUS__READY) {
mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status));
}
@@ -2647,13 +2706,23 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// current checkpoint is failed, rollback from the checkpoint trans
// kill the checkpoint trans and then set all tasks status to be normal
if (checkpointFailed && activeCheckpointId != 0) {
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", execInfo.activeCheckpoint);
mndResetFromCheckpoint(pMnode);
bool allReady = true;
SArray* p = mndTakeVgroupSnapshot(pMnode, &allReady);
taosArrayDestroy(p);
if (allReady) {
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status",
execInfo.activeCheckpoint);
mndResetFromCheckpoint(pMnode);
} else {
mInfo("not all vgroups are ready, wait for next HB from stream tasks");
}
}
taosThreadMutexUnlock(&execInfo.lock);
taosArrayDestroy(req.pTaskStatus);
taosArrayDestroy(req.pUpdateNodes);
return TSDB_CODE_SUCCESS;
}

View File

@@ -158,7 +158,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWal(STQ* pTq);
int32_t tqStartStreamTask(STQ* pTq);
int32_t tqStartStreamTasks(STQ* pTq);
int32_t tqResetStreamTaskStatus(STQ* pTq);
int32_t tqStopStreamTasks(STQ* pTq);
// tq util

View File

@@ -231,7 +231,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq);
int32_t tqLaunchStreamTaskAsync(STQ* pTq);
int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);

View File

@@ -1264,17 +1264,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);
/*int8_t status = */streamTaskSetSchedStatusInactive(pTask);
#if 0
// the fill-history task starts to process data in wal, let's set it status to be normal now
if (pTask->info.fillHistory == 1 && !streamTaskShouldStop(&pTask->status)) {
streamSetStatusNormal(pTask);
}
#endif
// now the fill-history task starts to scan data from wal files.
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
tqScanWalAsync(pTq, false);
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
if (code == TSDB_CODE_SUCCESS) {
tqScanWalAsync(pTq, false);
}
}
streamMetaReleaseTask(pMeta, pStreamTask);
} else {
@@ -1527,14 +1524,17 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
}
streamTaskResume(pTask);
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SINK) {
if (status == TASK_STATUS__UNINIT) {
}
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
if (status == TASK_STATUS__READY || status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__CK) {
// no lock needs to secure the access of the version
if (igUntreated && level == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) {
@@ -1550,11 +1550,16 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && status == TASK_STATUS__SCAN_HISTORY) {
streamStartScanHistoryAsync(pTask, igUntreated);
} else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputInfo.queue) == 0)) {
} else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
tqScanWalAsync(pTq, false);
} else {
streamSchedExec(pTask);
}
} else if (status == TASK_STATUS__UNINIT) {
if (pTask->info.fillHistory == 0) {
EStreamTaskEvent event = HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
streamTaskHandleEvent(pTask->status.pSM, event);
}
}
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
@@ -1896,9 +1901,15 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
return rsp.code;
}
taosWUnLockLatch(&pMeta->lock);
// the following two functions should not be executed within the scope of meta lock to avoid deadlock
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
streamTaskResetStatus(pTask);
// continue after lock the meta again
taosWLockLatch(&pMeta->lock);
SStreamTask** ppHTask = NULL;
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
@@ -1942,7 +1953,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
pMeta->startInfo.startedAfterNodeUpdate = 1;
pMeta->startInfo.startAllTasksFlag = 1;
if (updateTasks < numOfTasks) {
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
@@ -1951,7 +1962,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
} else {
if (!pTq->pVnode->restored) {
tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
pMeta->startInfo.startedAfterNodeUpdate = 0;
pMeta->startInfo.startAllTasksFlag = 0;
taosWUnLockLatch(&pMeta->lock);
} else {
tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId);
@@ -1981,8 +1992,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
vInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
tqStartStreamTasks(pTq);
tqCheckAndRunStreamTaskAsync(pTq);
tqResetStreamTaskStatus(pTq);
tqLaunchStreamTaskAsync(pTq);
} else {
vInfo("vgId:%d, follower node not start stream tasks", vgId);
}

View File

@@ -39,7 +39,7 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) {
int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
taosRUnLockLatch(&pTq->pStreamMeta->lock);
tqDebug("handle submit, restore:%d, numOfTasks:%d", pTq->pVnode->restored, numOfTasks);
// tqTrace("vgId:%d handle submit, restore:%d, numOfTasks:%d", TD_VID(pTq->pVnode), pTq->pVnode->restored, numOfTasks);
// push data for stream processing:
// 1. the vnode has already been restored.

View File

@@ -98,6 +98,7 @@ int32_t tqStartStreamTask(STQ* pTq) {
streamLaunchFillHistoryTask(pTask);
}
streamMetaUpdateTaskReadyInfo(pTask);
streamMetaReleaseTask(pMeta, pTask);
continue;
}
@@ -111,7 +112,7 @@ int32_t tqStartStreamTask(STQ* pTq) {
return 0;
}
int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) {
int32_t tqLaunchStreamTaskAsync(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = pMeta->vgId;
@@ -227,7 +228,7 @@ int32_t tqStopStreamTasks(STQ* pTq) {
return 0;
}
int32_t tqStartStreamTasks(STQ* pTq) {
int32_t tqResetStreamTaskStatus(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = TD_VID(pTq->pVnode);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
@@ -242,10 +243,7 @@ int32_t tqStartStreamTasks(STQ* pTq) {
STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if ((*pTask)->info.fillHistory != 1) {
streamTaskResetStatus(*pTask);
}
streamTaskResetStatus(*pTask);
}
return 0;
@@ -344,13 +342,13 @@ static bool taskReadyForDataFromWal(SStreamTask* pTask) {
}
// check if input queue is full or not
if (streamQueueIsFull(pTask->inputInfo.queue)) {
if (streamQueueIsFull(pTask->inputq.queue)) {
tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr);
return false;
}
// the input queue of downstream task is full, so the output is blocked, stopped for a while
if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr);
return false;
}
@@ -444,7 +442,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue;
}
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputInfo.queue);
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX;
taosThreadMutexLock(&pTask->lock);

View File

@@ -1196,11 +1196,28 @@ static bool overlapWithNeighborBlock2(SFileDataBlockInfo* pBlock, SBrinRecord* p
}
}
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SFileDataBlockInfo* pBlock) {
static int64_t getBoarderKeyInFiles(SFileDataBlockInfo* pBlock, SLastBlockReader* pLastBlockReader, int32_t order) {
bool ascScan = ASCENDING_TRAVERSE(order);
bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader);
return (ascScan && (key.ts != TSKEY_INITIAL_VAL && key.ts <= pBlock->record.firstKey)) ||
(!ascScan && (key.ts != TSKEY_INITIAL_VAL && key.ts >= pBlock->record.lastKey));
int64_t key = 0;
if (bHasDataInLastBlock) {
int64_t keyInStt = getCurrentKeyInLastBlock(pLastBlockReader);
key = ascScan ? TMIN(pBlock->record.firstKey, keyInStt) : TMAX(pBlock->record.lastKey, keyInStt);
} else {
key = ascScan ? pBlock->record.firstKey : pBlock->record.lastKey;
}
return key;
}
static bool bufferDataInFileBlockGap(TSDBKEY keyInBuf, SFileDataBlockInfo* pBlock,
SLastBlockReader* pLastBlockReader, int32_t order) {
bool ascScan = ASCENDING_TRAVERSE(order);
int64_t key = getBoarderKeyInFiles(pBlock, pLastBlockReader, order);
return (ascScan && (keyInBuf.ts != TSKEY_INITIAL_VAL && keyInBuf.ts < key)) ||
(!ascScan && (keyInBuf.ts != TSKEY_INITIAL_VAL && keyInBuf.ts > key));
}
static bool keyOverlapFileBlock(TSDBKEY key, SFileDataBlockInfo* pBlock, SVersionRange* pVerRange) {
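To make the new border-key logic in the gap test above concrete, a worked example with hypothetical values:

// ascending scan; file block covers [100, 200]; stt reader currently positioned at key 80
//   key = TMIN(pBlock->record.firstKey, keyInStt) = TMIN(100, 80) = 80
//   -> bufferDataInFileBlockGap() is true only for buffered rows with ts < 80,
//      so those rows are emitted before any file or stt data is consumed
// with no stt data, key falls back to the block's firstKey (100)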
@@ -2637,6 +2654,15 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
}
}
static bool notOverlapWithSttFiles(SFileDataBlockInfo* pBlockInfo, SLastBlockReader* pLastBlockReader, bool asc) {
if(!hasDataInLastBlock(pLastBlockReader)) {
return true;
} else {
int64_t keyInStt = getCurrentKeyInLastBlock(pLastBlockReader);
return (asc && pBlockInfo->record.lastKey < keyInStt) || (!asc && pBlockInfo->record.firstKey > keyInStt);
}
}
static int32_t doBuildDataBlock(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
@@ -2672,17 +2698,13 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
// build composed data block
code = buildComposedDataBlock(pReader);
} else if (bufferDataInFileBlockGap(pReader->info.order, keyInBuf, pBlockInfo)) {
// data in memory that are earlier than current file block
} else if (bufferDataInFileBlockGap(keyInBuf, pBlockInfo, pLastBlockReader, pReader->info.order)) {
// data in memory that are earlier than current file block and stt blocks
// rows in buffer should be less than the file block in asc, greater than file block in desc
int64_t endKey =
(ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey;
int64_t endKey = getBoarderKeyInFiles(pBlockInfo, pLastBlockReader, pReader->info.order);
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
} else {
bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader);
int64_t tsLast = bHasDataInLastBlock ? getCurrentKeyInLastBlock(pLastBlockReader) : INT64_MIN;
if (!bHasDataInLastBlock ||
((asc && pBlockInfo->record.lastKey < tsLast) || (!asc && pBlockInfo->record.firstKey > tsLast))) {
if (notOverlapWithSttFiles(pBlockInfo, pLastBlockReader, asc)) {
// whole block is required, return it directly
SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info;
pInfo->rows = pBlockInfo->record.numRow;
@@ -2693,7 +2715,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order);
// update the last key for the corresponding table
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? pInfo->window.ekey : pInfo->window.skey;
pScanInfo->lastKey = asc ? pInfo->window.ekey : pInfo->window.skey;
tsdbDebug("%p uid:%" PRIu64
" clean file block retrieved from file, global index:%d, "
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
@@ -2720,10 +2742,11 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
}
// data in stt now overlaps with current active file data block, need to composed with file data block.
int64_t keyInStt = getCurrentKeyInLastBlock(pLastBlockReader);
if ((keyInStt >= pBlockInfo->record.firstKey && asc) || (keyInStt <= pBlockInfo->record.lastKey && (!asc))) {
tsdbDebug("%p keyInStt:%" PRId64 ", overlap with file block, brange:%" PRId64 "-%" PRId64 " %s", pReader,
keyInStt, pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr);
int64_t lastKeyInStt = getCurrentKeyInLastBlock(pLastBlockReader);
if ((lastKeyInStt >= pBlockInfo->record.firstKey && asc) ||
(lastKeyInStt <= pBlockInfo->record.lastKey && (!asc))) {
tsdbDebug("%p lastKeyInStt:%" PRId64 ", overlap with file block, brange:%" PRId64 "-%" PRId64 " %s", pReader,
lastKeyInStt, pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr);
break;
}
}

View File

@@ -552,7 +552,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
pVnode->restored = true;
taosWLockLatch(&pVnode->pTq->pStreamMeta->lock);
if (pVnode->pTq->pStreamMeta->startInfo.startedAfterNodeUpdate) {
if (pVnode->pTq->pStreamMeta->startInfo.startAllTasksFlag) {
vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock);
return;
@@ -564,8 +564,8 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId);
} else {
vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId);
tqStartStreamTasks(pVnode->pTq);
tqCheckAndRunStreamTaskAsync(pVnode->pTq);
tqResetStreamTaskStatus(pVnode->pTq);
tqLaunchStreamTaskAsync(pVnode->pTq);
}
} else {
vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);

View File

@@ -296,7 +296,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
return 0;
}
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputInfo.status, TASK_INPUT_STATUS__FAILED); }
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED); }
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);

View File

@@ -175,7 +175,11 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
// set task status
if (streamTaskGetStatus(pTask, NULL) != TASK_STATUS__CK) {
pTask->checkpointingId = checkpointId;
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code));
return code;
}
}
{ // todo: remove this when the pipeline checkpoint generating is used.

View File

@@ -1043,8 +1043,8 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs;
// put data into inputQ of current task is also allowed
if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms",
pTask->id.idStr, downstreamId, el);
} else {
@@ -1096,7 +1096,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
} else { // code == 0
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED;
pTask->inputq.status = TASK_INPUT_STATUS__BLOCKED;
// block the input of current task, to push pressure to upstream
taosThreadMutexLock(&pTask->lock);
taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId);

View File

@@ -106,7 +106,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
return 0;
}
if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
stWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry exec task", pTask->id.idStr);
taosMsleep(1000);
continue;
@@ -217,7 +217,7 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
return 0;
}
if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
stDebug("s-task:%s inputQ is blocked, wait for 10sec and retry", pTask->id.idStr);
taosMsleep(10000);
continue;
@@ -309,9 +309,6 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
}
ETaskStatus status = streamTaskGetStatus(pStreamTask, NULL);
ASSERT(((status == TASK_STATUS__DROPPING) || (pStreamTask->hTaskInfo.id.taskId == pTask->id.taskId)) &&
pTask->status.appendTranstateBlock == true);
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
// It must be halted for a source stream task, since when the related scan-history-data task start scan the history
@@ -374,7 +371,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// 7. pause allowed.
streamTaskEnablePause(pStreamTask);
if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputInfo.queue->pQueue)) {
if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) {
SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
@@ -525,7 +522,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
int32_t blockSize = 0;
int32_t numOfBlocks = 0;
SStreamQueueItem* pInput = NULL;
if (streamTaskShouldStop(pTask)) {
if (streamTaskShouldStop(pTask) || (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__UNINIT)) {
stDebug("s-task:%s stream task is stopped", id);
break;
}
@@ -630,7 +627,7 @@ int32_t streamExecTask(SStreamTask* pTask) {
}
taosThreadMutexLock(&pTask->lock);
if ((streamQueueGetNumOfItems(pTask->inputInfo.queue) == 0) || streamTaskShouldStop(pTask) ||
if ((streamQueueGetNumOfItems(pTask->inputq.queue) == 0) || streamTaskShouldStop(pTask) ||
streamTaskShouldPause(pTask)) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
taosThreadMutexUnlock(&pTask->lock);

View File

@@ -211,7 +211,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
stage);
return pMeta;
_err:
_err:
taosMemoryFree(pMeta->path);
if (pMeta->pTasksMap) taosHashCleanup(pMeta->pTasksMap);
if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
@@ -228,8 +228,12 @@ _err:
}
int32_t streamMetaReopen(SStreamMeta* pMeta) {
// backup the restart flag
int32_t restartFlag = pMeta->startInfo.startAllTasksFlag;
streamMetaClear(pMeta);
pMeta->startInfo.startAllTasksFlag = restartFlag;
// NOTE: role should not be changed during reopen meta
pMeta->streamBackendRid = -1;
pMeta->streamBackend = NULL;
@@ -442,24 +446,6 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
return (int32_t)size;
}
int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) {
int32_t num = 0;
size_t size = taosArrayGetSize(pMeta->pTaskList);
for (int32_t i = 0; i < size; ++i) {
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
SStreamTask** p = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
if (p == NULL) {
continue;
}
if ((*p)->info.fillHistory == 0) {
num += 1;
}
}
return num;
}
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
taosRLockLatch(&pMeta->lock);
@@ -795,6 +781,15 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tEncodeI64(pEncoder, ps->activeCheckpointId) < 0) return -1;
if (tEncodeI8(pEncoder, ps->checkpointFailed) < 0) return -1;
}
int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes);
if (tEncodeI32(pEncoder, numOfVgs) < 0) return -1;
for (int j = 0; j < numOfVgs; ++j) {
int32_t* pVgId = taosArrayGet(pReq->pUpdateNodes, j);
if (tEncodeI32(pEncoder, *pVgId) < 0) return -1;
}
tEndEncode(pEncoder);
return pEncoder->pos;
}
@@ -828,6 +823,17 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
taosArrayPush(pReq->pTaskStatus, &entry);
}
int32_t numOfVgs = 0;
if (tDecodeI32(pDecoder, &numOfVgs) < 0) return -1;
pReq->pUpdateNodes = taosArrayInit(numOfVgs, sizeof(int32_t));
for (int j = 0; j < numOfVgs; ++j) {
int32_t vgId = 0;
if (tDecodeI32(pDecoder, &vgId) < 0) return -1;
taosArrayPush(pReq->pUpdateNodes, &vgId);
}
tEndDecode(pDecoder);
return 0;
}
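On the sender side, the encoder above is presumably driven by the usual two-pass tEncodeSize pattern found elsewhere in the codebase; a sketch under that assumption (error checks omitted):

int32_t code = 0;
int32_t tlen = 0;
tEncodeSize(tEncodeStreamHbMsg, &hbMsg, tlen, code);  // dry run to compute the buffer size

void*    buf = rpcMallocCont(tlen);
SEncoder encoder;
tEncoderInit(&encoder, buf, tlen);
tEncodeStreamHbMsg(&encoder, &hbMsg);  // task-status entries first, then numOfVgs + vgIds
tEncoderClear(&encoder);
// buf/tlen are then shipped to the mnode using the epset captured in metaHbToMnode below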
@@ -882,13 +888,14 @@ void metaHbToMnode(void* param, void* tmrId) {
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
SEpSet epset = {0};
bool hasValEpset = false;
bool hasMnodeEpset = false;
hbMsg.vgId = pMeta->vgId;
hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t));
for (int32_t i = 0; i < numOfTasks; ++i) {
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
// not report the status of fill-history task
@@ -901,7 +908,7 @@ void metaHbToMnode(void* param, void* tmrId) {
.status = streamTaskGetStatus(*pTask, NULL),
.nodeId = pMeta->vgId,
.stage = pMeta->stage,
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputInfo.queue)),
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
};
entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE;
@@ -920,18 +927,39 @@ void metaHbToMnode(void* param, void* tmrId) {
walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verStart, &entry.verEnd);
}
taosArrayPush(hbMsg.pTaskStatus, &entry);
taosThreadMutexLock(&(*pTask)->lock);
int32_t num = taosArrayGetSize((*pTask)->outputInfo.pDownstreamUpdateList);
for (int j = 0; j < num; ++j) {
int32_t *pNodeId = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j);
if (!hasValEpset) {
bool exist = false;
int32_t numOfExisted = taosArrayGetSize(hbMsg.pUpdateNodes);
for (int k = 0; k < numOfExisted; ++k) {
if (*pNodeId == *(int32_t*)taosArrayGet(hbMsg.pUpdateNodes, k)) {
exist = true;
break;
}
}
if (!exist) {
taosArrayPush(hbMsg.pUpdateNodes, pNodeId);
}
}
taosArrayClear((*pTask)->outputInfo.pDownstreamUpdateList);
taosThreadMutexUnlock(&(*pTask)->lock);
taosArrayPush(hbMsg.pTaskStatus, &entry);
if (!hasMnodeEpset) {
epsetAssign(&epset, &(*pTask)->info.mnodeEpset);
hasValEpset = true;
hasMnodeEpset = true;
}
}
hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus);
taosRUnLockLatch(&pMeta->lock);
if (hasValEpset) {
if (hasMnodeEpset) {
int32_t code = 0;
int32_t tlen = 0;
@@ -976,6 +1004,8 @@ void metaHbToMnode(void* param, void* tmrId) {
}
taosArrayDestroy(hbMsg.pTaskStatus);
taosArrayDestroy(hbMsg.pUpdateNodes);
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid);
}
@@ -1058,5 +1088,6 @@ void streamMetaInitForSnode(SStreamMeta* pMeta) {
void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
taosHashClear(pStartInfo->pReadyTaskSet);
pStartInfo->startedAfterNodeUpdate = 0;
pStartInfo->startAllTasksFlag = 0;
pStartInfo->readyTs = 0;
}

View File

@@ -169,7 +169,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
return TSDB_CODE_SUCCESS;
}
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue);
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputq.queue);
if (qItem == NULL) {
if ((taskLevel == TASK_LEVEL__SOURCE || taskLevel == TASK_LEVEL__SINK) && (++retryTimes) < MAX_RETRY_TIMES) {
taosMsleep(WAIT_FOR_DURATION);
@@ -211,7 +211,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
}
streamQueueProcessFail(pTask->inputInfo.queue);
streamQueueProcessFail(pTask->inputq.queue);
return TSDB_CODE_SUCCESS;
}
} else {
@@ -232,7 +232,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
}
streamQueueProcessFail(pTask->inputInfo.queue);
streamQueueProcessFail(pTask->inputq.queue);
return TSDB_CODE_SUCCESS;
}
@@ -240,7 +240,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
}
*numOfBlocks += 1;
streamQueueProcessSuccess(pTask->inputInfo.queue);
streamQueueProcessSuccess(pTask->inputq.queue);
if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
@@ -258,12 +258,12 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) {
int8_t type = pItem->type;
STaosQueue* pQueue = pTask->inputInfo.queue->pQueue;
int32_t total = streamQueueGetNumOfItems(pTask->inputInfo.queue) + 1;
STaosQueue* pQueue = pTask->inputq.queue->pQueue;
int32_t total = streamQueueGetNumOfItems(pTask->inputq.queue) + 1;
if (type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputInfo.queue)) {
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputq.queue)) {
double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
stTrace(
"s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
@@ -290,7 +290,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
msgLen, ver, total, size + SIZE_IN_MiB(msgLen));
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) {
if (streamQueueIsFull(pTask->inputInfo.queue)) {
if (streamQueueIsFull(pTask->inputq.queue)) {
double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",

View File

@@ -35,7 +35,7 @@ static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
static void tryLaunchHistoryTask(void* param, void* tmrId);
static int32_t updateTaskReadyInMeta(SStreamTask* pTask);
static void doProcessDownstreamReadyRsp(SStreamTask* pTask);
int32_t streamTaskSetReady(SStreamTask* pTask) {
char* p = NULL;
@@ -57,7 +57,7 @@ int32_t streamTaskSetReady(SStreamTask* pTask) {
stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s",
pTask->id.idStr, numOfDowns, el, p);
updateTaskReadyInMeta(pTask);
streamMetaUpdateTaskReadyInfo(pTask);
return TSDB_CODE_SUCCESS;
}
@@ -114,7 +114,7 @@ int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
}
// check status
static int32_t doCheckDownstreamStatus(SStreamTask* pTask) {
void streamTaskCheckDownstream(SStreamTask* pTask) {
SDataRange* pRange = &pTask->dataRange;
STimeWindow* pWindow = &pRange->window;
@@ -163,10 +163,8 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) {
}
} else {
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
streamTaskOnHandleEventSuccess(pTask->status.pSM);
doProcessDownstreamReadyRsp(pTask);
}
return 0;
}
static STaskRecheckInfo* createRecheckInfo(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
@@ -271,16 +269,14 @@ int32_t onNormalTaskReady(SStreamTask* pTask) {
ETaskStatus status = streamTaskGetStatus(pTask, &p);
ASSERT(status == TASK_STATUS__READY);
// todo refactor: remove this later
// if (pTask->info.fillHistory == 1) {
// stDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id);
// pTask->status.taskStatus = TASK_STATUS__DROPPING;
// ASSERT(pTask->hTaskInfo.id.taskId == 0);
// }
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
int64_t startVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
if (startVer == -1) {
startVer = pTask->chkInfo.nextProcessVer;
}
stDebug("s-task:%s no need to scan-history data, status:%s, sched-status:%d, ready for data from wal ver:%" PRId64,
id, p, pTask->status.schedStatus, walReaderGetCurrentVer(pTask->exec.pWalReader));
id, p, pTask->status.schedStatus, startVer);
} else {
stDebug("s-task:%s level:%d status:%s sched-status:%d", id, pTask->info.taskLevel, p, pTask->status.schedStatus);
}
@@ -311,41 +307,21 @@ int32_t onScanhistoryTaskReady(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
// todo: refactor this function.
static void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
streamTaskOnHandleEventSuccess(pTask->status.pSM);
#if 0
const char* id = pTask->id.idStr;
int8_t status = pTask->status.taskStatus;
const char* str = streamGetTaskStatusStr(status);
ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__READY);
streamTaskSetRangeStreamCalc(pTask);
if (status == TASK_STATUS__SCAN_HISTORY) {
stDebug("s-task:%s enter into scan-history data stage, status:%s", id, str);
streamTaskStartScanHistory(pTask);
// start the related fill-history task, when current task is ready
streamLaunchFillHistoryTask(pTask);
void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
EStreamTaskEvent event;
if (pTask->info.fillHistory == 0) {
event = HAS_RELATED_FILLHISTORY_TASK(pTask)? TASK_EVENT_INIT_STREAM_SCANHIST:TASK_EVENT_INIT;
} else {
// fill-history tasks are not allowed to reach here.
if (pTask->info.fillHistory == 1) {
stDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id);
pTask->status.taskStatus = TASK_STATUS__DROPPING;
ASSERT(pTask->hTaskInfo.id.taskId == 0);
} else {
stDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str);
streamTaskEnablePause(pTask);
}
event = TASK_EVENT_INIT_SCANHIST;
}
#endif
streamTaskOnHandleEventSuccess(pTask->status.pSM, event);
}
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
if (streamTaskShouldStop(pTask)) {
stDebug("s-task:%s should stop, do not do check downstream again", id);
@@ -354,8 +330,8 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
if (pRsp->status == TASK_DOWNSTREAM_READY) {
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
bool found = false;
bool found = false;
int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds);
for (int32_t i = 0; i < numOfReqs; i++) {
int64_t reqId = *(int64_t*)taosArrayGet(pTask->checkReqIds, i);
@@ -402,6 +378,26 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
"s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
"downstream again, nodeUpdate needed",
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
taosThreadMutexLock(&pTask->lock);
int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList);
bool existed = false;
for (int i = 0; i < num; ++i) {
SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, i);
if (p->nodeId == pRsp->downstreamNodeId) {
existed = true;
break;
}
}
if (!existed) {
SDownstreamTaskEpset t = {.nodeId = pRsp->downstreamNodeId};
taosArrayPush(pTask->outputInfo.pDownstreamUpdateList, &t);
stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", id, vgId,
t.nodeId, (int32_t)taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList));
}
taosThreadMutexUnlock(&pTask->lock);
return 0;
}
@@ -584,7 +580,7 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
// execute in the scan history complete call back msg, ready to process data from inputQ
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
streamTaskSetSchedStatusInactive(pTask);
taosWLockLatch(&pMeta->lock);
@@ -937,17 +933,6 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
}
}
// only the downstream tasks are ready, set the task to be ready to work.
void streamTaskCheckDownstream(SStreamTask* pTask) {
// if (pTask->info.fillHistory) {
// ASSERT(0);
// stDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
// return;
// }
doCheckDownstreamStatus(pTask);
}
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
#if 0
int8_t status = pTask->status.taskStatus;
@@ -1062,7 +1047,7 @@ void streamTaskEnablePause(SStreamTask* pTask) {
pTask->status.pauseAllowed = 1;
}
int32_t updateTaskReadyInMeta(SStreamTask* pTask) {
int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
taosWLockLatch(&pMeta->lock);
@@ -1074,13 +1059,9 @@ int32_t updateTaskReadyInMeta(SStreamTask* pTask) {
if (taosHashGetSize(pMeta->startInfo.pReadyTaskSet) == numOfTotal) {
STaskStartInfo* pStartInfo = &pMeta->startInfo;
pStartInfo->readyTs = pTask->execInfo.start;
if (pStartInfo->startTs != 0) {
pStartInfo->elapsedTime = pStartInfo->readyTs - pStartInfo->startTs;
} else {
pStartInfo->elapsedTime = 0;
}
pStartInfo->readyTs = pTask->execInfo.start;
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
streamMetaResetStartInfo(pStartInfo);

View File

@@ -59,7 +59,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory
pTask->id.idStr = taosStrdup(buf);
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->status.taskStatus = (fillHistory || hasFillhistory) ? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__READY;
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
if (fillHistory) {
@@ -337,8 +337,8 @@ void tFreeStreamTask(SStreamTask* pTask) {
}
int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
if (pTask->inputInfo.queue) {
streamQueueClose(pTask->inputInfo.queue, pTask->id.taskId);
if (pTask->inputq.queue) {
streamQueueClose(pTask->inputq.queue, pTask->id.taskId);
}
if (pTask->outputq.queue) {
@@ -399,8 +399,11 @@ void tFreeStreamTask(SStreamTask* pTask) {
pTask->msgInfo.pRetryList = taosArrayDestroy(pTask->msgInfo.pRetryList);
taosMemoryFree(pTask->outputInfo.pTokenBucket);
taosThreadMutexDestroy(&pTask->lock);
taosMemoryFree(pTask);
taosArrayDestroy(pTask->outputInfo.pDownstreamUpdateList);
pTask->outputInfo.pDownstreamUpdateList = NULL;
taosMemoryFree(pTask);
stDebug("s-task:0x%x free task completed", taskId);
}
@@ -409,10 +412,10 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pTask->refCnt = 1;
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->status.timerActive = 0;
pTask->inputInfo.queue = streamQueueOpen(512 << 10);
pTask->inputq.queue = streamQueueOpen(512 << 10);
pTask->outputq.queue = streamQueueOpen(512 << 10);
if (pTask->inputInfo.queue == NULL || pTask->outputq.queue == NULL) {
if (pTask->inputq.queue == NULL || pTask->outputq.queue == NULL) {
stError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
return TSDB_CODE_OUT_OF_MEMORY;
}
@@ -425,7 +428,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
}
pTask->execInfo.created = taosGetTimestampMs();
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMeta = pMeta;
@@ -462,6 +465,11 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
taosThreadMutexInit(&pTask->lock, &attr);
streamTaskOpenAllUpstreamInput(pTask);
pTask->outputInfo.pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
if (pTask->outputInfo.pDownstreamUpdateList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}

View File

@@ -105,21 +105,33 @@ int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
// todo: check the rsp code when handling the event TASK_EVENT_SCANHIST_DONE
static bool isUnsupportedTransform(ETaskStatus state, const EStreamTaskEvent event) {
if (state == TASK_STATUS__STOP || state == TASK_STATUS__DROPPING || state == TASK_STATUS__UNINIT) {
if (event == TASK_EVENT_SCANHIST_DONE || event == TASK_EVENT_CHECKPOINT_DONE || event == TASK_EVENT_GEN_CHECKPOINT) {
return true;
}
}
return false;
}
// todo optimize the perf of find the trans objs by using hash table
static STaskStateTrans* streamTaskFindTransform(const SStreamTaskSM* pState, const EStreamTaskEvent event) {
static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStreamTaskEvent event) {
int32_t numOfTrans = taosArrayGetSize(streamTaskSMTrans);
for (int32_t i = 0; i < numOfTrans; ++i) {
STaskStateTrans* pTrans = taosArrayGet(streamTaskSMTrans, i);
if (pTrans->state.state == pState->current.state && pTrans->event == event) {
if (pTrans->state.state == state && pTrans->event == event) {
return pTrans;
}
}
if (event == TASK_EVENT_CHECKPOINT_DONE && pState->current.state == TASK_STATUS__STOP) {
if (isUnsupportedTransform(state, event)) {
return NULL;
} else {
ASSERT(0);
}
return NULL;
}
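The todo above asks for a hash table instead of the linear scan. One possible shape, keyed on (state, event) and built once after doInitStateTransferTable has filled streamTaskSMTrans; a sketch only, in which the key struct and both helper names are invented here:

typedef struct {
  int32_t state;  // ETaskStatus
  int32_t event;  // EStreamTaskEvent
} STransKey;

static SHashObj* pTransMap = NULL;

static void buildTransMap() {
  pTransMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  int32_t numOfTrans = taosArrayGetSize(streamTaskSMTrans);
  for (int32_t i = 0; i < numOfTrans; ++i) {
    STaskStateTrans* pTrans = taosArrayGet(streamTaskSMTrans, i);
    STransKey key = {.state = pTrans->state.state, .event = pTrans->event};
    taosHashPut(pTransMap, &key, sizeof(key), &pTrans, POINTER_BYTES);  // store the pointer itself
  }
}

static STaskStateTrans* findTransformFast(ETaskStatus state, const EStreamTaskEvent event) {
  STransKey key = {.state = state, .event = event};
  STaskStateTrans** pp = taosHashGet(pTransMap, &key, sizeof(key));
  return (pp != NULL) ? *pp : NULL;  // NULL keeps the isUnsupportedTransform path reachable
}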
@@ -137,9 +149,9 @@ void streamTaskRestoreStatus(SStreamTask* pTask) {
pSM->prev.evt = 0;
pSM->startTs = taosGetTimestampMs();
stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name);
taosThreadMutexUnlock(&pTask->lock);
stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name);
}
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) {
@@ -181,19 +193,9 @@ void* streamDestroyStateMachine(SStreamTaskSM* pSM) {
return NULL;
}
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskStateTrans* pTrans) {
SStreamTask* pTask = pSM->pTask;
taosThreadMutexLock(&pTask->lock);
STaskStateTrans* pTrans = streamTaskFindTransform(pSM, event);
if (pTrans == NULL) {
stWarn("s-task:%s status:%s not allowed handle event:%s", pTask->id.idStr, pSM->current.name, StreamTaskEventList[event].name);
return -1;
} else {
stDebug("s-task:%s start to handle event:%s, state:%s", pTask->id.idStr, StreamTaskEventList[event].name,
pSM->current.name);
}
const char* id = pTask->id.idStr;
if (pTrans->attachEvent.event != 0) {
attachEvent(pTask, &pTrans->attachEvent);
@@ -206,22 +208,18 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
taosThreadMutexUnlock(&pTask->lock);
if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) {
stDebug("s-task:%s attached event:%s handled", pTask->id.idStr, StreamTaskEventList[pTrans->event].name);
stDebug("s-task:%s attached event:%s handled", id, StreamTaskEventList[pTrans->event].name);
return TSDB_CODE_SUCCESS;
} else {// this event has been handled already
stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", pTask->id.idStr,
StreamTaskEventList[event].name);
} else if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__STOP) { // this event has been handled already
stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", id, StreamTaskEventList[event].name);
taosMsleep(100);
} else {
stDebug("s-task:%s is dropped or stopped already, not wait.", id);
return TSDB_CODE_INVALID_PARA;
}
}
} else {
if (pSM->pActiveTrans != NULL) {
ASSERT(!pSM->pActiveTrans->autoInvokeEndFn);
stWarn("s-task:%s status:%s handle event:%s is interrupted, handle the new event:%s", pTask->id.idStr,
pSM->current.name, StreamTaskEventList[pSM->pActiveTrans->event].name, StreamTaskEventList[event].name);
}
} else { // override current active trans
pSM->pActiveTrans = pTrans;
pSM->startTs = taosGetTimestampMs();
taosThreadMutexUnlock(&pTask->lock);
@@ -230,7 +228,41 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
// todo handle error code;
if (pTrans->autoInvokeEndFn) {
streamTaskOnHandleEventSuccess(pSM);
streamTaskOnHandleEventSuccess(pSM, event);
}
}
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
SStreamTask* pTask = pSM->pTask;
STaskStateTrans* pTrans = NULL;
while (1) {
taosThreadMutexLock(&pTask->lock);
if (pSM->pActiveTrans != NULL && pSM->pActiveTrans->autoInvokeEndFn) {
taosThreadMutexUnlock(&pTask->lock);
taosMsleep(100);
stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed",
pTask->id.idStr, pSM->current.name, StreamTaskEventList[pSM->pActiveTrans->event].name);
} else {
pTrans = streamTaskFindTransform(pSM->current.state, event);
if (pTrans == NULL) {
stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, StreamTaskEventList[event].name);
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_INVALID_PARA; // todo: set a new error code for failing to handle the event
}
if (pSM->pActiveTrans != NULL) {
// currently in some state transfer procedure, not auto invoke transfer, abort it
stDebug("s-task:%s event:%s handle procedure quit, status %s -> %s failed, handle event %s now",
pTask->id.idStr, StreamTaskEventList[pSM->pActiveTrans->event].name, pSM->current.name,
pSM->pActiveTrans->next.name, StreamTaskEventList[event].name);
}
doHandleEvent(pSM, event, pTrans);
break;
}
}
@@ -244,7 +276,7 @@ static void keepPrevInfo(SStreamTaskSM* pSM) {
pSM->prev.evt = pTrans->event;
}
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) {
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event) {
SStreamTask* pTask = pSM->pTask;
// do update the task status
@@ -255,13 +287,20 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) {
ETaskStatus s = pSM->current.state;
ASSERT(s == TASK_STATUS__DROPPING || s == TASK_STATUS__PAUSE || s == TASK_STATUS__STOP);
// the pSM->prev.evt may be 0, so print string is not appropriate.
stDebug("status not handled success, current status:%s, trigger event:%d, %s", pSM->current.name, pSM->prev.evt,
pTask->id.idStr);
stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", pTask->id.idStr,
StreamTaskEventList[event].name, pSM->current.name, StreamTaskEventList[pSM->prev.evt].name);
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_INVALID_PARA;
}
if (pTrans->event != event) {
stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", pTask->id.idStr,
StreamTaskEventList[event].name, pSM->current.name, StreamTaskEventList[pTrans->event].name);
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_INVALID_PARA;
}
keepPrevInfo(pSM);
pSM->current = pTrans->next;
@@ -282,7 +321,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) {
stDebug("s-task:%s handle the attached event:%s, state:%s", pTask->id.idStr,
StreamTaskEventList[pEvtInfo->event].name, pSM->current.name);
STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM, pEvtInfo->event);
STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM->current.state, pEvtInfo->event);
ASSERT(pSM->pActiveTrans == NULL && pNextTrans != NULL);
pSM->pActiveTrans = pNextTrans;
@@ -291,7 +330,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) {
int32_t code = pNextTrans->pAction(pSM->pTask);
if (pNextTrans->autoInvokeEndFn) {
return streamTaskOnHandleEventSuccess(pSM);
return streamTaskOnHandleEventSuccess(pSM, pNextTrans->event);
} else {
return code;
}
@@ -323,6 +362,9 @@ void streamTaskResetStatus(SStreamTask* pTask) {
SStreamTaskSM* pSM = pTask->status.pSM;
taosThreadMutexLock(&pTask->lock);
stDebug("s-task:%s level:%d fill-history:%d vgId:%d set uninit, prev status:%s", pTask->id.idStr,
pTask->info.taskLevel, pTask->info.fillHistory, pTask->pMeta->vgId, pSM->current.name);
pSM->current = StreamTaskStatusList[TASK_STATUS__UNINIT];
pSM->pActiveTrans = NULL;
taosArrayClear(pSM->pWaitingEventList);
@@ -394,12 +436,12 @@ void doInitStateTransferTable(void) {
taosArrayPush(streamTaskSMTrans, &trans);
// scan-history related event
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE,
streamTaskSetReadyForWal, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL,
NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE,
streamTaskSetReadyForWal, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL,
NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
// halt stream task, from other task status
@@ -442,9 +484,9 @@ void doInitStateTransferTable(void) {
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);

View File

@@ -17,6 +17,7 @@ sql create table ts3 using st tags(3,2,2);
sql create table ts4 using st tags(4,2,2);
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt1 as select _wstart, count(*) c1, sum(a) c3 from st interval(10s);
sleep 1000
sql pause stream streams1;
sql insert into ts1 values(1648791213001,1,12,3,1.0);
@@ -246,6 +247,7 @@ sql create table ts4 using st tags(4,2,2);
sql create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt3 as select _wstart, count(*) c1, sum(a) c3 from st interval(10s);
sql create stream streams4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt4 as select _wstart, count(*) c1, sum(a) c3 from st interval(10s);
sql create stream streams5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt5 as select _wstart, count(*) c1, sum(a) c3 from ts1 interval(10s);
sleep 1000
sql pause stream streams3;

View File

@@ -582,6 +582,7 @@ sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams23 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt23 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(20s) sliding(10s);
sleep 1000
sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791223001,2,2,2,1.1);

View File

@@ -35,6 +35,8 @@ class TDTestCase:
self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.stb_source_select_str} from {self.ctb_name} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_history_value=fill_history_value)
self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.tb_source_select_str} from {self.tb_name} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_history_value=fill_history_value)
init_num = 0
time.sleep(1)
for i in range(self.tdCom.range_count):
if i == 0:
window_close_ts = self.tdCom.cal_watermark_window_close_session_endts(self.tdCom.date_time, self.tdCom.dataDict['watermark'], self.tdCom.dataDict['session'])