fix(stream): add lock log.

This commit is contained in:
Haojun Liao 2023-11-01 13:19:53 +08:00
parent 0fb4cfd94f
commit 5ff89bc098
4 changed files with 14 additions and 65 deletions

View File

@ -1149,12 +1149,10 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// 1. get the related stream task // 1. get the related stream task
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) { if (pStreamTask == NULL) {
// todo delete this task, if the related stream task is dropped tqError("failed to find s-task:0x%"PRIx64", it may have been destroyed, drop related fill-history task:%s",
qError("failed to find s-task:0x%"PRIx64", it may have been destroyed, drop fill-history task:%s",
pTask->streamTaskId.taskId, pTask->id.idStr); pTask->streamTaskId.taskId, pTask->id.idStr);
tqDebug("s-task:%s fill-history task set status to be dropping", id); tqDebug("s-task:%s fill-history task set status to be dropping", id);
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
atomic_store_32(&pTask->status.inScanHistorySentinel, 0); atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
@ -1163,68 +1161,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
} }
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
#if 0
// 2. it cannot be paused, when the stream task in TASK_STATUS__SCAN_HISTORY status. Let's wait for the
// stream task get ready for scan history data
while (streamTaskGetStatus(pStreamTask, NULL) == TASK_STATUS__SCAN_HISTORY) {
tqDebug(
"s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms",
id, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus));
taosMsleep(100);
}
// now we can stop the stream task execution
int64_t nextProcessedVer = 0;
while (1) {
taosThreadMutexLock(&pStreamTask->lock);
int8_t status = pStreamTask->status.taskStatus;
if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
// return; do nothing
}
if (status == TASK_STATUS__HALT) {
// tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr,
// pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id);
// latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
//
// taosThreadMutexUnlock(&pStreamTask->lock);
// break;
}
if (pStreamTask->status.taskStatus == TASK_STATUS__CK) {
qDebug("s-task:%s status:%s during generating checkpoint, wait for 1sec and retry set status:halt",
pStreamTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__CK));
taosThreadMutexUnlock(&pStreamTask->lock);
taosMsleep(1000);
continue;
}
// upgrade to halt status
if (status == TASK_STATUS__PAUSE) {
qDebug("s-task:%s upgrade status to %s from %s", pStreamTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT),
streamGetTaskStatusStr(TASK_STATUS__PAUSE));
} else {
qDebug("s-task:%s halt task, prev status:%s", pStreamTask->id.idStr, streamGetTaskStatusStr(status));
}
pStreamTask->status.keepTaskStatus = status;
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
// wal scan not start yet, reset it to be the start position
nextProcessedVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
if (nextProcessedVer == -1) {
nextProcessedVer = pStreamTask->dataRange.range.maxVer + 1;
}
tqDebug("s-task:%s level:%d nextProcessedVer:%" PRId64 ", sched-status:%d is halt by fill-history task:%s",
pStreamTask->id.idStr, pStreamTask->info.taskLevel, nextProcessedVer, pStreamTask->status.schedStatus,
id);
taosThreadMutexUnlock(&pStreamTask->lock);
break;
}
#endif
streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer; int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer;

View File

@ -446,6 +446,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX; int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX;
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
tqDebug("s-task:%s lock", pTask->id.idStr);
char* p = NULL; char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p); ETaskStatus status = streamTaskGetStatus(pTask, &p);

View File

@ -1004,6 +1004,8 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo,
info.msg.info = *pRpcInfo; info.msg.info = *pRpcInfo;
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
stDebug("s-task:%s lock", pTask->id.idStr);
if (pTask->pRspMsgList == NULL) { if (pTask->pRspMsgList == NULL) {
pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo)); pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo));
} }

View File

@ -201,6 +201,7 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
if (pTrans->attachEvent.event != 0) { if (pTrans->attachEvent.event != 0) {
attachEvent(pTask, &pTrans->attachEvent); attachEvent(pTask, &pTrans->attachEvent);
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
stDebug("s-task:%s unlock1", pTask->id.idStr);
while (1) { while (1) {
// wait for the task to be here // wait for the task to be here
@ -224,6 +225,7 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
pSM->pActiveTrans = pTrans; pSM->pActiveTrans = pTrans;
pSM->startTs = taosGetTimestampMs(); pSM->startTs = taosGetTimestampMs();
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
stDebug("s-task:%s unlock2", pTask->id.idStr);
int32_t code = pTrans->pAction(pTask); int32_t code = pTrans->pAction(pTask);
// todo handle error code; // todo handle error code;
@ -242,6 +244,8 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
while (1) { while (1) {
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
stDebug("s-task:%s lock", pTask->id.idStr);
if (pSM->pActiveTrans != NULL && pSM->pActiveTrans->autoInvokeEndFn) { if (pSM->pActiveTrans != NULL && pSM->pActiveTrans->autoInvokeEndFn) {
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
taosMsleep(100); taosMsleep(100);
@ -282,6 +286,8 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
// do update the task status // do update the task status
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
stDebug("s-task:%s lock", pTask->id.idStr);
STaskStateTrans* pTrans = pSM->pActiveTrans; STaskStateTrans* pTrans = pSM->pActiveTrans;
if (pTrans == NULL) { if (pTrans == NULL) {
@ -292,6 +298,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
StreamTaskEventList[event].name, pSM->current.name, StreamTaskEventList[pSM->prev.evt].name); StreamTaskEventList[event].name, pSM->current.name, StreamTaskEventList[pSM->prev.evt].name);
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
stDebug("s-task:%s unlockx", pTask->id.idStr);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
@ -299,6 +306,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", pTask->id.idStr, stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", pTask->id.idStr,
StreamTaskEventList[event].name, pSM->current.name, StreamTaskEventList[pTrans->event].name); StreamTaskEventList[event].name, pSM->current.name, StreamTaskEventList[pTrans->event].name);
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
stDebug("s-task:%s unlocky", pTask->id.idStr);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
@ -328,6 +336,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
pSM->pActiveTrans = pNextTrans; pSM->pActiveTrans = pNextTrans;
pSM->startTs = taosGetTimestampMs(); pSM->startTs = taosGetTimestampMs();
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
stDebug("s-task:%s unlockf", pTask->id.idStr);
int32_t code = pNextTrans->pAction(pSM->pTask); int32_t code = pNextTrans->pAction(pSM->pTask);
if (pNextTrans->autoInvokeEndFn) { if (pNextTrans->autoInvokeEndFn) {
@ -338,6 +347,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
} }
} else { } else {
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
stDebug("s-task:%s unlockz", pTask->id.idStr);
int64_t el = (taosGetTimestampMs() - pSM->startTs); int64_t el = (taosGetTimestampMs() - pSM->startTs);
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr, stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,