fix(stream): do check when task is paused, and do some internal refactor.
This commit is contained in:
parent
34048937a4
commit
1adf59e7d8
|
@ -832,7 +832,7 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer)
|
|||
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
|
||||
|
||||
// common
|
||||
void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||
void streamTaskPause(SStreamTask* pTask);
|
||||
void streamTaskResume(SStreamTask* pTask);
|
||||
int32_t streamTaskStop(SStreamTask* pTask);
|
||||
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask);
|
||||
|
|
|
@ -989,7 +989,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) {
|
|||
}
|
||||
|
||||
tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
|
||||
streamTaskPause(pMeta, pTask);
|
||||
streamTaskPause(pTask);
|
||||
|
||||
SStreamTask* pHistoryTask = NULL;
|
||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
|
@ -1006,7 +1006,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) {
|
|||
|
||||
tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
|
||||
|
||||
streamTaskPause(pMeta, pHistoryTask);
|
||||
streamTaskPause(pHistoryTask);
|
||||
streamMetaReleaseTask(pMeta, pHistoryTask);
|
||||
}
|
||||
|
||||
|
|
|
@ -1610,11 +1610,6 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttB
|
|||
doUnpinSttBlock(pSttBlockReader);
|
||||
if (hasVal) {
|
||||
SRowKey* pNext = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pSttKey->pks[0].type)) {
|
||||
tsdbInfo("current pk:%s, next pk:%s", pSttKey->pks[0].pData, pNext->pks[0].pData);
|
||||
}
|
||||
|
||||
if (pkCompEx(pSttKey, pNext) != 0) {
|
||||
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
|
||||
*copied = (code == TSDB_CODE_SUCCESS);
|
||||
|
|
|
@ -2441,7 +2441,7 @@ static void prepareBuf(SqlFunctionCtx* pCtx) {
|
|||
int32_t rowLen = 0;
|
||||
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
|
||||
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
||||
rowLen += pc->pExpr->base.resSchema.bytes;
|
||||
rowLen += pc->resDataInfo.interBufSize;
|
||||
}
|
||||
|
||||
pCtx->subsidiaries.rowLen = rowLen + pCtx->subsidiaries.num * sizeof(bool);
|
||||
|
|
|
@ -912,7 +912,7 @@ static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||
void streamTaskPause(SStreamTask* pTask) {
|
||||
streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL);
|
||||
}
|
||||
|
||||
|
@ -983,19 +983,27 @@ static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInf
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId) {
|
||||
for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) {
|
||||
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j);
|
||||
if (p->taskId == taskId) {
|
||||
return p;
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id) {
|
||||
SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .reqId = reqId, .rspTs = 0};
|
||||
|
||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||
|
||||
for(int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
|
||||
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
|
||||
if (p->taskId == taskId) {
|
||||
stDebug("s-task:%s check info to task:0x%x already sent", id, taskId);
|
||||
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
||||
if (p != NULL) {
|
||||
stDebug("s-task:%s check info to task:0x%x already sent", id, taskId);
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
taosArrayPush(pInfo->pList, &info);
|
||||
|
@ -1008,30 +1016,29 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t
|
|||
int32_t* pNotReady, const char* id) {
|
||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||
|
||||
for(int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
|
||||
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
|
||||
if (p->taskId == taskId) {
|
||||
|
||||
if (reqId != p->reqId) {
|
||||
stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64
|
||||
" expired check-rsp recv from downstream task:0x%x, discarded",
|
||||
id, reqId, p->reqId, taskId);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
// subtract one not-ready-task, since it is ready now
|
||||
if ((p->status != TASK_DOWNSTREAM_READY) && (status == TASK_DOWNSTREAM_READY)) {
|
||||
*pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1);
|
||||
} else {
|
||||
*pNotReady = pInfo->notReadyTasks;
|
||||
}
|
||||
|
||||
p->status = status;
|
||||
p->rspTs = rspTs;
|
||||
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
||||
if (p != NULL) {
|
||||
|
||||
if (reqId != p->reqId) {
|
||||
stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64
|
||||
" expired check-rsp recv from downstream task:0x%x, discarded",
|
||||
id, reqId, p->reqId, taskId);
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
// subtract one not-ready-task, since it is ready now
|
||||
if ((p->status != TASK_DOWNSTREAM_READY) && (status == TASK_DOWNSTREAM_READY)) {
|
||||
*pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1);
|
||||
} else {
|
||||
*pNotReady = pInfo->notReadyTasks;
|
||||
}
|
||||
|
||||
p->status = status;
|
||||
p->rspTs = rspTs;
|
||||
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
|
@ -1111,6 +1118,31 @@ static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
|
|||
}
|
||||
}
|
||||
|
||||
static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
|
||||
int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
|
||||
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
|
||||
if (p->status == TASK_DOWNSTREAM_READY) {
|
||||
(*numOfReady) += 1;
|
||||
} else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) {
|
||||
stDebug("s-task:%s recv status:NEW_STAGE/NOT_LEADER from downstream, task:0x%x, quit from check downstream", id,
|
||||
p->taskId);
|
||||
(*numOfFault) += 1;
|
||||
} else { // TASK_DOWNSTREAM_NOT_READY
|
||||
if (p->rspTs == 0) { // not response yet
|
||||
ASSERT(p->status == -1);
|
||||
if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec.
|
||||
taosArrayPush(pTimeoutList, &p->taskId);
|
||||
} else { // el < CHECK_NOT_RSP_DURATION
|
||||
(*numOfNotRsp) += 1; // do nothing and continue waiting for their rsp
|
||||
}
|
||||
} else {
|
||||
taosArrayPush(pNotReadyList, &p->taskId);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void rspMonitorFn(void* param, void* tmrId) {
|
||||
SStreamTask* pTask = param;
|
||||
SStreamTaskState* pStat = streamTaskGetStatus(pTask);
|
||||
|
@ -1140,14 +1172,13 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
|||
return;
|
||||
}
|
||||
|
||||
if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) {
|
||||
if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY || state == TASK_STATUS__PAUSE) {
|
||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
|
||||
|
||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||
streamTaskCompleteCheckRsp(pInfo, id);
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1166,27 +1197,7 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
|||
SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t));
|
||||
|
||||
if (pStat->state == TASK_STATUS__UNINIT) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
|
||||
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
|
||||
if (p->status == TASK_DOWNSTREAM_READY) {
|
||||
numOfReady += 1;
|
||||
} else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) {
|
||||
stDebug("s-task:%s recv status:NEW_STAGE/NOT_LEADER from downstream, task:0x%x, quit from check downstream", id,
|
||||
p->taskId);
|
||||
numOfFault += 1;
|
||||
} else { // TASK_DOWNSTREAM_NOT_READY
|
||||
if (p->rspTs == 0) { // not response yet
|
||||
ASSERT(p->status == -1);
|
||||
if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec.
|
||||
taosArrayPush(pTimeoutList, &p->taskId);
|
||||
} else { // el < CHECK_NOT_RSP_DURATION
|
||||
numOfNotRsp += 1; // do nothing and continue waiting for their rsp
|
||||
}
|
||||
} else {
|
||||
taosArrayPush(pNotReadyList, &p->taskId);
|
||||
}
|
||||
}
|
||||
}
|
||||
getCheckRspStatus(pInfo, el, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id);
|
||||
} else { // unexpected status
|
||||
stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name);
|
||||
}
|
||||
|
@ -1203,11 +1214,11 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
|||
"detected, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
|
||||
id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
|
||||
|
||||
streamTaskCompleteCheckRsp(pInfo, id);
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
|
||||
taosArrayDestroy(pNotReadyList);
|
||||
taosArrayDestroy(pTimeoutList);
|
||||
|
||||
streamTaskCompleteCheckRsp(pInfo, id);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1228,6 +1239,9 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
|||
STaskId* pHId = &pTask->hTaskInfo.id;
|
||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pNotReadyList);
|
||||
taosArrayDestroy(pTimeoutList);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1238,13 +1252,11 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
|||
for (int32_t i = 0; i < numOfNotReady; ++i) {
|
||||
int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i);
|
||||
|
||||
for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) {
|
||||
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j);
|
||||
if (p->taskId == taskId) {
|
||||
p->rspTs = 0;
|
||||
p->status = -1;
|
||||
doSendCheckMsg(pTask, p);
|
||||
}
|
||||
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
||||
if (p != NULL) {
|
||||
p->rspTs = 0;
|
||||
p->status = -1;
|
||||
doSendCheckMsg(pTask, p);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1258,13 +1270,10 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
|||
for (int32_t i = 0; i < numOfTimeout; ++i) {
|
||||
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
|
||||
|
||||
for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) {
|
||||
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j);
|
||||
if (p->taskId == taskId) {
|
||||
ASSERT(p->status == -1 && p->rspTs == 0);
|
||||
doSendCheckMsg(pTask, p);
|
||||
break;
|
||||
}
|
||||
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
||||
if (p != NULL) {
|
||||
ASSERT(p->status == -1 && p->rspTs == 0);
|
||||
doSendCheckMsg(pTask, p);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue