fix(stream): add the dispatch entry.
This commit is contained in:
parent
325ad5beb8
commit
9af6076b4f
|
@ -32,6 +32,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas
|
||||||
int32_t numOfBlocks, int64_t dstTaskId, int32_t type);
|
int32_t numOfBlocks, int64_t dstTaskId, int32_t type);
|
||||||
static int32_t getFailedDispatchInfo(SDispatchMsgInfo* pMsgInfo, int64_t now);
|
static int32_t getFailedDispatchInfo(SDispatchMsgInfo* pMsgInfo, int64_t now);
|
||||||
static bool isDispatchRspTimeout(SDispatchEntry* pEntry, int64_t now);
|
static bool isDispatchRspTimeout(SDispatchEntry* pEntry, int64_t now);
|
||||||
|
static void addDispatchEntry(SDispatchMsgInfo* pMsgInfo, int32_t nodeId, int64_t now, bool lock);
|
||||||
|
|
||||||
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
|
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
|
||||||
pMsg->msgType = msgType;
|
pMsg->msgType = msgType;
|
||||||
|
@ -306,6 +307,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
addDispatchEntry(&pTask->msgInfo, pTask->outputInfo.fixedDispatcher.nodeId, now, true);
|
||||||
pTask->msgInfo.pData = pReqs;
|
pTask->msgInfo.pData = pReqs;
|
||||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
|
||||||
|
@ -328,11 +330,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
|
||||||
// it's a new vnode to receive dispatch msg, so add one
|
// it's a new vnode to receive dispatch msg, so add one
|
||||||
if (pReqs[j].blockNum == 0) {
|
if (pReqs[j].blockNum == 0) {
|
||||||
SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j);
|
SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j);
|
||||||
SDispatchEntry entry = {.nodeId = pDstVgroupInfo->vgId, .rspTs = -1, .status = 0, .sendTs = now};
|
addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, true);
|
||||||
|
|
||||||
taosThreadMutexLock(&pTask->msgInfo.lock);
|
|
||||||
taosArrayPush(pTask->msgInfo.pSendInfo, &entry);
|
|
||||||
taosThreadMutexUnlock(&pTask->msgInfo.lock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pReqs[j].blockNum++;
|
pReqs[j].blockNum++;
|
||||||
|
@ -422,6 +420,20 @@ static void setResendInfo(SDispatchEntry* pEntry, int64_t now) {
|
||||||
pEntry->retryCount += 1;
|
pEntry->retryCount += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void addDispatchEntry(SDispatchMsgInfo* pMsgInfo, int32_t nodeId, int64_t now, bool lock) {
|
||||||
|
SDispatchEntry entry = {.nodeId = nodeId, .rspTs = -1, .status = 0, .sendTs = now};
|
||||||
|
|
||||||
|
if (lock) {
|
||||||
|
taosThreadMutexLock(&pMsgInfo->lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(pMsgInfo->pSendInfo, &entry);
|
||||||
|
|
||||||
|
if (lock) {
|
||||||
|
taosThreadMutexUnlock(&pMsgInfo->lock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void doSendFailedDispatch(SStreamTask* pTask, SDispatchEntry* pEntry, int64_t now, const char* pMsg) {
|
static void doSendFailedDispatch(SStreamTask* pTask, SDispatchEntry* pEntry, int64_t now, const char* pMsg) {
|
||||||
SStreamDispatchReq* pReq = pTask->msgInfo.pData;
|
SStreamDispatchReq* pReq = pTask->msgInfo.pData;
|
||||||
|
|
||||||
|
@ -618,8 +630,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
|
||||||
|
|
||||||
if (pReqs[j].blockNum == 0) {
|
if (pReqs[j].blockNum == 0) {
|
||||||
SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j);
|
SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j);
|
||||||
SDispatchEntry entry = {.nodeId = pDstVgroupInfo->vgId, .rspTs = -1, .status = 0, .sendTs = now};
|
addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, false);
|
||||||
taosArrayPush(pTask->msgInfo.pSendInfo, &entry);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pReqs[j].blockNum++;
|
pReqs[j].blockNum++;
|
||||||
|
@ -1153,6 +1164,7 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
|
||||||
static int32_t setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t code, int64_t now, const char* id) {
|
static int32_t setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t code, int64_t now, const char* id) {
|
||||||
int32_t numOfRsp = 0;
|
int32_t numOfRsp = 0;
|
||||||
bool alreadySet = false;
|
bool alreadySet = false;
|
||||||
|
bool updated = false;
|
||||||
|
|
||||||
taosThreadMutexLock(&pMsgInfo->lock);
|
taosThreadMutexLock(&pMsgInfo->lock);
|
||||||
for(int32_t j = 0; j < taosArrayGetSize(pMsgInfo->pSendInfo); ++j) {
|
for(int32_t j = 0; j < taosArrayGetSize(pMsgInfo->pSendInfo); ++j) {
|
||||||
|
@ -1162,7 +1174,8 @@ static int32_t setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int3
|
||||||
pEntry->rspTs = now;
|
pEntry->rspTs = now;
|
||||||
pEntry->status = code;
|
pEntry->status = code;
|
||||||
alreadySet = true;
|
alreadySet = true;
|
||||||
stDebug("s-task:%s record the rps recv, ts:%"PRId64" code:%d, idx:%d", id, now, code, j);
|
updated = true;
|
||||||
|
stDebug("s-task:%s record the rsp recv, ts:%"PRId64" code:%d, idx:%d", id, now, code, j);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pEntry->rspTs != -1) {
|
if (pEntry->rspTs != -1) {
|
||||||
|
@ -1171,6 +1184,8 @@ static int32_t setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int3
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pMsgInfo->lock);
|
taosThreadMutexUnlock(&pMsgInfo->lock);
|
||||||
|
ASSERT(updated);
|
||||||
|
|
||||||
return numOfRsp;
|
return numOfRsp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1417,6 +1432,5 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
||||||
}
|
}
|
||||||
|
|
||||||
streamTrySchedExec(pTask);
|
streamTrySchedExec(pTask);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue