fix(stream): fix bugs caused by refactor.

This commit is contained in:
Haojun Liao 2023-10-19 17:59:04 +08:00
parent 1eac7f44c0
commit 8729bdac1e
6 changed files with 50 additions and 31 deletions

View File

@ -569,6 +569,10 @@ static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStr
}
static void setSinkTaskUpstreamInfo(SArray* pTasksList, const SStreamTask* pUpstreamTask) {
if (taosArrayGetSize(pTasksList) < SINK_NODE_LEVEL || pUpstreamTask == NULL) {
return;
}
SArray* pSinkTaskList = taosArrayGetP(pTasksList, SINK_NODE_LEVEL);
for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) {
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i);
@ -628,7 +632,9 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
}
setSinkTaskUpstreamInfo(pStream->tasks, pAggTask);
setSinkTaskUpstreamInfo(pStream->pHTasksList, pHAggTask);
if (pHAggTask != NULL) {
setSinkTaskUpstreamInfo(pStream->pHTasksList, pHAggTask);
}
// source level
return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, pEpset, nextWindowSkey);

View File

@ -1225,7 +1225,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (done) {
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
streamTaskPutTranstateIntoInputQ(pTask);
streamTaskRestoreStatus(pTask);
// streamTaskRestoreStatus(pTask);
// if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
// pTask->status.keepTaskStatus = TASK_STATUS__READY;
@ -1259,6 +1259,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
}
#endif
// now the fill-history task starts to scan data from wal files.
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
tqScanWalAsync(pTq, false);
}
@ -1271,8 +1272,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// Not update the fill-history time window until the state transfer is completed if the related fill-history task
// exists.
tqDebug(
"s-task:%s scan-history in stream time window completed, now start to handle data from WAL, start "
"ver:%" PRId64 ", window:%" PRId64 " - %" PRId64,
"s-task:%s scan-history in stream time window completed, now start to handle data from WAL, startVer:%" PRId64
", window:%" PRId64 " - %" PRId64,
id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey);
code = streamTaskScanHistoryDataComplete(pTask);

View File

@ -32,8 +32,8 @@ typedef int32_t (*__state_trans_fn)(SStreamTask*);
typedef int32_t (*__state_trans_succ_fn)(SStreamTask*);
typedef struct SAttachedEventInfo {
ETaskStatus status;
EStreamTaskEvent event;
ETaskStatus status; // required status that this event can be handled
EStreamTaskEvent event; // the delayed handled event
} SAttachedEventInfo;
typedef struct STaskStateTrans {
@ -48,13 +48,15 @@ typedef struct STaskStateTrans {
struct SStreamTaskSM {
SStreamTask* pTask;
// SArray* pTransList; // SArray<STaskStateTrans>
STaskStateTrans* pActiveTrans;
int64_t startTs;
SStreamTaskState current;
SStreamTaskState prev;
struct {
SStreamTaskState state;
EStreamTaskEvent evt;
} prev;
// register the next handled event, if current state is not allowed to handle this event
SArray* eventList;
SArray* pWaitingEventList;
};
typedef struct SStreamEventInfo {

View File

@ -32,10 +32,11 @@ SStreamTaskState StreamTaskStatusList[9] = {
{.state = TASK_STATUS__STREAM_SCAN_HISTORY, .name = "stream-scan-history"},
};
SStreamEventInfo StreamTaskEventList[11] = {
SStreamEventInfo StreamTaskEventList[12] = {
{}, // dummy event, place holder
{.event = TASK_EVENT_INIT, .name = "initialize"},
{.event = TASK_EVENT_INIT_SCANHIST, .name = "scan-history-initialize"},
{.event = TASK_EVENT_INIT_SCANHIST, .name = "scan-history-init"},
{.event = TASK_EVENT_INIT_STREAM_SCANHIST, .name = "stream-scan-history-init"},
{.event = TASK_EVENT_SCANHIST_DONE, .name = "scan-history-completed"},
{.event = TASK_EVENT_STOP, .name = "stopping"},
{.event = TASK_EVENT_GEN_CHECKPOINT, .name = "checkpoint"},
@ -66,7 +67,7 @@ static int32_t attachEvent(SStreamTask* pTask, SAttachedEventInfo* pEvtInfo) {
stDebug("s-task:%s status:%s attach event:%s required status:%s, since not allowed to handle it", pTask->id.idStr, p,
StreamTaskEventList[pEvtInfo->event].name, StreamTaskStatusList[pEvtInfo->status].name);
taosArrayPush(pTask->status.pSM->eventList, pEvtInfo);
taosArrayPush(pTask->status.pSM->pWaitingEventList, pEvtInfo);
return 0;
}
@ -126,12 +127,15 @@ void streamTaskRestoreStatus(SStreamTask* pTask) {
ASSERT(pSM->current.state == TASK_STATUS__PAUSE || pSM->current.state == TASK_STATUS__HALT);
SStreamTaskState state = pSM->current;
pSM->current = pSM->prev;
pSM->prev = state;
pSM->current = pSM->prev.state;
pSM->prev.state = state;
pSM->prev.evt = 0;
pSM->startTs = taosGetTimestampMs();
taosThreadMutexUnlock(&pTask->lock);
stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.name, pSM->current.name);
stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name);
}
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) {
@ -147,8 +151,8 @@ SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) {
}
pSM->pTask = pTask;
pSM->eventList = taosArrayInit(4, sizeof(SAttachedEventInfo));
if (pSM->eventList == NULL) {
pSM->pWaitingEventList = taosArrayInit(4, sizeof(SAttachedEventInfo));
if (pSM->pWaitingEventList == NULL) {
taosMemoryFree(pSM);
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -168,7 +172,7 @@ void* streamDestroyStateMachine(SStreamTaskSM* pSM) {
return NULL;
}
taosArrayDestroy(pSM->eventList);
taosArrayDestroy(pSM->pWaitingEventList);
taosMemoryFree(pSM);
return NULL;
}
@ -192,7 +196,8 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
ETaskStatus s = streamTaskGetStatus(pTask, NULL);
taosThreadMutexUnlock(&pTask->lock);
if (s == pTrans->attachEvent.status) {
if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) {
stDebug("s-task:%s attached event:%s handled", pTask->id.idStr, StreamTaskEventList[pTrans->event].name);
return TSDB_CODE_SUCCESS;
} else {// this event has been handled already
stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", pTask->id.idStr,
@ -223,27 +228,32 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
return TSDB_CODE_SUCCESS;
}
static void keepPrevInfo(SStreamTaskSM* pSM) {
STaskStateTrans* pTrans = pSM->pActiveTrans;
pSM->prev.state = pSM->current;
pSM->prev.evt = pTrans->event;
}
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) {
STaskStateTrans* pTrans = pSM->pActiveTrans;
SStreamTask* pTask = pSM->pTask;
// do update the task status
taosThreadMutexLock(&pTask->lock);
SStreamTaskState current = pSM->current;
keepPrevInfo(pSM);
pSM->prev = pSM->current;
pSM->current = pTrans->next;
pSM->pActiveTrans = NULL;
// on success callback, add into lock if necessary, or maybe we should add an option for this?
pTrans->pSuccAction(pTask);
if (taosArrayGetSize(pSM->eventList) > 0) {
if (taosArrayGetSize(pSM->pWaitingEventList) > 0) {
int64_t el = (taosGetTimestampMs() - pSM->startTs);
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
StreamTaskEventList[pTrans->event].name, el, current.name, pSM->current.name);
StreamTaskEventList[pTrans->event].name, el, pSM->prev.state.name, pSM->current.name);
SAttachedEventInfo* pEvtInfo = taosArrayPop(pSM->eventList);
SAttachedEventInfo* pEvtInfo = taosArrayPop(pSM->pWaitingEventList);
// OK, let's handle the attached event, since the task has reached the required status now
if (pSM->current.state == pEvtInfo->status) {
@ -258,7 +268,6 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) {
taosThreadMutexUnlock(&pTask->lock);
int32_t code = pNextTrans->pAction(pSM->pTask);
if (pTrans->autoInvokeEndFn) {
return streamTaskOnHandleEventSuccess(pSM);
} else {
@ -270,7 +279,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) {
int64_t el = (taosGetTimestampMs() - pSM->startTs);
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
StreamTaskEventList[pTrans->event].name, el, current.name, pSM->current.name);
StreamTaskEventList[pTrans->event].name, el, pSM->prev.state.name, pSM->current.name);
}
return TSDB_CODE_SUCCESS;
@ -292,7 +301,7 @@ void streamTaskResetStatus(SStreamTask* pTask) {
SStreamTaskSM* pSM = pTask->status.pSM;
pSM->current = StreamTaskStatusList[TASK_STATUS__UNINIT];
pSM->pActiveTrans = NULL;
taosArrayClear(pSM->eventList);
taosArrayClear(pSM->pWaitingEventList);
}
void streamTaskSetStatusReady(SStreamTask* pTask) {
@ -302,12 +311,13 @@ void streamTaskSetStatusReady(SStreamTask* pTask) {
return;
}
pSM->prev = pSM->current;
pSM->prev.state = pSM->current;
pSM->prev.evt = 0;
pSM->current = StreamTaskStatusList[TASK_STATUS__READY];
pSM->startTs = taosGetTimestampMs();
pSM->pActiveTrans = NULL;
taosArrayClear(pSM->eventList);
taosArrayClear(pSM->pWaitingEventList);
}
STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn,

View File

@ -191,7 +191,7 @@ void* taosArrayGet(const SArray* pArray, size_t index) {
}
if (index >= pArray->size) {
uError("index is out of range, current:%" PRIzu " max:%d", index, pArray->capacity);
uError("index is out of range, current:%" PRIzu " max:%"PRIzu, index, pArray->size);
return NULL;
}

View File

@ -7,7 +7,7 @@ from util.common import *
class TDTestCase:
updatecfgDict = {'vdebugFlag': 143, 'qdebugflag':135, 'tqdebugflag':135, 'udebugflag':135, 'rpcdebugflag':135,
'asynclog': 0}
'asynclog': 0, 'stdebugflag':135}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)