fix(stream): fix deadlock in pause.

This commit is contained in:
Haojun Liao 2024-01-15 14:37:24 +08:00
parent e012bc4bde
commit 143e6a13af
5 changed files with 14 additions and 4 deletions

View File

@ -837,6 +837,7 @@ int32_t streamTaskReloadState(SStreamTask* pTask);
void streamTaskCloseAllUpstreamInput(SStreamTask* pTask, int32_t taskId); void streamTaskCloseAllUpstreamInput(SStreamTask* pTask, int32_t taskId);
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
bool streamTaskIsSinkTask(const SStreamTask* pTask);
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);

View File

@ -813,8 +813,8 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
if (pTask != NULL) { // even in halt status, the data in inputQ must be processed if (pTask != NULL) { // even in halt status, the data in inputQ must be processed
char* p = NULL; char* p = NULL;
if (streamTaskReadyToRun(pTask, &p)) { if (streamTaskReadyToRun(pTask, &p)) {
tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr, tqDebug("vgId:%d s-task:%s status:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.nextProcessVer); p, pTask->chkInfo.nextProcessVer);
streamExecTask(pTask); streamExecTask(pTask);
} else { } else {
int8_t status = streamTaskSetSchedStatusInactive(pTask); int8_t status = streamTaskSetSchedStatusInactive(pTask);

View File

@ -713,7 +713,13 @@ bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) {
*pStatus = pState->name; *pStatus = pState->name;
} }
return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK); // pause & halt will still run for sink tasks.
if (streamTaskIsSinkTask(pTask)) {
return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK ||
st == TASK_STATUS__PAUSE || st == TASK_STATUS__HALT);
} else {
return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK);
}
} }
static void doStreamExecTaskHelper(void* param, void* tmrId) { static void doStreamExecTaskHelper(void* param, void* tmrId) {

View File

@ -154,7 +154,6 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
*numOfBlocks = 0; *numOfBlocks = 0;
*blockSize = 0; *blockSize = 0;
// todo remove it
// no available token in bucket for sink task, let's wait for a little bit // no available token in bucket for sink task, let's wait for a little bit
if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) { if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) {
stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id); stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id);

View File

@ -848,3 +848,7 @@ void streamTaskResume(SStreamTask* pTask) {
stDebug("s-task:%s status:%s not in pause/halt status, no need to resume", pTask->id.idStr, prevState.name); stDebug("s-task:%s status:%s not in pause/halt status, no need to resume", pTask->id.idStr, prevState.name);
} }
} }
bool streamTaskIsSinkTask(const SStreamTask* pTask) {
return pTask->info.taskLevel == TASK_LEVEL__SINK;
}