Merge pull request #25449 from taosdata/fix/3_liaohj
fix(stream): check req id to remove expired check rsp.
This commit is contained in:
commit
b30650059b
|
@ -832,7 +832,7 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer)
|
||||||
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
|
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
|
||||||
|
|
||||||
// common
|
// common
|
||||||
void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask);
|
void streamTaskPause(SStreamTask* pTask);
|
||||||
void streamTaskResume(SStreamTask* pTask);
|
void streamTaskResume(SStreamTask* pTask);
|
||||||
int32_t streamTaskStop(SStreamTask* pTask);
|
int32_t streamTaskStop(SStreamTask* pTask);
|
||||||
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask);
|
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask);
|
||||||
|
|
|
@ -989,7 +989,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
|
tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
|
||||||
streamTaskPause(pMeta, pTask);
|
streamTaskPause(pTask);
|
||||||
|
|
||||||
SStreamTask* pHistoryTask = NULL;
|
SStreamTask* pHistoryTask = NULL;
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
|
@ -1006,7 +1006,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) {
|
||||||
|
|
||||||
tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
|
tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
|
||||||
|
|
||||||
streamTaskPause(pMeta, pHistoryTask);
|
streamTaskPause(pHistoryTask);
|
||||||
streamMetaReleaseTask(pMeta, pHistoryTask);
|
streamMetaReleaseTask(pMeta, pHistoryTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1637,8 +1637,8 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
|
||||||
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
|
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
|
||||||
|
|
||||||
SLastCol lastCol = *pLastCol;
|
SLastCol lastCol = *pLastCol;
|
||||||
for (int8_t i = 0; i < lastCol.rowKey.numOfPKs; i++) {
|
for (int8_t j = 0; j < lastCol.rowKey.numOfPKs; j++) {
|
||||||
reallocVarDataVal(&lastCol.rowKey.pks[i]);
|
reallocVarDataVal(&lastCol.rowKey.pks[j]);
|
||||||
}
|
}
|
||||||
reallocVarData(&lastCol.colVal);
|
reallocVarData(&lastCol.colVal);
|
||||||
taosArrayPush(pLastArray, &lastCol);
|
taosArrayPush(pLastArray, &lastCol);
|
||||||
|
@ -1667,8 +1667,8 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
|
||||||
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
|
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
|
||||||
|
|
||||||
SLastCol lastCol = *pLastCol;
|
SLastCol lastCol = *pLastCol;
|
||||||
for (int8_t i = 0; i < lastCol.rowKey.numOfPKs; i++) {
|
for (int8_t j = 0; j < lastCol.rowKey.numOfPKs; j++) {
|
||||||
reallocVarDataVal(&lastCol.rowKey.pks[i]);
|
reallocVarDataVal(&lastCol.rowKey.pks[j]);
|
||||||
}
|
}
|
||||||
reallocVarData(&lastCol.colVal);
|
reallocVarData(&lastCol.colVal);
|
||||||
taosArraySet(pLastArray, idxKey->idx, &lastCol);
|
taosArraySet(pLastArray, idxKey->idx, &lastCol);
|
||||||
|
|
|
@ -1610,11 +1610,6 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttB
|
||||||
doUnpinSttBlock(pSttBlockReader);
|
doUnpinSttBlock(pSttBlockReader);
|
||||||
if (hasVal) {
|
if (hasVal) {
|
||||||
SRowKey* pNext = getCurrentKeyInSttBlock(pSttBlockReader);
|
SRowKey* pNext = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(pSttKey->pks[0].type)) {
|
|
||||||
tsdbInfo("current pk:%s, next pk:%s", pSttKey->pks[0].pData, pNext->pks[0].pData);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pkCompEx(pSttKey, pNext) != 0) {
|
if (pkCompEx(pSttKey, pNext) != 0) {
|
||||||
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
|
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
|
||||||
*copied = (code == TSDB_CODE_SUCCESS);
|
*copied = (code == TSDB_CODE_SUCCESS);
|
||||||
|
|
|
@ -217,6 +217,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
SCacheRowsScanInfo* pInfo = pOperator->info;
|
SCacheRowsScanInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
STableListInfo* pTableList = pInfo->pTableList;
|
STableListInfo* pTableList = pInfo->pTableList;
|
||||||
|
SStoreCacheReader* pReaderFn = &pInfo->readHandle.api.cacheFn;
|
||||||
|
|
||||||
uint64_t suid = tableListGetSuid(pTableList);
|
uint64_t suid = tableListGetSuid(pTableList);
|
||||||
int32_t size = tableListGetSize(pTableList);
|
int32_t size = tableListGetSize(pTableList);
|
||||||
|
@ -237,8 +238,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
blockDataCleanup(pInfo->pBufferedRes);
|
blockDataCleanup(pInfo->pBufferedRes);
|
||||||
taosArrayClear(pInfo->pUidList);
|
taosArrayClear(pInfo->pUidList);
|
||||||
|
|
||||||
int32_t code = pInfo->readHandle.api.cacheFn.retrieveRows(pInfo->pLastrowReader, pInfo->pBufferedRes,
|
int32_t code = pReaderFn->retrieveRows(pInfo->pLastrowReader, pInfo->pBufferedRes, pInfo->pSlotIds,
|
||||||
pInfo->pSlotIds, pInfo->pDstSlotIds, pInfo->pUidList);
|
pInfo->pDstSlotIds, pInfo->pUidList);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
@ -307,10 +308,10 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL == pInfo->pLastrowReader) {
|
if (NULL == pInfo->pLastrowReader) {
|
||||||
code = pInfo->readHandle.api.cacheFn.openReader(
|
code = pReaderFn->openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
|
||||||
pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, taosArrayGetSize(pInfo->matchInfo.pList),
|
taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList, pInfo->pSlotIds, suid,
|
||||||
pInfo->pCidList, pInfo->pSlotIds, suid, &pInfo->pLastrowReader, pTaskInfo->id.str, pInfo->pFuncTypeList,
|
&pInfo->pLastrowReader, pTaskInfo->id.str, pInfo->pFuncTypeList, &pInfo->pkCol,
|
||||||
&pInfo->pkCol, pInfo->numOfPks);
|
pInfo->numOfPks);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pInfo->currentGroupIndex += 1;
|
pInfo->currentGroupIndex += 1;
|
||||||
|
@ -318,12 +319,12 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pInfo->readHandle.api.cacheFn.reuseReader(pInfo->pLastrowReader, pList, num);
|
pReaderFn->reuseReader(pInfo->pLastrowReader, pList, num);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayClear(pInfo->pUidList);
|
taosArrayClear(pInfo->pUidList);
|
||||||
|
|
||||||
code = pInfo->readHandle.api.cacheFn.retrieveRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pDstSlotIds,
|
code = pReaderFn->retrieveRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pDstSlotIds,
|
||||||
pInfo->pUidList);
|
pInfo->pUidList);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
@ -357,7 +358,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->pLastrowReader = pInfo->readHandle.api.cacheFn.closeReader(pInfo->pLastrowReader);
|
pInfo->pLastrowReader = pReaderFn->closeReader(pInfo->pLastrowReader);
|
||||||
setOperatorCompleted(pOperator);
|
setOperatorCompleted(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -195,7 +195,7 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
|
||||||
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr);
|
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr);
|
||||||
|
|
||||||
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
|
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
|
||||||
" window:%" PRId64 "-%" PRId64 " req:0x%" PRIx64,
|
" window:%" PRId64 "-%" PRId64 " reqId:0x%" PRIx64,
|
||||||
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId,
|
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId,
|
||||||
pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);
|
pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);
|
||||||
|
|
||||||
|
@ -218,8 +218,9 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
|
||||||
|
|
||||||
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr);
|
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr);
|
||||||
|
|
||||||
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d",
|
stDebug("s-task:%s (vgId:%d) stage:%" PRId64
|
||||||
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i);
|
" check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, reqId:0x%" PRIx64,
|
||||||
|
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId);
|
||||||
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
||||||
}
|
}
|
||||||
} else { // for sink task, set it ready directly.
|
} else { // for sink task, set it ready directly.
|
||||||
|
@ -395,7 +396,10 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRsp->status == TASK_DOWNSTREAM_READY) {
|
if (pRsp->status == TASK_DOWNSTREAM_READY) {
|
||||||
streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
|
int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
if (left == 0) {
|
if (left == 0) {
|
||||||
doProcessDownstreamReadyRsp(pTask); // all downstream tasks are ready, set the complete check downstream flag
|
doProcessDownstreamReadyRsp(pTask); // all downstream tasks are ready, set the complete check downstream flag
|
||||||
|
@ -405,7 +409,11 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
||||||
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
|
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
|
||||||
}
|
}
|
||||||
} else { // not ready, wait for 100ms and retry
|
} else { // not ready, wait for 100ms and retry
|
||||||
streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
|
int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return TSDB_CODE_SUCCESS; // return success in any cases.
|
||||||
|
}
|
||||||
|
|
||||||
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
|
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
|
||||||
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
|
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
|
||||||
stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64
|
stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64
|
||||||
|
|
|
@ -912,7 +912,7 @@ static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) {
|
void streamTaskPause(SStreamTask* pTask) {
|
||||||
streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL);
|
streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -983,20 +983,28 @@ static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInf
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId) {
|
||||||
|
for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) {
|
||||||
|
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j);
|
||||||
|
if (p->taskId == taskId) {
|
||||||
|
return p;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id) {
|
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id) {
|
||||||
SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .reqId = reqId, .rspTs = 0};
|
SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .reqId = reqId, .rspTs = 0};
|
||||||
|
|
||||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
for(int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
|
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
||||||
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
|
if (p != NULL) {
|
||||||
if (p->taskId == taskId) {
|
|
||||||
stDebug("s-task:%s check info to task:0x%x already sent", id, taskId);
|
stDebug("s-task:%s check info to task:0x%x already sent", id, taskId);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayPush(pInfo->pList, &info);
|
taosArrayPush(pInfo->pList, &info);
|
||||||
|
|
||||||
|
@ -1008,12 +1016,18 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t
|
||||||
int32_t* pNotReady, const char* id) {
|
int32_t* pNotReady, const char* id) {
|
||||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
for(int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
|
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
||||||
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
|
if (p != NULL) {
|
||||||
if (p->taskId == taskId) {
|
|
||||||
ASSERT(reqId == p->reqId);
|
|
||||||
|
|
||||||
// count down one, since it is ready now
|
if (reqId != p->reqId) {
|
||||||
|
stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64
|
||||||
|
" expired check-rsp recv from downstream task:0x%x, discarded",
|
||||||
|
id, reqId, p->reqId, taskId);
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
// subtract one not-ready-task, since it is ready now
|
||||||
if ((p->status != TASK_DOWNSTREAM_READY) && (status == TASK_DOWNSTREAM_READY)) {
|
if ((p->status != TASK_DOWNSTREAM_READY) && (status == TASK_DOWNSTREAM_READY)) {
|
||||||
*pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1);
|
*pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1026,10 +1040,10 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
stError("s-task:%s unexpected check rsp msg, downstream task:0x%x, reqId:%"PRIx64, id, taskId, reqId);
|
stError("s-task:%s unexpected check rsp msg, invalid downstream task:0x%x, reqId:%" PRIx64 " discarded", id, taskId,
|
||||||
|
reqId);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1077,7 +1091,7 @@ static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
|
||||||
req.reqId = p->reqId;
|
req.reqId = p->reqId;
|
||||||
req.downstreamNodeId = pOutputInfo->fixedDispatcher.nodeId;
|
req.downstreamNodeId = pOutputInfo->fixedDispatcher.nodeId;
|
||||||
req.downstreamTaskId = pOutputInfo->fixedDispatcher.taskId;
|
req.downstreamTaskId = pOutputInfo->fixedDispatcher.taskId;
|
||||||
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) req:0x%" PRIx64,
|
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64,
|
||||||
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);
|
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);
|
||||||
|
|
||||||
streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet);
|
streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet);
|
||||||
|
@ -1093,8 +1107,10 @@ static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
|
||||||
req.downstreamNodeId = pVgInfo->vgId;
|
req.downstreamNodeId = pVgInfo->vgId;
|
||||||
req.downstreamTaskId = pVgInfo->taskId;
|
req.downstreamTaskId = pVgInfo->taskId;
|
||||||
|
|
||||||
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d",
|
stDebug("s-task:%s (vgId:%d) stage:%" PRId64
|
||||||
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i);
|
" re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64,
|
||||||
|
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i,
|
||||||
|
p->reqId);
|
||||||
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1104,6 +1120,31 @@ static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
|
||||||
|
int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) {
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
|
||||||
|
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
|
||||||
|
if (p->status == TASK_DOWNSTREAM_READY) {
|
||||||
|
(*numOfReady) += 1;
|
||||||
|
} else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) {
|
||||||
|
stDebug("s-task:%s recv status:NEW_STAGE/NOT_LEADER from downstream, task:0x%x, quit from check downstream", id,
|
||||||
|
p->taskId);
|
||||||
|
(*numOfFault) += 1;
|
||||||
|
} else { // TASK_DOWNSTREAM_NOT_READY
|
||||||
|
if (p->rspTs == 0) { // not response yet
|
||||||
|
ASSERT(p->status == -1);
|
||||||
|
if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec.
|
||||||
|
taosArrayPush(pTimeoutList, &p->taskId);
|
||||||
|
} else { // el < CHECK_NOT_RSP_DURATION
|
||||||
|
(*numOfNotRsp) += 1; // do nothing and continue waiting for their rsp
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
taosArrayPush(pNotReadyList, &p->taskId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void rspMonitorFn(void* param, void* tmrId) {
|
static void rspMonitorFn(void* param, void* tmrId) {
|
||||||
SStreamTask* pTask = param;
|
SStreamTask* pTask = param;
|
||||||
SStreamTaskState* pStat = streamTaskGetStatus(pTask);
|
SStreamTaskState* pStat = streamTaskGetStatus(pTask);
|
||||||
|
@ -1119,7 +1160,7 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
||||||
int32_t numOfNotReady = 0;
|
int32_t numOfNotReady = 0;
|
||||||
int32_t numOfTimeout = 0;
|
int32_t numOfTimeout = 0;
|
||||||
|
|
||||||
stDebug("s-task:%s start to do check downstream rsp check", id);
|
stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id);
|
||||||
|
|
||||||
if (state == TASK_STATUS__STOP) {
|
if (state == TASK_STATUS__STOP) {
|
||||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
|
@ -1130,17 +1171,20 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
|
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
|
||||||
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
|
STaskId* pHId = &pTask->hTaskInfo.id;
|
||||||
|
streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) {
|
if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY || state == TASK_STATUS__PAUSE) {
|
||||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
|
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
|
||||||
|
|
||||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||||
streamTaskCompleteCheckRsp(pInfo, id);
|
streamTaskCompleteCheckRsp(pInfo, id);
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1159,27 +1203,7 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
||||||
SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t));
|
SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t));
|
||||||
|
|
||||||
if (pStat->state == TASK_STATUS__UNINIT) {
|
if (pStat->state == TASK_STATUS__UNINIT) {
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
|
getCheckRspStatus(pInfo, el, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id);
|
||||||
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
|
|
||||||
if (p->status == TASK_DOWNSTREAM_READY) {
|
|
||||||
numOfReady += 1;
|
|
||||||
} else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) {
|
|
||||||
stDebug("s-task:%s recv status:NEW_STAGE/NOT_LEADER from downstream, task:0x%x, quit from check downstream", id,
|
|
||||||
p->taskId);
|
|
||||||
numOfFault += 1;
|
|
||||||
} else { // TASK_DOWNSTREAM_NOT_READY
|
|
||||||
if (p->rspTs == 0) { // not response yet
|
|
||||||
ASSERT(p->status == -1);
|
|
||||||
if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec.
|
|
||||||
taosArrayPush(pTimeoutList, &p->taskId);
|
|
||||||
} else { // el < CHECK_NOT_RSP_DURATION
|
|
||||||
numOfNotRsp += 1; // do nothing and continue waiting for their rsp
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
taosArrayPush(pNotReadyList, &p->taskId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else { // unexpected status
|
} else { // unexpected status
|
||||||
stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name);
|
stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name);
|
||||||
}
|
}
|
||||||
|
@ -1189,18 +1213,18 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
||||||
|
|
||||||
// fault tasks detected, not try anymore
|
// fault tasks detected, not try anymore
|
||||||
ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == taosArrayGetSize(pInfo->pList));
|
ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == taosArrayGetSize(pInfo->pList));
|
||||||
if ((numOfNotRsp == 0) && (numOfFault > 0)) {
|
if (numOfFault > 0) {
|
||||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
stDebug(
|
stDebug(
|
||||||
"s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
|
"s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
|
||||||
"detected, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
|
"detected, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
|
||||||
id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
|
id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
|
||||||
|
|
||||||
|
streamTaskCompleteCheckRsp(pInfo, id);
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
taosArrayDestroy(pNotReadyList);
|
taosArrayDestroy(pNotReadyList);
|
||||||
taosArrayDestroy(pTimeoutList);
|
taosArrayDestroy(pTimeoutList);
|
||||||
|
|
||||||
streamTaskCompleteCheckRsp(pInfo, id);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1221,6 +1245,9 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
||||||
STaskId* pHId = &pTask->hTaskInfo.id;
|
STaskId* pHId = &pTask->hTaskInfo.id;
|
||||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
|
streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pNotReadyList);
|
||||||
|
taosArrayDestroy(pTimeoutList);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1231,15 +1258,13 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
||||||
for (int32_t i = 0; i < numOfNotReady; ++i) {
|
for (int32_t i = 0; i < numOfNotReady; ++i) {
|
||||||
int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i);
|
int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i);
|
||||||
|
|
||||||
for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) {
|
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
||||||
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j);
|
if (p != NULL) {
|
||||||
if (p->taskId == taskId) {
|
|
||||||
p->rspTs = 0;
|
p->rspTs = 0;
|
||||||
p->status = -1;
|
p->status = -1;
|
||||||
doSendCheckMsg(pTask, p);
|
doSendCheckMsg(pTask, p);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
stDebug("s-task:%s %d downstream task(s) not ready, send check msg again", id, numOfNotReady);
|
stDebug("s-task:%s %d downstream task(s) not ready, send check msg again", id, numOfNotReady);
|
||||||
}
|
}
|
||||||
|
@ -1251,13 +1276,10 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
||||||
for (int32_t i = 0; i < numOfTimeout; ++i) {
|
for (int32_t i = 0; i < numOfTimeout; ++i) {
|
||||||
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
|
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
|
||||||
|
|
||||||
for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) {
|
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
||||||
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j);
|
if (p != NULL) {
|
||||||
if (p->taskId == taskId) {
|
|
||||||
ASSERT(p->status == -1 && p->rspTs == 0);
|
ASSERT(p->status == -1 && p->rspTs == 0);
|
||||||
doSendCheckMsg(pTask, p);
|
doSendCheckMsg(pTask, p);
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue