refactor: do some internal refactor.
This commit is contained in:
parent
6c86847b12
commit
3c61932ecc
|
@ -50,7 +50,6 @@ enum {
|
||||||
TASK_STATUS__RECOVER_PREPARE,
|
TASK_STATUS__RECOVER_PREPARE,
|
||||||
TASK_STATUS__RECOVER1,
|
TASK_STATUS__RECOVER1,
|
||||||
TASK_STATUS__RECOVER2,
|
TASK_STATUS__RECOVER2,
|
||||||
TASK_STATUS__RESTORE, // only available for source task to replay WAL from the checkpoint
|
|
||||||
};
|
};
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
|
@ -346,7 +345,7 @@ typedef struct SStreamMeta {
|
||||||
FTaskExpand* expandFunc;
|
FTaskExpand* expandFunc;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
SRWLatch lock;
|
SRWLatch lock;
|
||||||
int32_t walScan;
|
int32_t walScanCounter;
|
||||||
} SStreamMeta;
|
} SStreamMeta;
|
||||||
|
|
||||||
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
|
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
|
||||||
|
@ -545,8 +544,9 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
|
||||||
// recover and fill history
|
// recover and fill history
|
||||||
int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version);
|
int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version);
|
||||||
int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version);
|
int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version);
|
||||||
int32_t streamProcessTaskCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq);
|
int32_t streamTaskCheckStatus(SStreamTask* pTask);
|
||||||
int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version);
|
int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version);
|
||||||
|
|
||||||
// common
|
// common
|
||||||
int32_t streamSetParamForRecover(SStreamTask* pTask);
|
int32_t streamSetParamForRecover(SStreamTask* pTask);
|
||||||
int32_t streamRestoreParam(SStreamTask* pTask);
|
int32_t streamRestoreParam(SStreamTask* pTask);
|
||||||
|
|
|
@ -111,8 +111,13 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
|
||||||
pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||||
taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);
|
taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);
|
||||||
|
|
||||||
tqInitialize(pTq);
|
int32_t code = tqInitialize(pTq);
|
||||||
return pTq;
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tqClose(pTq);
|
||||||
|
return NULL;
|
||||||
|
} else {
|
||||||
|
return pTq;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqInitialize(STQ* pTq) {
|
int32_t tqInitialize(STQ* pTq) {
|
||||||
|
@ -601,11 +606,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
pTask->chkInfo.currentVer = ver;
|
pTask->chkInfo.currentVer = ver;
|
||||||
|
|
||||||
// expand executor
|
// expand executor
|
||||||
if (pTask->fillHistory) {
|
pTask->status.taskStatus = (pTask->fillHistory)? TASK_STATUS__WAIT_DOWNSTREAM:TASK_STATUS__NORMAL;
|
||||||
pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
|
|
||||||
} else {
|
|
||||||
pTask->status.taskStatus = TASK_STATUS__RESTORE;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
|
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
|
||||||
|
@ -664,6 +665,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
}
|
}
|
||||||
|
|
||||||
streamSetupTrigger(pTask);
|
streamSetupTrigger(pTask);
|
||||||
|
|
||||||
tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr,
|
tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr,
|
||||||
pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel);
|
pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel);
|
||||||
|
|
||||||
|
@ -693,8 +695,9 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
};
|
};
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
|
||||||
|
|
||||||
if (pTask) {
|
if (pTask) {
|
||||||
rsp.status = (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL) ? 1 : 0;
|
rsp.status = streamTaskCheckStatus(pTask);
|
||||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||||
|
|
||||||
tqDebug("tq recv task check req(reqId:0x%" PRIx64
|
tqDebug("tq recv task check req(reqId:0x%" PRIx64
|
||||||
|
@ -1147,9 +1150,6 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
|
||||||
if (pTask != NULL) {
|
if (pTask != NULL) {
|
||||||
if (pTask->status.taskStatus == TASK_STATUS__NORMAL) {
|
if (pTask->status.taskStatus == TASK_STATUS__NORMAL) {
|
||||||
tqDebug("vgId:%d s-task:%s start to process run req", vgId, pTask->id.idStr);
|
|
||||||
streamProcessRunReq(pTask);
|
|
||||||
} else if (pTask->status.taskStatus == TASK_STATUS__RESTORE) {
|
|
||||||
tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId,
|
tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId,
|
||||||
pTask->id.idStr, pTask->chkInfo.version);
|
pTask->id.idStr, pTask->chkInfo.version);
|
||||||
streamProcessRunReq(pTask);
|
streamProcessRunReq(pTask);
|
||||||
|
@ -1313,10 +1313,10 @@ int32_t tqStartStreamTasks(STQ* pTq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
pMeta->walScan += 1;
|
pMeta->walScanCounter += 1;
|
||||||
|
|
||||||
if (pMeta->walScan > 1) {
|
if (pMeta->walScanCounter > 1) {
|
||||||
tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScan);
|
tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);
|
||||||
taosWUnLockLatch(&pTq->pStreamMeta->lock);
|
taosWUnLockLatch(&pTq->pStreamMeta->lock);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,15 +18,14 @@
|
||||||
static int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle);
|
static int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle);
|
||||||
|
|
||||||
// this function should be executed by stream threads.
|
// this function should be executed by stream threads.
|
||||||
// there is a case that the WAL increases more fast than the restore procedure, and this restore procedure
|
// extract submit block from WAL, and add them into the input queue for the sources tasks.
|
||||||
// will not stop eventually.
|
|
||||||
int32_t tqStreamTasksScanWal(STQ* pTq) {
|
int32_t tqStreamTasksScanWal(STQ* pTq) {
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
int32_t scan = pMeta->walScan;
|
int32_t scan = pMeta->walScanCounter;
|
||||||
tqDebug("vgId:%d continue check if data in wal are available, scan:%d", vgId, scan);
|
tqDebug("vgId:%d continue check if data in wal are available, scan:%d", vgId, scan);
|
||||||
|
|
||||||
// check all restore tasks
|
// check all restore tasks
|
||||||
|
@ -37,12 +36,12 @@ int32_t tqStreamTasksScanWal(STQ* pTq) {
|
||||||
|
|
||||||
if (shouldIdle) {
|
if (shouldIdle) {
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
pMeta->walScan -= 1;
|
pMeta->walScanCounter -= 1;
|
||||||
times = pMeta->walScan;
|
times = pMeta->walScanCounter;
|
||||||
|
|
||||||
ASSERT(pMeta->walScan >= 0);
|
ASSERT(pMeta->walScanCounter >= 0);
|
||||||
|
|
||||||
if (pMeta->walScan <= 0) {
|
if (pMeta->walScanCounter <= 0) {
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,7 +39,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
|
||||||
|
|
||||||
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
|
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
|
||||||
|
|
||||||
int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
|
int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
|
||||||
|
|
||||||
int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
|
int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
|
||||||
SEpSet* pEpSet);
|
SEpSet* pEpSet);
|
||||||
|
|
|
@ -208,7 +208,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet) {
|
int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet) {
|
||||||
void* buf = NULL;
|
void* buf = NULL;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
SRpcMsg msg = {0};
|
SRpcMsg msg = {0};
|
||||||
|
@ -240,7 +240,7 @@ int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq*
|
||||||
msg.pCont = buf;
|
msg.pCont = buf;
|
||||||
msg.msgType = TDMT_STREAM_TASK_CHECK;
|
msg.msgType = TDMT_STREAM_TASK_CHECK;
|
||||||
|
|
||||||
qDebug("dispatch from s-task:%s to downstream s-task:%" PRIx64 ":%d node %d: check msg", pTask->id.idStr,
|
qDebug("s-task:%s dispatch check msg to downstream s-task:%" PRIx64 ":%d node %d: check msg", pTask->id.idStr,
|
||||||
pReq->streamId, pReq->downstreamTaskId, nodeId);
|
pReq->streamId, pReq->downstreamTaskId, nodeId);
|
||||||
|
|
||||||
tmsgSendReq(pEpSet, &msg);
|
tmsgSendReq(pEpSet, &msg);
|
||||||
|
|
|
@ -28,7 +28,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
|
||||||
|
|
||||||
while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
int8_t status = atomic_load_8(&pTask->status.taskStatus);
|
int8_t status = atomic_load_8(&pTask->status.taskStatus);
|
||||||
if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__RESTORE) {
|
if (status != TASK_STATUS__NORMAL) {
|
||||||
qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr,
|
qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr,
|
||||||
atomic_load_8(&pTask->status.taskStatus));
|
atomic_load_8(&pTask->status.taskStatus));
|
||||||
taosMsleep(2);
|
taosMsleep(2);
|
||||||
|
|
|
@ -287,6 +287,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
|
||||||
tdbTbcClose(pCur);
|
tdbTbcClose(pCur);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
||||||
tDecodeStreamTask(&decoder, pTask);
|
tDecodeStreamTask(&decoder, pTask);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
@ -305,7 +306,6 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*pTask->status.taskStatus = TASK_STATUS__NORMAL;*/
|
|
||||||
if (pTask->fillHistory) {
|
if (pTask->fillHistory) {
|
||||||
pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
|
pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
|
||||||
streamTaskCheckDownstream(pTask, ver);
|
streamTaskCheckDownstream(pTask, ver);
|
||||||
|
|
|
@ -54,6 +54,8 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
|
||||||
|
|
||||||
// checkstatus
|
// checkstatus
|
||||||
int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) {
|
int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) {
|
||||||
|
qDebug("s-taks:%s in fill history stage, ver:%"PRId64, pTask->id.idStr, version);
|
||||||
|
|
||||||
SStreamTaskCheckReq req = {
|
SStreamTaskCheckReq req = {
|
||||||
.streamId = pTask->id.streamId,
|
.streamId = pTask->id.streamId,
|
||||||
.upstreamTaskId = pTask->id.taskId,
|
.upstreamTaskId = pTask->id.taskId,
|
||||||
|
@ -63,6 +65,7 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) {
|
||||||
|
|
||||||
// serialize
|
// serialize
|
||||||
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
|
|
||||||
req.reqId = tGenIdPI64();
|
req.reqId = tGenIdPI64();
|
||||||
req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
|
req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
|
||||||
req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
|
req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
|
||||||
|
@ -70,7 +73,7 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) {
|
||||||
|
|
||||||
qDebug("s-task:%s at node %d check downstream task %d at node %d", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId,
|
qDebug("s-task:%s at node %d check downstream task %d at node %d", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId,
|
||||||
req.downstreamNodeId);
|
req.downstreamNodeId);
|
||||||
streamDispatchOneCheckReq(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
|
streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
|
||||||
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
|
|
||||||
|
@ -86,7 +89,7 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) {
|
||||||
req.downstreamTaskId = pVgInfo->taskId;
|
req.downstreamTaskId = pVgInfo->taskId;
|
||||||
qDebug("s-task:%s at node %d check downstream task %d at node %d (shuffle)", pTask->id.idStr, pTask->nodeId,
|
qDebug("s-task:%s at node %d check downstream task %d at node %d (shuffle)", pTask->id.idStr, pTask->nodeId,
|
||||||
req.downstreamTaskId, req.downstreamNodeId);
|
req.downstreamTaskId, req.downstreamNodeId);
|
||||||
streamDispatchOneCheckReq(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
qDebug("s-task:%s at node %d direct launch recover since no downstream", pTask->id.idStr, pTask->nodeId);
|
qDebug("s-task:%s at node %d direct launch recover since no downstream", pTask->id.idStr, pTask->nodeId);
|
||||||
|
@ -111,14 +114,14 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp
|
||||||
req.downstreamTaskId, req.downstreamNodeId);
|
req.downstreamTaskId, req.downstreamNodeId);
|
||||||
|
|
||||||
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
streamDispatchOneCheckReq(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet);
|
streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet);
|
||||||
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
int32_t vgSz = taosArrayGetSize(vgInfo);
|
int32_t vgSz = taosArrayGetSize(vgInfo);
|
||||||
for (int32_t i = 0; i < vgSz; i++) {
|
for (int32_t i = 0; i < vgSz; i++) {
|
||||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||||
if (pVgInfo->taskId == req.downstreamTaskId) {
|
if (pVgInfo->taskId == req.downstreamTaskId) {
|
||||||
streamDispatchOneCheckReq(pTask, &req, pRsp->downstreamNodeId, &pVgInfo->epSet);
|
streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pVgInfo->epSet);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -126,8 +129,8 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamProcessTaskCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq) {
|
int32_t streamTaskCheckStatus(SStreamTask* pTask) {
|
||||||
return atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL;
|
return atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL? 1:0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version) {
|
int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version) {
|
||||||
|
@ -137,7 +140,9 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
|
||||||
if (pRsp->status == 1) {
|
if (pRsp->status == 1) {
|
||||||
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
bool found = false;
|
bool found = false;
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pTask->checkReqIds); i++) {
|
|
||||||
|
int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds);
|
||||||
|
for (int32_t i = 0; i < numOfReqs; i++) {
|
||||||
int64_t reqId = *(int64_t*)taosArrayGet(pTask->checkReqIds, i);
|
int64_t reqId = *(int64_t*)taosArrayGet(pTask->checkReqIds, i);
|
||||||
if (reqId == pRsp->reqId) {
|
if (reqId == pRsp->reqId) {
|
||||||
found = true;
|
found = true;
|
||||||
|
@ -151,9 +156,12 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
|
||||||
|
|
||||||
int32_t left = atomic_sub_fetch_32(&pTask->recoverTryingDownstream, 1);
|
int32_t left = atomic_sub_fetch_32(&pTask->recoverTryingDownstream, 1);
|
||||||
ASSERT(left >= 0);
|
ASSERT(left >= 0);
|
||||||
|
|
||||||
if (left == 0) {
|
if (left == 0) {
|
||||||
taosArrayDestroy(pTask->checkReqIds);
|
taosArrayDestroy(pTask->checkReqIds);
|
||||||
pTask->checkReqIds = NULL;
|
pTask->checkReqIds = NULL;
|
||||||
|
|
||||||
|
qDebug("s-task:%s all downstream tasks:%d are ready, now enter into recover stage", pTask->id.idStr, numOfReqs);
|
||||||
streamTaskLaunchRecover(pTask, version);
|
streamTaskLaunchRecover(pTask, version);
|
||||||
}
|
}
|
||||||
} else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
} else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
|
@ -165,7 +173,10 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
} else { // not ready, it should wait for at least 100ms and then retry
|
} else { // not ready, wait for 100ms and retry
|
||||||
|
qDebug("s-task:%s downstream taskId:%"PRId64" (vgId:%d) not ready, wait for 100ms and retry", pTask->id.idStr,
|
||||||
|
pRsp->downstreamTaskId, pRsp->downstreamNodeId);
|
||||||
|
taosMsleep(100);
|
||||||
streamRecheckOneDownstream(pTask, pRsp);
|
streamRecheckOneDownstream(pTask, pRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue