Merge pull request #27350 from taosdata/fix/3_liaohj
refactor: remove assert in stream module.
This commit is contained in:
commit
9d433926cb
|
@ -965,6 +965,7 @@ int32_t taosGetErrSize();
|
||||||
#define TSDB_CODE_STREAM_INVALID_STATETRANS TAOS_DEF_ERROR_CODE(0, 0x4103)
|
#define TSDB_CODE_STREAM_INVALID_STATETRANS TAOS_DEF_ERROR_CODE(0, 0x4103)
|
||||||
#define TSDB_CODE_STREAM_TASK_IVLD_STATUS TAOS_DEF_ERROR_CODE(0, 0x4104)
|
#define TSDB_CODE_STREAM_TASK_IVLD_STATUS TAOS_DEF_ERROR_CODE(0, 0x4104)
|
||||||
#define TSDB_CODE_STREAM_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x4105)
|
#define TSDB_CODE_STREAM_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x4105)
|
||||||
|
#define TSDB_CODE_STREAM_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x4106)
|
||||||
|
|
||||||
// TDLite
|
// TDLite
|
||||||
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100)
|
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100)
|
||||||
|
|
|
@ -901,7 +901,10 @@ int32_t tLDataIterNextRow(SLDataIter *pIter, const char *idStr, bool* hasNext) {
|
||||||
pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow);
|
pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
if (code) {
|
||||||
tsdbError("failed to exec stt-file nextIter, lino:%d, code:%s, %s", lino, tstrerror(code), idStr);
|
tsdbError("failed to exec stt-file nextIter, lino:%d, code:%s, %s", lino, tstrerror(code), idStr);
|
||||||
|
}
|
||||||
|
|
||||||
*hasNext = (code == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL);
|
*hasNext = (code == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -97,8 +97,6 @@ void streamTaskSendCheckMsg(SStreamTask* pTask) {
|
||||||
.stage = pTask->pMeta->stage,
|
.stage = pTask->pMeta->stage,
|
||||||
};
|
};
|
||||||
|
|
||||||
ASSERT(pTask->status.downstreamReady == 0);
|
|
||||||
|
|
||||||
// serialize streamProcessScanHistoryFinishRsp
|
// serialize streamProcessScanHistoryFinishRsp
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
streamTaskStartMonitorCheckRsp(pTask);
|
streamTaskStartMonitorCheckRsp(pTask);
|
||||||
|
@ -187,8 +185,6 @@ void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SS
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
|
int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
|
||||||
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
|
|
||||||
|
|
||||||
int64_t now = taosGetTimestampMs();
|
int64_t now = taosGetTimestampMs();
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
|
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
|
||||||
|
@ -200,6 +196,11 @@ int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pTask->id.taskId != pRsp->upstreamTaskId) {
|
||||||
|
stError("s-task:%s invalid check downstream rsp, upstream task:0x%x discard", id, pRsp->upstreamTaskId);
|
||||||
|
return TSDB_CODE_INVALID_MSG;
|
||||||
|
}
|
||||||
|
|
||||||
if (pRsp->status == TASK_DOWNSTREAM_READY) {
|
if (pRsp->status == TASK_DOWNSTREAM_READY) {
|
||||||
int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
|
int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -235,7 +236,6 @@ int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
|
||||||
|
|
||||||
streamMetaAddFailedTaskSelf(pTask, now);
|
streamMetaAddFailedTaskSelf(pTask, now);
|
||||||
} else { // TASK_DOWNSTREAM_NOT_READY, rsp-check monitor will retry in 300 ms
|
} else { // TASK_DOWNSTREAM_NOT_READY, rsp-check monitor will retry in 300 ms
|
||||||
ASSERT(left > 0);
|
|
||||||
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
|
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
|
||||||
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
|
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
|
||||||
}
|
}
|
||||||
|
@ -315,8 +315,6 @@ void streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) {
|
void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) {
|
||||||
ASSERT(pInfo->inCheckProcess == 0);
|
|
||||||
|
|
||||||
taosArrayDestroy(pInfo->pList);
|
taosArrayDestroy(pInfo->pList);
|
||||||
pInfo->pList = NULL;
|
pInfo->pList = NULL;
|
||||||
|
|
||||||
|
@ -338,7 +336,10 @@ void processDownstreamReadyRsp(SStreamTask* pTask) {
|
||||||
(void) streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, checkTs, readyTs, true);
|
(void) streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, checkTs, readyTs, true);
|
||||||
|
|
||||||
if (pTask->status.taskStatus == TASK_STATUS__HALT) {
|
if (pTask->status.taskStatus == TASK_STATUS__HALT) {
|
||||||
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0));
|
if (!HAS_RELATED_FILLHISTORY_TASK(pTask) || (pTask->info.fillHistory != 0)) {
|
||||||
|
stError("s-task:%s status:halt fillhistory:%d not handle the ready rsp", pTask->id.idStr,
|
||||||
|
pTask->info.fillHistory);
|
||||||
|
}
|
||||||
|
|
||||||
// halt it self for count window stream task until the related fill history task completed.
|
// halt it self for count window stream task until the related fill history task completed.
|
||||||
stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr,
|
stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr,
|
||||||
|
@ -396,7 +397,6 @@ void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutput
|
||||||
pInfo->notReadyTasks = 1;
|
pInfo->notReadyTasks = 1;
|
||||||
} else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
} else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
pInfo->notReadyTasks = taosArrayGetSize(pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos);
|
pInfo->notReadyTasks = taosArrayGetSize(pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos);
|
||||||
ASSERT(pInfo->notReadyTasks == pOutputInfo->shuffleDispatcher.dbInfo.vgNum);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->startTs = startTs;
|
pInfo->startTs = startTs;
|
||||||
|
@ -461,7 +461,6 @@ int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
|
||||||
if (pInfo->inCheckProcess == 0) {
|
if (pInfo->inCheckProcess == 0) {
|
||||||
pInfo->inCheckProcess = 1;
|
pInfo->inCheckProcess = 1;
|
||||||
} else {
|
} else {
|
||||||
ASSERT(pInfo->startTs > 0);
|
|
||||||
stError("s-task:%s already in check procedure, checkTs:%" PRId64 ", start monitor check rsp failed", id,
|
stError("s-task:%s already in check procedure, checkTs:%" PRId64 ", start monitor check rsp failed", id,
|
||||||
pInfo->startTs);
|
pInfo->startTs);
|
||||||
pInfo->stopCheckProcess = 0; // disable auto stop of check process
|
pInfo->stopCheckProcess = 0; // disable auto stop of check process
|
||||||
|
@ -564,8 +563,6 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -585,7 +582,6 @@ void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, i
|
||||||
(*numOfFault) += 1;
|
(*numOfFault) += 1;
|
||||||
} else { // TASK_DOWNSTREAM_NOT_READY
|
} else { // TASK_DOWNSTREAM_NOT_READY
|
||||||
if (p->rspTs == 0) { // not response yet
|
if (p->rspTs == 0) { // not response yet
|
||||||
ASSERT(p->status == -1);
|
|
||||||
if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec.
|
if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec.
|
||||||
(void) taosArrayPush(pTimeoutList, &p->taskId);
|
(void) taosArrayPush(pTimeoutList, &p->taskId);
|
||||||
} else { // el < CHECK_NOT_RSP_DURATION
|
} else { // el < CHECK_NOT_RSP_DURATION
|
||||||
|
@ -610,9 +606,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
|
||||||
int32_t vgId = pTask->pMeta->vgId;
|
int32_t vgId = pTask->pMeta->vgId;
|
||||||
int32_t numOfTimeout = taosArrayGetSize(pTimeoutList);
|
int32_t numOfTimeout = taosArrayGetSize(pTimeoutList);
|
||||||
|
|
||||||
ASSERT(pTask->status.downstreamReady == 0);
|
|
||||||
pInfo->timeoutStartTs = taosGetTimestampMs();
|
pInfo->timeoutStartTs = taosGetTimestampMs();
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTimeout; ++i) {
|
for (int32_t i = 0; i < numOfTimeout; ++i) {
|
||||||
int32_t* px = taosArrayGet(pTimeoutList, i);
|
int32_t* px = taosArrayGet(pTimeoutList, i);
|
||||||
if (px == NULL) {
|
if (px == NULL) {
|
||||||
|
@ -623,7 +617,13 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
|
||||||
SDownstreamStatusInfo* p = NULL;
|
SDownstreamStatusInfo* p = NULL;
|
||||||
findCheckRspStatus(pInfo, taskId, &p);
|
findCheckRspStatus(pInfo, taskId, &p);
|
||||||
if (p != NULL) {
|
if (p != NULL) {
|
||||||
ASSERT(p->status == -1 && p->rspTs == 0);
|
|
||||||
|
if (p->status != -1 || p->rspTs != 0) {
|
||||||
|
stError("s-task:%s invalid rsp record entry, index:%d, status:%d, rspTs:%"PRId64, pTask->id.idStr, i, p->status,
|
||||||
|
p->rspTs);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
doSendCheckMsg(pTask, p);
|
doSendCheckMsg(pTask, p);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -662,8 +662,6 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
|
||||||
int32_t vgId = pTask->pMeta->vgId;
|
int32_t vgId = pTask->pMeta->vgId;
|
||||||
int32_t numOfNotReady = taosArrayGetSize(pNotReadyList);
|
int32_t numOfNotReady = taosArrayGetSize(pNotReadyList);
|
||||||
|
|
||||||
ASSERT(pTask->status.downstreamReady == 0);
|
|
||||||
|
|
||||||
// reset the info, and send the check msg to failure downstream again
|
// reset the info, and send the check msg to failure downstream again
|
||||||
for (int32_t i = 0; i < numOfNotReady; ++i) {
|
for (int32_t i = 0; i < numOfNotReady; ++i) {
|
||||||
int32_t* pTaskId = taosArrayGet(pNotReadyList, i);
|
int32_t* pTaskId = taosArrayGet(pNotReadyList, i);
|
||||||
|
@ -696,12 +694,10 @@ int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64
|
||||||
void rspMonitorFn(void* param, void* tmrId) {
|
void rspMonitorFn(void* param, void* tmrId) {
|
||||||
SStreamTask* pTask = param;
|
SStreamTask* pTask = param;
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
SStreamTaskState pStat = streamTaskGetStatus(pTask);
|
|
||||||
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
|
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
|
||||||
int32_t vgId = pTask->pMeta->vgId;
|
int32_t vgId = pTask->pMeta->vgId;
|
||||||
int64_t now = taosGetTimestampMs();
|
int64_t now = taosGetTimestampMs();
|
||||||
int64_t timeoutDuration = now - pInfo->timeoutStartTs;
|
int64_t timeoutDuration = now - pInfo->timeoutStartTs;
|
||||||
ETaskStatus state = pStat.state;
|
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
int32_t numOfReady = 0;
|
int32_t numOfReady = 0;
|
||||||
int32_t numOfFault = 0;
|
int32_t numOfFault = 0;
|
||||||
|
@ -712,9 +708,13 @@ void rspMonitorFn(void* param, void* tmrId) {
|
||||||
|
|
||||||
stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id);
|
stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id);
|
||||||
|
|
||||||
if (state == TASK_STATUS__STOP) {
|
streamMutexLock(&pTask->lock);
|
||||||
|
SStreamTaskState state = streamTaskGetStatus(pTask);
|
||||||
|
streamMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
|
if (state.state == TASK_STATUS__STOP) {
|
||||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat.name, vgId, ref);
|
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, state.name, vgId, ref);
|
||||||
|
|
||||||
streamTaskCompleteCheckRsp(pInfo, true, id);
|
streamTaskCompleteCheckRsp(pInfo, true, id);
|
||||||
|
|
||||||
|
@ -728,9 +728,9 @@ void rspMonitorFn(void* param, void* tmrId) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) {
|
if (state.state == TASK_STATUS__DROPPING || state.state == TASK_STATUS__READY) {
|
||||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat.name, vgId, ref);
|
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, state.name, vgId, ref);
|
||||||
|
|
||||||
streamTaskCompleteCheckRsp(pInfo, true, id);
|
streamTaskCompleteCheckRsp(pInfo, true, id);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
@ -740,7 +740,7 @@ void rspMonitorFn(void* param, void* tmrId) {
|
||||||
streamMutexLock(&pInfo->checkInfoLock);
|
streamMutexLock(&pInfo->checkInfoLock);
|
||||||
if (pInfo->notReadyTasks == 0) {
|
if (pInfo->notReadyTasks == 0) {
|
||||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", id, pStat.name,
|
stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", id, state.name,
|
||||||
vgId, ref);
|
vgId, ref);
|
||||||
|
|
||||||
streamTaskCompleteCheckRsp(pInfo, false, id);
|
streamTaskCompleteCheckRsp(pInfo, false, id);
|
||||||
|
@ -752,21 +752,34 @@ void rspMonitorFn(void* param, void* tmrId) {
|
||||||
SArray* pNotReadyList = taosArrayInit(4, sizeof(int64_t));
|
SArray* pNotReadyList = taosArrayInit(4, sizeof(int64_t));
|
||||||
SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t));
|
SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t));
|
||||||
|
|
||||||
if (pStat.state == TASK_STATUS__UNINIT) {
|
if (state.state == TASK_STATUS__UNINIT) {
|
||||||
getCheckRspStatus(pInfo, timeoutDuration, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id);
|
getCheckRspStatus(pInfo, timeoutDuration, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id);
|
||||||
|
|
||||||
numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList);
|
numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList);
|
||||||
numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList);
|
numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList);
|
||||||
|
|
||||||
// fault tasks detected, not try anymore
|
// fault tasks detected, not try anymore
|
||||||
ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == total);
|
bool jumpOut = false;
|
||||||
|
if ((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) != total) {
|
||||||
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
|
|
||||||
|
stError(
|
||||||
|
"s-task:%s vgId:%d internal error in handling the check downstream procedure, rsp number is inconsistent, "
|
||||||
|
"stop rspMonitor tmr, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
|
||||||
|
id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
|
||||||
|
jumpOut = true;
|
||||||
|
}
|
||||||
|
|
||||||
if (numOfFault > 0) {
|
if (numOfFault > 0) {
|
||||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
stDebug(
|
stDebug(
|
||||||
"s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
|
"s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
|
||||||
"detected, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
|
"detected, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
|
||||||
id, pStat.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
|
id, state.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
|
||||||
|
jumpOut = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (jumpOut) {
|
||||||
streamTaskCompleteCheckRsp(pInfo, false, id);
|
streamTaskCompleteCheckRsp(pInfo, false, id);
|
||||||
streamMutexUnlock(&pInfo->checkInfoLock);
|
streamMutexUnlock(&pInfo->checkInfoLock);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
@ -776,7 +789,7 @@ void rspMonitorFn(void* param, void* tmrId) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else { // unexpected status
|
} else { // unexpected status
|
||||||
stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat.name);
|
stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, state.name);
|
||||||
}
|
}
|
||||||
|
|
||||||
// checking of downstream tasks has been stopped by other threads
|
// checking of downstream tasks has been stopped by other threads
|
||||||
|
@ -785,7 +798,7 @@ void rspMonitorFn(void* param, void* tmrId) {
|
||||||
stDebug(
|
stDebug(
|
||||||
"s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, total:%d, notRsp:%d, "
|
"s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, total:%d, notRsp:%d, "
|
||||||
"notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
|
"notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
|
||||||
id, pStat.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
|
id, state.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
|
||||||
|
|
||||||
streamTaskCompleteCheckRsp(pInfo, false, id);
|
streamTaskCompleteCheckRsp(pInfo, false, id);
|
||||||
streamMutexUnlock(&pInfo->checkInfoLock);
|
streamMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
|
|
@ -40,7 +40,10 @@ int32_t createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int6
|
||||||
pChkpoint->type = checkpointType;
|
pChkpoint->type = checkpointType;
|
||||||
if (checkpointType == STREAM_INPUT__CHECKPOINT_TRIGGER && (pTask->info.taskLevel != TASK_LEVEL__SOURCE)) {
|
if (checkpointType == STREAM_INPUT__CHECKPOINT_TRIGGER && (pTask->info.taskLevel != TASK_LEVEL__SOURCE)) {
|
||||||
pChkpoint->srcTaskId = srcTaskId;
|
pChkpoint->srcTaskId = srcTaskId;
|
||||||
ASSERT(srcTaskId != 0);
|
if (srcTaskId <= 0) {
|
||||||
|
stDebug("s-task:%s invalid src task id:%d for creating checkpoint trigger block", pTask->id.idStr, srcTaskId);
|
||||||
|
return TSDB_CODE_INVALID_PARA;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||||
|
@ -189,10 +192,13 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId
|
||||||
int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
|
int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
|
||||||
pBlock->srcTaskId = pTask->id.taskId;
|
pBlock->srcTaskId = pTask->id.taskId;
|
||||||
pBlock->srcVgId = pTask->pMeta->vgId;
|
pBlock->srcVgId = pTask->pMeta->vgId;
|
||||||
|
if (pTask->chkInfo.pActiveInfo->dispatchTrigger == true) {
|
||||||
|
stError("s-task:%s already dispatch checkpoint-trigger, not dispatch again", pTask->id.idStr);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
|
int32_t code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
ASSERT(pTask->chkInfo.pActiveInfo->dispatchTrigger == false);
|
|
||||||
code = streamDispatchStreamBlock(pTask);
|
code = streamDispatchStreamBlock(pTask);
|
||||||
} else {
|
} else {
|
||||||
stError("s-task:%s failed to put checkpoint into outputQ, code:%s", pTask->id.idStr, tstrerror(code));
|
stError("s-task:%s failed to put checkpoint into outputQ, code:%s", pTask->id.idStr, tstrerror(code));
|
||||||
|
@ -593,9 +599,16 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer &&
|
bool valid = (pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer &&
|
||||||
pInfo->processedVer <= pReq->checkpointVer);
|
pInfo->processedVer <= pReq->checkpointVer);
|
||||||
|
|
||||||
|
if (!valid) {
|
||||||
|
stFatal("invalid checkpoint id check, current checkpointId:%" PRId64 " checkpointVer:%" PRId64
|
||||||
|
" processedVer:%" PRId64 " req checkpointId:%" PRId64 " checkpointVer:%" PRId64,
|
||||||
|
pInfo->checkpointId, pInfo->checkpointVer, pInfo->processedVer, pReq->checkpointId, pReq->checkpointVer);
|
||||||
|
return TSDB_CODE_STREAM_INTERNAL_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
// update only it is in checkpoint status, or during restore procedure.
|
// update only it is in checkpoint status, or during restore procedure.
|
||||||
if (pStatus.state == TASK_STATUS__CK || (!restored)) {
|
if (pStatus.state == TASK_STATUS__CK || (!restored)) {
|
||||||
pInfo->checkpointId = pReq->checkpointId;
|
pInfo->checkpointId = pReq->checkpointId;
|
||||||
|
@ -1102,7 +1115,10 @@ void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
|
||||||
streamMutexLock(&pInfo->lock);
|
streamMutexLock(&pInfo->lock);
|
||||||
|
|
||||||
// outputQ should be empty here
|
// outputQ should be empty here
|
||||||
ASSERT(streamQueueGetNumOfUnAccessedItems(pTask->outputq.queue) == 0);
|
if (streamQueueGetNumOfUnAccessedItems(pTask->outputq.queue) > 0) {
|
||||||
|
stFatal("s-task:%s items are still in outputQ, failed to init trigger dispatch info", pTask->id.idStr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
pInfo->dispatchTrigger = true;
|
pInfo->dispatchTrigger = true;
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
|
@ -1375,9 +1391,7 @@ int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) {
|
||||||
pTask->pBackend = NULL;
|
pTask->pBackend = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pTask->pBackend == NULL);
|
|
||||||
pTask->status.requireConsensusChkptId = true;
|
pTask->status.requireConsensusChkptId = true;
|
||||||
|
|
||||||
stDebug("s-task:%s set the require consensus-checkpointId flag", id);
|
stDebug("s-task:%s set the require consensus-checkpointId flag", id);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,7 +33,6 @@ int32_t createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen)));
|
|
||||||
for (int32_t i = 0; i < blockNum; i++) {
|
for (int32_t i = 0; i < blockNum; i++) {
|
||||||
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pReq->data, i);
|
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pReq->data, i);
|
||||||
SSDataBlock* pDataBlock = taosArrayGet(pArray, i);
|
SSDataBlock* pDataBlock = taosArrayGet(pArray, i);
|
||||||
|
@ -52,7 +51,6 @@ int32_t createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t len = tsDecompressString(pInput, compLen, 1, p, fullLen, ONE_STAGE_COMP, NULL, 0);
|
int32_t len = tsDecompressString(pInput, compLen, 1, p, fullLen, ONE_STAGE_COMP, NULL, 0);
|
||||||
ASSERT(len == fullLen);
|
|
||||||
pInput = p;
|
pInput = p;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -254,7 +254,11 @@ static SStreamDispatchReq* createDispatchDataReq(SStreamTask* pTask, const SStre
|
||||||
int32_t type = pTask->outputInfo.type;
|
int32_t type = pTask->outputInfo.type;
|
||||||
int32_t num = streamTaskGetNumOfDownstream(pTask);
|
int32_t num = streamTaskGetNumOfDownstream(pTask);
|
||||||
|
|
||||||
ASSERT(type == TASK_OUTPUT__SHUFFLE_DISPATCH || type == TASK_OUTPUT__FIXED_DISPATCH);
|
if(type != TASK_OUTPUT__SHUFFLE_DISPATCH && type != TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
|
stError("s-task:%s invalid dispatch type:%d not dispatch data", pTask->id.idStr, type);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
SStreamDispatchReq* pReqs = taosMemoryCalloc(num, sizeof(SStreamDispatchReq));
|
SStreamDispatchReq* pReqs = taosMemoryCalloc(num, sizeof(SStreamDispatchReq));
|
||||||
if (pReqs == NULL) {
|
if (pReqs == NULL) {
|
||||||
|
@ -279,7 +283,7 @@ static SStreamDispatchReq* createDispatchDataReq(SStreamTask* pTask, const SStre
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else { // shuffle dispatch
|
||||||
int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
|
int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
|
||||||
int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
|
int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
#include "streamMsg.h"
|
#include "streamMsg.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "tstream.h"
|
#include "tstream.h"
|
||||||
|
#include "streamInt.h"
|
||||||
|
|
||||||
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) {
|
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) {
|
||||||
if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1;
|
if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1;
|
||||||
|
@ -229,8 +230,12 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p
|
||||||
if (tEncodeI32(pEncoder, pReq->upstreamRelTaskId) < 0) return -1;
|
if (tEncodeI32(pEncoder, pReq->upstreamRelTaskId) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;
|
if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1;
|
if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1;
|
||||||
ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
|
|
||||||
ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum);
|
if (taosArrayGetSize(pReq->data) != pReq->blockNum || taosArrayGetSize(pReq->dataLen) != pReq->blockNum) {
|
||||||
|
stError("invalid dispatch req msg");
|
||||||
|
return TSDB_CODE_INVALID_MSG;
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < pReq->blockNum; i++) {
|
for (int32_t i = 0; i < pReq->blockNum; i++) {
|
||||||
int32_t* pLen = taosArrayGet(pReq->dataLen, i);
|
int32_t* pLen = taosArrayGet(pReq->dataLen, i);
|
||||||
void* data = taosArrayGetP(pReq->data, i);
|
void* data = taosArrayGetP(pReq->data, i);
|
||||||
|
@ -261,7 +266,6 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
|
||||||
if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1;
|
||||||
|
|
||||||
ASSERT(pReq->blockNum > 0);
|
|
||||||
pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*));
|
pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*));
|
||||||
pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t));
|
pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t));
|
||||||
for (int32_t i = 0; i < pReq->blockNum; i++) {
|
for (int32_t i = 0; i < pReq->blockNum; i++) {
|
||||||
|
@ -270,7 +274,10 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
|
||||||
void* data;
|
void* data;
|
||||||
if (tDecodeI32(pDecoder, &len1) < 0) return -1;
|
if (tDecodeI32(pDecoder, &len1) < 0) return -1;
|
||||||
if (tDecodeBinaryAlloc(pDecoder, &data, &len2) < 0) return -1;
|
if (tDecodeBinaryAlloc(pDecoder, &data, &len2) < 0) return -1;
|
||||||
ASSERT(len1 == len2);
|
|
||||||
|
if (len1 != len2) {
|
||||||
|
return TSDB_CODE_INVALID_MSG;
|
||||||
|
}
|
||||||
|
|
||||||
void* p = taosArrayPush(pReq->dataLen, &len1);
|
void* p = taosArrayPush(pReq->dataLen, &len1);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
|
|
|
@ -452,7 +452,9 @@ static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
|
||||||
int64_t now = taosGetTimestampMs();
|
int64_t now = taosGetTimestampMs();
|
||||||
|
|
||||||
int64_t deltaToken = now - pBucket->tokenFillTimestamp;
|
int64_t deltaToken = now - pBucket->tokenFillTimestamp;
|
||||||
ASSERT(pBucket->numOfToken >= 0);
|
if (pBucket->numOfToken < 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t incNum = (deltaToken / 1000.0) * pBucket->numRate;
|
int32_t incNum = (deltaToken / 1000.0) * pBucket->numRate;
|
||||||
if (incNum > 0) {
|
if (incNum > 0) {
|
||||||
|
|
|
@ -22,9 +22,8 @@ static void streamTaskSchedHelper(void* param, void* tmrId);
|
||||||
int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
|
int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
|
||||||
if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
|
if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
|
||||||
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
|
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
|
||||||
ASSERT(ref == 2 && pTask->schedInfo.pDelayTimer == NULL);
|
stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", pTask->id.idStr, ref,
|
||||||
|
pTask->info.delaySchedParam);
|
||||||
stDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.delaySchedParam);
|
|
||||||
|
|
||||||
pTask->schedInfo.pDelayTimer =
|
pTask->schedInfo.pDelayTimer =
|
||||||
taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.delaySchedParam, pTask, streamTimer);
|
taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.delaySchedParam, pTask, streamTimer);
|
||||||
|
|
|
@ -441,7 +441,9 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta
|
||||||
|
|
||||||
SBackendSnapFile2 snapFile = {0};
|
SBackendSnapFile2 snapFile = {0};
|
||||||
code = streamBackendSnapInitFile(path, pSnap, &snapFile);
|
code = streamBackendSnapInitFile(path, pSnap, &snapFile);
|
||||||
ASSERT(code == 0);
|
if (code) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
void* p = taosArrayPush(pDbSnapSet, &snapFile);
|
void* p = taosArrayPush(pDbSnapSet, &snapFile);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
|
@ -767,7 +769,10 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
|
||||||
if (!taosIsDir(path)) {
|
if (!taosIsDir(path)) {
|
||||||
code = taosMulMkDir(path);
|
code = taosMulMkDir(path);
|
||||||
stInfo("%s mkdir %s", STREAM_STATE_TRANSFER, path);
|
stInfo("%s mkdir %s", STREAM_STATE_TRANSFER, path);
|
||||||
ASSERT(code == 0);
|
if (code) {
|
||||||
|
stError("s-task:0x%x failed to mkdir:%s", (int32_t) snapInfo.taskId, path);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pDbSnapFile->path = path;
|
pDbSnapFile->path = path;
|
||||||
|
|
|
@ -53,10 +53,9 @@ static int32_t streamTaskSetReady(SStreamTask* pTask) {
|
||||||
pTask->id.idStr, pTask->info.taskLevel, numOfUps, p.name);
|
pTask->id.idStr, pTask->info.taskLevel, numOfUps, p.name);
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pTask->status.downstreamReady == 0);
|
|
||||||
pTask->status.downstreamReady = 1;
|
pTask->status.downstreamReady = 1;
|
||||||
|
|
||||||
pTask->execInfo.readyTs = taosGetTimestampMs();
|
pTask->execInfo.readyTs = taosGetTimestampMs();
|
||||||
|
|
||||||
int64_t el = (pTask->execInfo.readyTs - pTask->execInfo.checkTs);
|
int64_t el = (pTask->execInfo.readyTs - pTask->execInfo.checkTs);
|
||||||
stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s",
|
stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s",
|
||||||
pTask->id.idStr, numOfDowns, el, p.name);
|
pTask->id.idStr, numOfDowns, el, p.name);
|
||||||
|
@ -212,8 +211,6 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
|
||||||
int64_t now = taosGetTimestampMs();
|
int64_t now = taosGetTimestampMs();
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
ASSERT(hTaskId != 0);
|
|
||||||
|
|
||||||
// check stream task status in the first place.
|
// check stream task status in the first place.
|
||||||
SStreamTaskState pStatus = streamTaskGetStatus(pTask);
|
SStreamTaskState pStatus = streamTaskGetStatus(pTask);
|
||||||
if (pStatus.state != TASK_STATUS__READY && pStatus.state != TASK_STATUS__HALT &&
|
if (pStatus.state != TASK_STATUS__READY && pStatus.state != TASK_STATUS__HALT &&
|
||||||
|
|
|
@ -50,8 +50,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
||||||
SArray* pTaskList = NULL;
|
SArray* pTaskList = NULL;
|
||||||
code = prepareBeforeStartTasks(pMeta, &pTaskList, now);
|
code = prepareBeforeStartTasks(pMeta, &pTaskList, now);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
ASSERT(pTaskList == NULL);
|
return TSDB_CODE_SUCCESS; // ignore the error and return directly
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// broadcast the check downstream tasks msg only for tasks with related fill-history tasks.
|
// broadcast the check downstream tasks msg only for tasks with related fill-history tasks.
|
||||||
|
@ -364,7 +363,10 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
|
||||||
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
|
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pTask->status.downstreamReady == 0);
|
if(pTask->status.downstreamReady != 0) {
|
||||||
|
stFatal("s-task:0x%x downstream should be not ready, but it ready here, internal error happens", taskId);
|
||||||
|
return TSDB_CODE_STREAM_INTERNAL_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
// avoid initialization and destroy running concurrently.
|
// avoid initialization and destroy running concurrently.
|
||||||
streamMutexLock(&pTask->lock);
|
streamMutexLock(&pTask->lock);
|
||||||
|
|
|
@ -168,9 +168,7 @@ static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStream
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isInvalidStateTransfer(state, event)) {
|
if (isInvalidStateTransfer(state, event)) {
|
||||||
return NULL;
|
stError("invalid state transfer %d, handle event:%s", state, GET_EVT_NAME(event));
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -182,8 +180,6 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName,
|
||||||
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
|
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
|
||||||
pEventName, el, pSM->prev.state.name, pSM->current.name);
|
pEventName, el, pSM->prev.state.name, pSM->current.name);
|
||||||
|
|
||||||
ASSERT(taosArrayGetSize(pSM->pWaitingEventList) == 1);
|
|
||||||
|
|
||||||
SFutureHandleEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0);
|
SFutureHandleEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0);
|
||||||
if (pEvtInfo == NULL) {
|
if (pEvtInfo == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
|
@ -501,8 +497,11 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
|
||||||
STaskStateTrans* pTrans = pSM->pActiveTrans;
|
STaskStateTrans* pTrans = pSM->pActiveTrans;
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
ETaskStatus s = pSM->current.state;
|
ETaskStatus s = pSM->current.state;
|
||||||
ASSERT(s == TASK_STATUS__DROPPING || s == TASK_STATUS__PAUSE || s == TASK_STATUS__STOP ||
|
|
||||||
s == TASK_STATUS__UNINIT || s == TASK_STATUS__READY);
|
if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__PAUSE && s != TASK_STATUS__STOP &&
|
||||||
|
s != TASK_STATUS__UNINIT && s != TASK_STATUS__READY) {
|
||||||
|
stError("s-task:%s invalid task status:%s on handling event:%s success", id, pSM->current.name, GET_EVT_NAME(pSM->prev.evt));
|
||||||
|
}
|
||||||
|
|
||||||
// the pSM->prev.evt may be 0, so print string is not appropriate.
|
// the pSM->prev.evt may be 0, so print string is not appropriate.
|
||||||
stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", id, GET_EVT_NAME(event),
|
stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", id, GET_EVT_NAME(event),
|
||||||
|
|
|
@ -57,6 +57,9 @@ int32_t streamCleanBeforeQuitTmr(SStreamTmrInfo* pInfo, SStreamTask* pTask) {
|
||||||
atomic_store_8(&pInfo->isActive, 0);
|
atomic_store_8(&pInfo->isActive, 0);
|
||||||
|
|
||||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
ASSERT(ref >= 0);
|
if (ref < 0) {
|
||||||
|
stFatal("invalid task timer ref value:%d, %s", ref, pTask->id.idStr);
|
||||||
|
}
|
||||||
|
|
||||||
return ref;
|
return ref;
|
||||||
}
|
}
|
|
@ -809,6 +809,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exi
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_EXEC_CANCELLED, "Stream task exec cancelled")
|
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_EXEC_CANCELLED, "Stream task exec cancelled")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVALID_STATETRANS, "Invalid task state to handle event")
|
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVALID_STATETRANS, "Invalid task state to handle event")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS, "Invalid task status to proceed")
|
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS, "Invalid task status to proceed")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INTERNAL_ERROR, "Stream internal error")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_NOT_LEADER, "Stream task not on leader vnode")
|
||||||
|
|
||||||
// TDLite
|
// TDLite
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")
|
||||||
|
|
Loading…
Reference in New Issue